Skip to content

Instantly share code, notes, and snippets.

@gorshkov-leonid
Last active August 7, 2021 20:15
Show Gist options
  • Save gorshkov-leonid/bdec70230f5b209257ee960dc1142e2d to your computer and use it in GitHub Desktop.
Save gorshkov-leonid/bdec70230f5b209257ee960dc1142e2d to your computer and use it in GitHub Desktop.
vertx websocket
package aaa;

import com.google.common.util.concurrent.Uninterruptibles;
import io.vertx.core.Vertx;
import io.vertx.core.http.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Manual smoke test for a WebSocket endpoint, built on the Vert.x HTTP client.
 *
 * <p>The program opens a WebSocket to {@code HOST:PORT/services?token=TOKEN}, prints every
 * message it receives, and sends a fixed series of "open" requests from a background thread,
 * pausing after each one. It keeps running until a byte can be read from standard input
 * (typically: the user presses Enter), then waits for the sender to finish and closes Vert.x so
 * the JVM can terminate.
 *
 * <p>Any connection failure, WebSocket exception, connection timeout or interruption of the
 * sender terminates the process with exit status {@code -1}.
 */
public class Main
{
    // NOTE(review): this is a credential embedded in source. Prefer loading it from an
    // environment variable or system property. The constant is left as-is so that existing
    // references to Main.TOKEN keep resolving to the same value.
    public static final String TOKEN = "eyJhbGciOiJSUzI1NiIsImtpbmQiOiJtYWNoaW5lX3Rva2VuIiwia2lkIjoid29ya3NwYWNlb3A1NTB0Yjk1Z2d3Nng2NyJ9.eyJ3c2lkIjoid29ya3NwYWNlb3A1NTB0Yjk1Z2d3Nng2NyIsInVpZCI6IjQ2NjAwY2I3LWRiY2UtNGZjNy05NjUzLTNlZWZmYjU4Yzk0OCIsImF1ZCI6IndvcmtzcGFjZW9wNTUwdGI5NWdndzZ4NjciLCJuYmYiOi0xLCJ1bmFtZSI6ImFkbWluIiwiaXNzIjoid3NtYXN0ZXIiLCJleHAiOjE2NTY1MDQ2ODgsImlhdCI6MTYyNDk2ODY4OCwianRpIjoiYWQwOWY3YzUtYTYxZS00Njk3LWI5ZjgtMWU5NDZjM2Y5ZjQ5In0.HvgF8uJ_BC0ZDJb6fGi5P9L4Kpj9usJgTH5qajGFuFjta7-0rBP6vkJOIuDa0jG6MSsv2IDz88_lEGKTWJ_KT7JfCQ_hSqv5KA7lSGoVTJ9ucN159bVl3EeiJJxKeXFv9edMtW06IthR6DePWqBFLBmYvsHWnd-vBMZ4joXT0lXC2rQpeDq4oc1wjilIj3MOuTVrB0xPl-VTazEveVXYv2Vwk00P6SijWALEI8Mb3VE7YvMjudBi9QRXpXVIGcNkGWSgO-jVO4xkIBB3jU12Gfsgsveo8ZMn1Pm59gtzaO2HhM-XnCMemj-TOzdsFdmM67hSqqivi9bJBv3jcx3qwA";

    private static final String HOST = "aaaa.cloud";
    private static final int PORT = 443;
    private static final String REQUEST_URI = "/services?token=" + TOKEN;

    /** Largest WebSocket frame the client accepts: 5 MiB (5_242_880 bytes). */
    private static final int MAX_FRAME_SIZE_BYTES = 5 * 1024 * 1024;

    /** How long the sender waits for the connection before giving up. */
    private static final long CONNECT_TIMEOUT_SECONDS = 15;

    /** Pause after each outgoing message. */
    private static final long PAUSE_AFTER_SEND_MILLIS = 3000;

    /**
     * Upper bound on waiting for the sender at shutdown; it exceeds the sender's own worst case
     * (connect timeout plus one pause per request).
     */
    private static final long SENDER_DRAIN_TIMEOUT_SECONDS = 30;

