Last active
August 12, 2022 01:18
-
-
Save shanemhansen/034b958bb3c5cfa4f53874a0f98e2f02 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ai.shane; | |
import com.google.api.core.ApiFunction; | |
import com.google.api.core.ApiFuture; | |
import com.google.api.core.ApiFutureCallback; | |
import com.google.api.core.ApiFutures; | |
import com.google.api.gax.core.ExecutorProvider; | |
import com.google.api.gax.core.FixedExecutorProvider; | |
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; | |
import com.google.cloud.pubsub.v1.Publisher; | |
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; | |
import com.google.protobuf.ByteString; | |
import com.google.pubsub.v1.PubsubMessage; | |
import com.google.pubsub.v1.TopicName; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; | |
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup; | |
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel; | |
import java.io.IOException; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.ArrayList; | |
import java.util.List; | |
/** | |
* This app can be run after building a suitable jar by running /usr/bin/time jar -cp $WHATEVER.jar ai.shane.App 10 500 | |
* | |
*/ | |
public class App { | |
private static AtomicLong msgCount = new AtomicLong(); | |
public static void main(String[] args) { | |
// Apologies for bad docks. | |
// args[0] is threadpool size. 5 is enough for 500 topics in my testing | |
// args[1] is number of publishers. I precreated 500 or so topics. You will need to have topic1 though topic+args[1] topics in pubsub. | |
String projectId = "gcp-project"; | |
List<Future<String>> msgIds = new ArrayList<Future<String>>(); | |
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(Integer.parseInt(args[0])); | |
FixedExecutorProvider executorProvider = FixedExecutorProvider.create(executor); | |
// NioEventLoopGroup loop = new NioEventLoopGroup(1, executor); | |
EpollEventLoopGroup loop = new EpollEventLoopGroup(4, executor); | |
InstantiatingGrpcChannelProvider channelProvider = PublisherStubSettings.defaultGrpcTransportProviderBuilder() | |
.setExecutorProvider(executorProvider) | |
.setChannelConfigurator( | |
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() { | |
@Override | |
public ManagedChannelBuilder apply( | |
ManagedChannelBuilder managedChannelBuilder) { | |
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) managedChannelBuilder | |
.executor(loop); | |
nettyChannelBuilder.eventLoopGroup(loop); | |
nettyChannelBuilder.channelType( | |
EpollSocketChannel.class); // Use EPoll if available, if using EPoll update | |
// above line to use EPollEventLoopGroup | |
return nettyChannelBuilder; | |
} | |
}) | |
.build(); | |
try { | |
final int count = Integer.parseInt(args[1]) + 1; | |
List<Future<Publisher>> plist = new ArrayList<Future<Publisher>>(); | |
// Note this creates $count publishers talking to $count topics using the naming convention | |
// topic1..n. You will need to create these if you are testing. | |
for (int i = 1; i < count; i++) { | |
plist.add(createPublisher(projectId, "topic" + i, executorProvider, channelProvider)); | |
} | |
System.out.println("creating publishers in parallel using low fixed number of threads."); | |
List<Publisher> pubs = new ArrayList<>(); | |
for (Future<Publisher> p : plist) { | |
pubs.add(p.get()); | |
} | |
System.out.println("created publishers. Sending messages async."); | |
for(Publisher p: pubs) { | |
msgIds.add(publishAsync(p, "message", executorProvider)); | |
} | |
System.out.println("Resolving message futures. Getting message IDs."); | |
// wait for everything | |
for (Future<String> msgId : msgIds) { | |
msgId.get(); | |
} | |
System.out.println("shutdown event loops and executors"); | |
loop.shutdownGracefully().get(); | |
// have to shutdown the epoll executor | |
executor.shutdown(); | |
executor.awaitTermination(5, TimeUnit.SECONDS); | |
System.out.println("sent " + msgCount.get() + " messages."); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
public static Future<String> publishAsync(Publisher p, String message, ExecutorProvider executorProvider) | |
throws IOException, ExecutionException, InterruptedException { | |
ByteString data = ByteString.copyFromUtf8(message); | |
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); | |
// Once published, returns a server-assigned message id (unique within the | |
// topic) | |
ApiFuture<String> msgId = p.publish(pubsubMessage); | |
ApiFutures.addCallback(msgId, new ApiFutureCallback<String>() { | |
@Override | |
public void onFailure(Throwable t) { | |
// TODO actual error handling | |
t.printStackTrace(); | |
} | |
@Override | |
public void onSuccess(String result) { | |
msgCount.incrementAndGet(); | |
} | |
}, executorProvider.getExecutor()); | |
return msgId; | |
} | |
public static Future<Publisher> createPublisher(String projectId, String topicId, ExecutorProvider executorProvider, | |
InstantiatingGrpcChannelProvider channelProvider) | |
throws IOException { | |
TopicName topicName = TopicName.of(projectId, topicId); | |
return executorProvider.getExecutor().submit(() -> { | |
return Publisher.newBuilder(topicName).setExecutorProvider(executorProvider) | |
.setChannelProvider(channelProvider) | |
.build(); | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ai.shane; | |
import java.io.IOException; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ScheduledThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import com.google.api.core.ApiFunction; | |
import com.google.api.gax.core.FixedExecutorProvider; | |
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; | |
import com.google.cloud.pubsub.v1.Publisher; | |
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; | |
import com.google.protobuf.ByteString; | |
import com.google.pubsub.v1.PubsubMessage; | |
import com.google.pubsub.v1.TopicName; | |
import io.grpc.ManagedChannelBuilder; | |
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; | |
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup; | |
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel; | |
public class Slim { | |
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { | |
String topicId = "topic1"; | |
String projectId = "gcp-project"; | |
// A fixed sized threadpool is not necessarily recommended, but here for demonstration purposes. | |
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5); // 5 is plenty for 500 producers | |
FixedExecutorProvider executorProvider = FixedExecutorProvider.create(executor); | |
EpollEventLoopGroup loop = new EpollEventLoopGroup(4, executor); | |
InstantiatingGrpcChannelProvider channelProvider = PublisherStubSettings.defaultGrpcTransportProviderBuilder() | |
.setExecutorProvider(executorProvider) | |
.setChannelConfigurator( | |
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() { | |
@Override | |
public ManagedChannelBuilder apply( | |
ManagedChannelBuilder managedChannelBuilder) { | |
NettyChannelBuilder nettyChannelBuilder = (NettyChannelBuilder) managedChannelBuilder | |
.executor(loop); | |
nettyChannelBuilder.eventLoopGroup(loop); | |
nettyChannelBuilder.channelType( | |
EpollSocketChannel.class); // Use EPoll if available, if using EPoll update | |
// above line to use EPollEventLoopGroup | |
return nettyChannelBuilder; | |
} | |
}) | |
.build(); | |
TopicName topicName = TopicName.of(projectId, topicId); | |
// the calls to setExecutorProvider and setChannelProvider are key | |
// to managing threadcount and sharing resources. | |
Publisher p = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider) | |
.setChannelProvider(channelProvider) | |
.build(); | |
ByteString data = ByteString.copyFromUtf8("hello, world!"); | |
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); | |
p.publish(pubsubMessage).get(); | |
loop.shutdownGracefully().get(); | |
// have to shutdown the epoll executor | |
executor.shutdown(); | |
executor.awaitTermination(5, TimeUnit.SECONDS); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment