Information gathered while trying to interoperate between AMQP reactive messaging, the Node.js AMQP client rhea, and Spring's Artemis JmsTemplate.
Logs were captured while only the consumer and a single producer were running.
Messages were retrieved with JMSToolbox in a separate run to avoid any possible mismatch.
The consumer is a reactive-messaging-based message consumer (Quarkus application):
/**
 * Consumes a payload from the "orders-doc" channel, maps it to a
 * {@code UMessage}, adds the order it carries to the matching user's orders,
 * and indexes that user through {@code searchService}.
 *
 * @param jsonMsg the JSON payload delivered on the channel
 * @throws IOException declared by this method; NOTE(review): presumably raised
 *                     by the searchService calls - confirm
 */
@Incoming("orders-doc")
public void indexOrder(JsonObject jsonMsg) throws IOException {
    // Trace the raw payload exactly as it was received.
    System.out.println(jsonMsg);

    // Map the payload to the message envelope and trace its header UUID.
    UMessage<Order> envelope = jsonMsg.mapTo(UMessage.class);
    System.out.println("UUID: " + envelope.header.uuid);

    // Read the order from the envelope, attach it to its user, then index the user.
    Order receivedOrder = envelope.bodyAs(Order.class);
    User owner = searchService.getUser(receivedOrder.userId);
    owner.orders.add(receivedOrder);
    searchService.indexUser(owner);
}
/** Emitter for the "users-doc" channel; sends {@code User} payloads. */
@Inject
@Channel("users-doc")
Emitter<User> userDocEmitter;

/**
 * Persists the given user, emits it on the "users-doc" channel with outgoing
 * AMQP metadata whose id is a random UUID, and returns a "created" response
 * whose URI is {@code /users/<id>}.
 *
 * @param user the user to persist and publish
 * @return the response built from {@code Response.created(...)}
 */
@POST
public Response create(User user) {
    // Store the user first.
    user.persist();

    // Publish a doc message carrying the user, tagged with a fresh random id.
    String messageId = UUID.randomUUID().toString();
    OutgoingAmqpMetadata amqpMetadata = OutgoingAmqpMetadata.builder().withId(messageId).build();
    userDocEmitter.send(Message.of(user).addMetadata(amqpMetadata));

    // Answer with the URI of the user that was just persisted.
    URI location = URI.create("/users/" + user.id.toString());
    return Response.created(location).build();
}
artemis_1 | [1565387426:0] <- Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x00\x00\x02, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (265) "\x00Sp\xc0\x04\x02AP\x00\x00Ss\xd0\x00\x00\x00[\x00\x00\x00\x07\xa1$a0cfe1e0-d7d6-408b-96dc-212be4a94b4f@\xa1\x19POCUSRU001.DOC.USER.PP.V1@@@\xa3\x10application/json\x00Su\xa0\x98{"id":"5f575d2361571c1016178ab4","name":"Lo\xc3\xafc Mathieu 2","birthDate":[2000,1,2],"email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","orders":[]}"
artemis_1 | [1565387426:0] -> Disposition{role=RECEIVER, first=1, last=1, settled=true, state=Accepted{}, batchable=false}
artemis_1 | [1888701975:0] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (265) "\x00Sp\xc0\x04\x02AP\x00\x00Ss\xd0\x00\x00\x00[\x00\x00\x00\x07\xa1$a0cfe1e0-d7d6-408b-96dc-212be4a94b4f@\xa1\x19POCUSRU001.DOC.USER.PP.V1@@@\xa3\x10application/json\x00Su\xa0\x98{"id":"5f575d2361571c1016178ab4","name":"Lo\xc3\xafc Mathieu 2","birthDate":[2000,1,2],"email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","orders":[]}"
ActiveMQMessage[ID:0a4ad635-f1c1-11ea-a838-0242ac140004]:PERSISTENT/ClientMessageImpl[messageID=208, durable=true, address=POCUSRU001.DOC.USER.PP.V1,userID=0a4ad635-f1c1-11ea-a838-0242ac140004,properties=TypedProperties[NATIVE_MESSAGE_ID=ID:AMQP_NO_PREFIX:1faeb708-a360-4118-a5a9-d7c44ee6b20e,JMS_AMQP_HEADERPRIORITY=true,JMS_AMQP_ORIGINAL_ENCODING=2,JMS_AMQP_HEADERDURABLE=true,JMS_AMQP_HEADER=true,JMS_AMQP_ContentType=application/json]]
Generates java.lang.ClassCastException: class java.lang.String cannot be cast to class io.vertx.core.json.JsonObject.
If I send the message as a JSON object instead of a String, I get a different class-cast exception: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class io.vertx.core.json.JsonObject.
// Builds a durable message whose body is the user serialized to a JSON string,
// then opens a connection, creates a sender, sends the message, and closes
// the sender and the connection.
const message: Message = {
content_type: "application/json",
content_encoding: "utf-8",
durable: true,
// The body is a JavaScript string here, not an object or a binary buffer.
// NOTE(review): in the broker trace for this producer the body follows
// `\x00Sw\xa1`, while the Quarkus producer's body follows `\x00Su\xa0`.
// Presumably that is an AMQP-value string versus a binary data section, which
// may be why the consumer receives a String - confirm.
body: JSON.stringify(user)
};
// A new connection is opened for this send and closed again at the end.
const connection: Connection = new Connection(this.connectionOptions);
await connection.open();
const sender: Sender = await connection.createSender(senderOptions);
// The returned delivery is not used anywhere in this snippet.
const delivery: Delivery = sender.send(message);
// NOTE(review): the sender is closed right after send() without waiting on the
// delivery; the broker trace shows the Detach arriving before the broker's
// Disposition - confirm whether the outcome should be awaited first.
await sender.close();
await connection.close();
artemis_1 | [686835641:0] <- SASL
artemis_1 | [686835641:0] -> SASL
artemis_1 | [686835641:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1 | [686835641:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='localhost'}
artemis_1 | [686835641:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1 | [686835641:0] -> AMQP
artemis_1 | [686835641:0] <- AMQP
artemis_1 | [686835641:0] <- Open{ containerId='14d58bbe-140e-164d-a726-f7e49a54a60c', hostname='null', maxFrameSize=4294967295, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [686835641:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1 | [686835641:0] <- Begin{remoteChannel=null, nextOutgoingId=0, incomingWindow=2048, outgoingWindow=4294967295, handleMax=4294967295, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [686835641:0] -> Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [686835641:0] <- Attach{name='a0ae3e10-6b1e-3b4f-846a-33dd2e491f27', handle=0, role=SENDER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='POCUSRU001.DOC.USER.PP.V1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [686835641:0] -> Attach{name='a0ae3e10-6b1e-3b4f-846a-33dd2e491f27', handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='POCUSRU001.DOC.USER.PP.V1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [686835641:0] -> Flow{nextIncomingId=0, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
artemis_1 | [686835641:0] <- Transfer{handle=0, deliveryId=0, deliveryTag=0, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (215) "\x00Sp\xd0\x00\x00\x00\x05\x00\x00\x00\x01A\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x08@@@@@@\xa3\x10application/json\xa3\x05utf-8\x00Sw\xa1\x9a{"orders":[],"name":"Lo\xc3\xafc Mathieu 2","birthDate":"2000-01-02","email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","id":"5f575d6a55cdc98ef483bd53"}"
artemis_1 | [686835641:0] <- Detach{handle=0, closed=true, error=null}
artemis_1 | [686835641:0] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=null, batchable=false}
artemis_1 | [686835641:0] -> Detach{handle=0, closed=true, error=null}
artemis_1 | [686835641:0] <- End{error=null}
artemis_1 | [686835641:0] -> End{error=null}
artemis_1 | [686835641:0] <- Close{error=null}
artemis_1 | [686835641:0] -> Close{error=null}
artemis_1 | 2020-09-08 10:31:06,133 WARN [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 66ee12d7-f1be-11ea-8a12-0242ac120004
artemis_1 | 2020-09-08 10:31:06,133 WARN [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 66ee12d7-f1be-11ea-8a12-0242ac120004
artemis_1 | [1888701975:0] -> Transfer{handle=0, deliveryId=2, deliveryTag=\x01, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (215) "\x00Sp\xd0\x00\x00\x00\x05\x00\x00\x00\x01A\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x08@@@@@@\xa3\x10application/json\xa3\x05utf-8\x00Sw\xa1\x9a{"orders":[],"name":"Lo\xc3\xafc Mathieu 2","birthDate":"2000-01-02","email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","id":"5f575d6a55cdc98ef483bd53"}"
artemis_1 | [1888701975:0] <- Empty Frame
artemis_1 | [2083097974:0] <- Empty Frame
ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=232, durable=true, address=POCUSRU001.DOC.USER.PP.V1,userID=null,properties=TypedProperties[JMS_AMQP_ORIGINAL_ENCODING=5,JMS_AMQP_ContentEncoding=utf-8,JMS_AMQP_HEADERDURABLE=true,JMS_AMQP_HEADER=true,JMS_AMQP_ContentType=application/json]]
Generates java.lang.ClassCastException: class java.lang.String cannot be cast to class io.vertx.core.json.JsonObject.
/** JMS template used to send the user doc message. */
@Autowired
private JmsTemplate jmsTemplate;

/**
 * Saves the given user through {@code userRepository}, sends it as a JSON
 * text message to the "POCUSRU001.DOC.USER.PP.V1" destination, and returns a
 * "created" response whose URI is {@code /users/<id>}.
 *
 * @param user the user taken from the request body
 * @return the response built from {@code ResponseEntity.created(...)}
 */
@PostMapping
public ResponseEntity create(@RequestBody User user) {
    // Store the user first.
    userRepository.save(user);

    // Publish a doc message carrying the user serialized with Gson.
    System.out.println("Sending a user doc message");
    jmsTemplate.send("POCUSRU001.DOC.USER.PP.V1", session -> {
        Message textMessage = session.createTextMessage(new Gson().toJson(user));
        // 2 is the numeric value of the JMS DeliveryMode.PERSISTENT constant.
        textMessage.setJMSDeliveryMode(2);
        return textMessage;
    });

    // Answer with the URI of the user that was just saved.
    URI location = URI.create("/users/" + user.id.toString());
    return ResponseEntity.created(location).build();
}
This log is only obtained when an AMQP message consumer is launched:
artemis_1 | [1061124102:0] <- SASL
artemis_1 | [1726943890:0] <- SASL
artemis_1 | [1061124102:0] -> SASL
artemis_1 | [1726943890:0] -> SASL
artemis_1 | [1061124102:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1 | [1726943890:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1 | [1726943890:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='null'}
artemis_1 | [1061124102:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='null'}
artemis_1 | [1726943890:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1 | [1061124102:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1 | [1061124102:0] -> AMQP
artemis_1 | [1726943890:0] -> AMQP
artemis_1 | [1061124102:0] <- AMQP
artemis_1 | [1726943890:0] <- AMQP
artemis_1 | [1061124102:0] <- Open{ containerId='vert.x-9e3aeafc-bb1e-4923-90ea-37b5d7364275', hostname='localhost', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.1}}
artemis_1 | [1726943890:0] <- Open{ containerId='vert.x-43491d82-a575-4213-a78a-b1810559f534', hostname='localhost', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.1}}
artemis_1 | [1726943890:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1 | [1061124102:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1 | [1726943890:0] <- Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1061124102:0] <- Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1061124102:0] <- Attach{name='orders-doc', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCORDU001.DOC.ORDER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1726943890:0] <- Attach{name='users-doc', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCUSRU001.DOC.USER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1726943890:0] -> Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1726943890:0] -> Attach{name='users-doc', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCUSRU001.DOC.USER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1 | [1726943890:0] <- Flow{nextIncomingId=1, incomingWindow=65535, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
Unable to retrieve the message yet.