    /** Requests sent, in order, once the socket is connected. */
    private static final String[] OPEN_REQUESTS = {
        "{\"id\":0,\"kind\":\"open\",\"content\":null,\"path\":\"/services/filesystem\"}",
        "{\"id\":1,\"kind\":\"open\",\"content\":null,\"path\":\"/services/task\"}",
        "{\"id\":2,\"kind\":\"open\",\"content\":null,\"path\":\"/services/fs-watcher\"}",
    };

    /**
     * Connects, starts the background sender, and blocks until standard input yields a byte
     * or reaches end-of-stream.
     *
     * @param args ignored
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException
    {
        HttpClientOptions options = new HttpClientOptions();
        options.setMaxWebSocketFrameSize(MAX_FRAME_SIZE_BYTES);
        // NOTE(review): the target port is 443 but TLS is left disabled, as in the original.
        // If the endpoint expects wss://, enable it with:
        //     options.setSsl(true);
        //     options.setSslEngineOptions(HttpClientOptions.DEFAULT_SSL_ENGINE);

        // Keep the Vertx instance so it can be closed on exit.
        Vertx vertx = Vertx.vertx();
        HttpClient client = vertx.createHttpClient(options);

        // Starts with zero permits; the connect callback releases one when the socket is ready.
        final Semaphore connected = new Semaphore(0);
        final AtomicReference<WebSocket> wsr = new AtomicReference<>();

        client.webSocket(PORT, HOST, REQUEST_URI, resp -> {
            if (resp.cause() != null)
            {
                resp.cause().printStackTrace();
                System.exit(-1);
                return; // never fall through to a null result
            }

            WebSocket websocket = resp.result();
            websocket
                    .handler(data -> System.out.println("Message: " + data.toString(StandardCharsets.UTF_8)))
                    .closeHandler(aVoid -> System.out.println("Websocket closed normally."))
                    .endHandler(aVoid -> System.out.println("Websocket ended normally."))
                    .exceptionHandler(e -> {
                        e.printStackTrace();
                        System.exit(-1);
                    });

            // Publish the socket before signalling, so the sender always observes it.
            wsr.set(websocket);
            connected.release();
        });

        ExecutorService sender = Executors.newSingleThreadExecutor();
        sender.execute(() -> {
            acquire(connected);
            WebSocket websocket = wsr.get();
            for (String request : OPEN_REQUESTS)
            {
                sendMsgWait(websocket, request);
            }
        });
        // No further tasks will be submitted; the worker thread ends after the last message.
        sender.shutdown();

        try
        {
            System.in.read();
        }
        finally
        {
            awaitSender(sender);
            // Without this the Vert.x threads keep the JVM alive after main returns.
            vertx.close();
        }
    }

    /**
     * Writes one text message to the socket and then pauses the calling thread.
     * Exits the process with status {@code -1} if the pause is interrupted.
     *
     * @param webSocket socket to write to
     * @param msg       text payload to send
     */
    private static void sendMsgWait(WebSocket webSocket, String msg)
    {
        try
        {
            webSocket.writeTextMessage(msg);
            Thread.sleep(PAUSE_AFTER_SEND_MILLIS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            System.out.println("Can't sleep");
            System.exit(-1);
        }
    }

    /**
     * Takes one permit from {@code semaphore}, ignoring interrupts while waiting.
     * Exits the process with status {@code -1} if no permit arrives within the connect timeout.
     *
     * @param semaphore semaphore to take the permit from
     */
    private static void acquire(Semaphore semaphore)
    {
        if (!Uninterruptibles.tryAcquireUninterruptibly(semaphore, CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
        {
            System.out.println("Can't acquire semaphore");
            System.exit(-1);
        }
    }

    /**
     * Waits, for a bounded time, for the already shut-down sender executor to finish its task,
     * so the socket is not closed underneath a message that is still being sent.
     *
     * @param sender executor running the sender task
     */
    private static void awaitSender(ExecutorService sender)
    {
        try
        {
            sender.awaitTermination(SENDER_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for the caller and continue with shutdown.
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment