Skip to content

Instantly share code, notes, and snippets.

@chathurawidanage
Created December 13, 2019 01:44
Show Gist options
  • Save chathurawidanage/0adef2dfc326de53d56692336eb98956 to your computer and use it in GitHub Desktop.
Save chathurawidanage/0adef2dfc326de53d56692336eb98956 to your computer and use it in GitHub Desktop.
UCX Test Sender
package com.cwidanage;
import org.openucx.jucx.UcxCallback;
import org.openucx.jucx.ucp.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class UCPTest2 {
public static void main(String[] args) {
UcpContext context = new UcpContext(new UcpParams().requestTagFeature().requestWakeupFeature());
UcpWorker ucpWorker = context.newWorker(new UcpWorkerParams()
.requestThreadSafety().requestWakeupRX());
UcpEndpoint localhost = ucpWorker.newEndpoint(new UcpEndpointParams().setSocketAddress(
new InetSocketAddress("0.0.0.0", 5001)
));
Thread t = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
ucpWorker.progress();
}
}
});
t.setDaemon(true);
t.start();
Queue<UcpRequest> requests = new LinkedList<>();
BlockingQueue<ByteBuffer> buff = new ArrayBlockingQueue<>(10);
for (int i = 0; i < 2; i++) {
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
buff.add(buffer);
}
for (int i = 0; i < 100; i++) {
ByteBuffer buffer = buff.poll();
if (buffer == null) {
i--;
continue;
}
buffer.putInt(i);
System.out.println("Posting : " + buffer);
buffer.limit(buffer.position());
buffer.position(0);
System.out.println("Posting After : " + buffer);
UcpRequest sendMessage = localhost.sendTaggedNonBlocking(buffer, 0, new UcxCallback() {
@Override
public void onSuccess(UcpRequest request) {
System.out.println("Success ");
buff.add(buffer);
}
@Override
public void onError(int ucsStatus, String errorMsg) {
System.out.println(errorMsg);
}
});
requests.add(sendMessage);
}
while (!requests.isEmpty()) {
if (requests.peek().isCompleted()) {
requests.poll();
}
ucpWorker.progress();
localhost.flushNonBlocking(null);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment