Created
December 20, 2019 11:26
-
-
Save twillouer/7666806d22384147ba926d0313a676f5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static final GenericObjectPoolConfig<Channel> GENERIC_OBJECT_POOL_CONFIG = new GenericObjectPoolConfig<>(); | |
static { | |
// RabbitMQ has channel_max to 2047. https://github.com/rabbitmq/rabbitmq-java-client/issues/366 | |
GENERIC_OBJECT_POOL_CONFIG.setMaxTotal(2_000); | |
GENERIC_OBJECT_POOL_CONFIG.setMinIdle(2); | |
GENERIC_OBJECT_POOL_CONFIG.setMaxIdle(10); // Reset long channel | |
GENERIC_OBJECT_POOL_CONFIG.setMaxWaitMillis(TimeUnit.MINUTES.toMillis(10)); // Avoid too long hung | |
GENERIC_OBJECT_POOL_CONFIG.setJmxEnabled(false); | |
} | |
private static class GenericRabbitMQChannelPool extends GenericObjectPool<Channel> { | |
private GenericRabbitMQChannelPool(final Connection connection) | |
{ | |
super(new RabbitMQChannelPool(connection), GENERIC_OBJECT_POOL_CONFIG); | |
} | |
private void withRetryChannel(final RunnableWithChannel runnable) throws IOException | |
{ | |
withRetryChannel((CallableWithChannel<Void>) channel -> { | |
try { | |
runnable.run(channel); | |
return null; | |
} catch (final AlreadyClosedException e) { | |
throw e; | |
} catch (final RuntimeException e) { // ShutdownSignalException.java | |
throw new IOException(e); | |
} | |
}); | |
} | |
private <R> R withRetryChannel(final CallableWithChannel<R> callable) throws IOException | |
{ | |
try { | |
return RETRY.call(() -> { | |
Channel channel = null; | |
try { | |
channel = borrowChannel(); | |
return callable.call(channel); | |
} catch (final AlreadyClosedException e) { | |
if (e.getMessage().contains("cause: java.io.EOFException")) { // This is ugly :( | |
throw new ExecutionException(e); | |
} | |
throw e; | |
} finally { | |
returnChannel(channel); | |
} | |
}, ExecutionException.class); // ExecutionException only will be retry. | |
} catch (final ExecutionException e) { | |
if (e.getCause() instanceof IOException) { | |
throw rethrow((IOException) e.getCause()); | |
} | |
throw new IOException(e.getCause()); // Probably impossible | |
} catch (final IOException e) { | |
throw rethrow(e); | |
} catch (final Exception e) { | |
throw new IOException(e); | |
} | |
} | |
private Channel borrowChannel() | |
{ | |
try { | |
return super.borrowObject(); | |
} catch (final Exception e) { | |
throw new UncheckedExecutionException("Unable to borrow buffer from pool", e); | |
} | |
} | |
private void returnChannel(@Nullable final Channel channel) | |
{ | |
try { | |
if (channel != null) { | |
if (channel.isOpen()) { | |
this.returnObject(channel); | |
} else { | |
// When an IOException occurred, channel is automatically closed. | |
this.invalidateObject(channel); | |
} | |
} | |
} catch (final Exception e) { | |
// Silently ignore | |
} | |
} | |
// Qualify exception if possible | |
private static IOException rethrow(final IOException ioe) | |
{ | |
if (isQueueNotFound(ioe)) { | |
return new QueueDoesnotExistException(ioe); | |
} | |
return ioe; | |
} | |
} | |
private final GenericRabbitMQChannelPool channelObjectPoolRead = new GenericRabbitMQChannelPool(connectionRead); | |
private final GenericRabbitMQChannelPool channelObjectPoolWrite = new GenericRabbitMQChannelPool(connectionWrite); | |
@Override | |
public void topicPublish(@Nonnull final String topic, final String subject, final byte[] message, | |
@Nullable final Duration ttl) throws IOException | |
{ | |
channelObjectPoolWrite.withRetryChannel(channel -> { | |
// publish | |
final AMQP.BasicProperties.Builder propertiesBuilder = new AMQP.BasicProperties.Builder(); | |
if (ttl != null) { | |
propertiesBuilder.expiration(Long.toString(ttl.toMillis())); | |
} | |
channel.basicPublish(topic, subject, propertiesBuilder.build(), message); | |
try { | |
channel.waitForConfirmsOrDie(); | |
} catch (final InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
throw new IOException(e); | |
} | |
}); | |
} | |
@FunctionalInterface | |
private interface RunnableWithChannel { | |
void run(Channel channel) throws IOException; | |
} | |
private static Connection createConnection(final String poolName) | |
{ | |
while (!Thread.currentThread().isInterrupted()) { | |
try { | |
return RabbitMQFactory.createConnection(poolName); | |
} catch (final IOException | TimeoutException e) { | |
log.warn("cannot get connection to rabbit", e); | |
} | |
ThreadUtils.sleep(100L); | |
} | |
throw new IllegalStateException("No rabbit connection available before interruption"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment