Skip to content

Instantly share code, notes, and snippets.

@loicmathieu
Last active September 8, 2020 11:03
Show Gist options
  • Save loicmathieu/d264388ae1e6d4a0325ed58f213a9219 to your computer and use it in GitHub Desktop.
Save loicmathieu/d264388ae1e6d4a0325ed58f213a9219 to your computer and use it in GitHub Desktop.
Issue AMQP / JMS between Reactive Messaging, NodeJS rhea and Spring JmsTemplate

Information for trying to interop between AMQP reactive messaging, NodeJS AMQP rhea and Spring Artemis JmsTemplate.

Logs are retrieved when only the consumer and one producer is running.

Messages are retrieved from JMSToolbox on separate run to avoid possible mismatch.

The message consumer

It is a reactive messaging based message consumer (Quarkus application)

Code

@Incoming("orders-doc")
    public void indexOrder(JsonObject jsonMsg) throws IOException {
        System.out.println(jsonMsg);

        UMessage<Order> message = jsonMsg.mapTo(UMessage.class);
        System.out.println("UUID: " + message.header.uuid);
        Order order = message.bodyAs(Order.class);
        User user = searchService.getUser(order.userId);
        user.orders.add(order);
        searchService.indexUser(user);
    }

Message send via reactive messaging

Code

    @Inject @Channel("users-doc") Emitter<User> userDocEmitter;

    @POST
    public Response create(User user) {
        // persist the user to the database
        user.persist();

        // send a doc message with the user
        OutgoingAmqpMetadata metadata = OutgoingAmqpMetadata.builder()
                .withId(UUID.randomUUID().toString())
                .build();
        userDocEmitter.send(Message.of(user).addMetadata(metadata));

        URI uri = URI.create("/users/" + user.id.toString());
        return Response.created(uri).build();
    }

Artemis logs

artemis_1        | [1565387426:0] <- Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x00\x00\x02, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (265) "\x00Sp\xc0\x04\x02AP\x00\x00Ss\xd0\x00\x00\x00[\x00\x00\x00\x07\xa1$a0cfe1e0-d7d6-408b-96dc-212be4a94b4f@\xa1\x19POCUSRU001.DOC.USER.PP.V1@@@\xa3\x10application/json\x00Su\xa0\x98{"id":"5f575d2361571c1016178ab4","name":"Lo\xc3\xafc Mathieu 2","birthDate":[2000,1,2],"email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","orders":[]}"
artemis_1        | [1565387426:0] -> Disposition{role=RECEIVER, first=1, last=1, settled=true, state=Accepted{}, batchable=false}
artemis_1        | [1888701975:0] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (265) "\x00Sp\xc0\x04\x02AP\x00\x00Ss\xd0\x00\x00\x00[\x00\x00\x00\x07\xa1$a0cfe1e0-d7d6-408b-96dc-212be4a94b4f@\xa1\x19POCUSRU001.DOC.USER.PP.V1@@@\xa3\x10application/json\x00Su\xa0\x98{"id":"5f575d2361571c1016178ab4","name":"Lo\xc3\xafc Mathieu 2","birthDate":[2000,1,2],"email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","orders":[]}"

Message seen from JMSToolbox

ActiveMQMessage[ID:0a4ad635-f1c1-11ea-a838-0242ac140004]:PERSISTENT/ClientMessageImpl[messageID=208, durable=true, address=POCUSRU001.DOC.USER.PP.V1,userID=0a4ad635-f1c1-11ea-a838-0242ac140004,properties=TypedProperties[NATIVE_MESSAGE_ID=ID:AMQP_NO_PREFIX:1faeb708-a360-4118-a5a9-d7c44ee6b20e,JMS_AMQP_HEADERPRIORITY=true,JMS_AMQP_ORIGINAL_ENCODING=2,JMS_AMQP_HEADERDURABLE=true,JMS_AMQP_HEADER=true,JMS_AMQP_ContentType=application/json]]

Message send via NodeJS rhea (AMQP)

Generates java.lang.ClassCastException: class java.lang.String cannot be cast to class io.vertx.core.json.JsonObject.

If I send the message as a JSON object instead of a String I have another class cast exception: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class io.vertx.core.json.JsonObject.

Code

        const message: Message = {
            content_type: "application/json",
            content_encoding: "utf-8",
            durable: true,
            body: JSON.stringify(user)
        };
        const connection: Connection = new Connection(this.connectionOptions);
        await connection.open();
        const sender: Sender = await connection.createSender(senderOptions);
        const delivery: Delivery = sender.send(message);

        await sender.close();
        await connection.close();

Artemis logs

artemis_1        | [686835641:0] <- SASL
artemis_1        | [686835641:0] -> SASL
artemis_1        | [686835641:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1        | [686835641:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='localhost'}
artemis_1        | [686835641:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1        | [686835641:0] -> AMQP
artemis_1        | [686835641:0] <- AMQP
artemis_1        | [686835641:0] <- Open{ containerId='14d58bbe-140e-164d-a726-f7e49a54a60c', hostname='null', maxFrameSize=4294967295, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [686835641:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1        | [686835641:0] <- Begin{remoteChannel=null, nextOutgoingId=0, incomingWindow=2048, outgoingWindow=4294967295, handleMax=4294967295, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [686835641:0] -> Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [686835641:0] <- Attach{name='a0ae3e10-6b1e-3b4f-846a-33dd2e491f27', handle=0, role=SENDER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='POCUSRU001.DOC.USER.PP.V1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [686835641:0] -> Attach{name='a0ae3e10-6b1e-3b4f-846a-33dd2e491f27', handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}, target=Target{address='POCUSRU001.DOC.USER.PP.V1', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [686835641:0] -> Flow{nextIncomingId=0, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
artemis_1        | [686835641:0] <- Transfer{handle=0, deliveryId=0, deliveryTag=0, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (215) "\x00Sp\xd0\x00\x00\x00\x05\x00\x00\x00\x01A\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x08@@@@@@\xa3\x10application/json\xa3\x05utf-8\x00Sw\xa1\x9a{"orders":[],"name":"Lo\xc3\xafc Mathieu 2","birthDate":"2000-01-02","email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","id":"5f575d6a55cdc98ef483bd53"}"
artemis_1        | [686835641:0] <- Detach{handle=0, closed=true, error=null}
artemis_1        | [686835641:0] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=null, batchable=false}
artemis_1        | [686835641:0] -> Detach{handle=0, closed=true, error=null}
artemis_1        | [686835641:0] <- End{error=null}
artemis_1        | [686835641:0] -> End{error=null}
artemis_1        | [686835641:0] <- Close{error=null}
artemis_1        | [686835641:0] -> Close{error=null}
artemis_1        | 2020-09-08 10:31:06,133 WARN  [org.apache.activemq.artemis.core.server] AMQ222061: Client connection failed, clearing up resources for session 66ee12d7-f1be-11ea-8a12-0242ac120004
artemis_1        | 2020-09-08 10:31:06,133 WARN  [org.apache.activemq.artemis.core.server] AMQ222107: Cleared up resources for session 66ee12d7-f1be-11ea-8a12-0242ac120004
artemis_1        | [1888701975:0] -> Transfer{handle=0, deliveryId=2, deliveryTag=\x01, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (215) "\x00Sp\xd0\x00\x00\x00\x05\x00\x00\x00\x01A\x00Ss\xd0\x00\x00\x00#\x00\x00\x00\x08@@@@@@\xa3\x10application/json\xa3\x05utf-8\x00Sw\xa1\x9a{"orders":[],"name":"Lo\xc3\xafc Mathieu 2","birthDate":"2000-01-02","email":"[email protected]","address":"2 rue du Paradis \xc3\xa0 Nantes","id":"5f575d6a55cdc98ef483bd53"}"
artemis_1        | [1888701975:0] <- Empty Frame
artemis_1        | [2083097974:0] <- Empty Frame

Message seen from JMSToolbox

ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=232, durable=true, address=POCUSRU001.DOC.USER.PP.V1,userID=null,properties=TypedProperties[JMS_AMQP_ORIGINAL_ENCODING=5,JMS_AMQP_ContentEncoding=utf-8,JMS_AMQP_HEADERDURABLE=true,JMS_AMQP_HEADER=true,JMS_AMQP_ContentType=application/json]]

Message send via Spring Artemis JMS

Generates java.lang.ClassCastException: class java.lang.String cannot be cast to class io.vertx.core.json.JsonObject.

Code

    @Autowired private JmsTemplate jmsTemplate;

    @PostMapping
    public ResponseEntity create(@RequestBody User user) {
        // persist the user to the database
        userRepository.save(user);

        // send a doc message with the user
        System.out.println("Sending a user doc message");
        jmsTemplate.send("POCUSRU001.DOC.USER.PP.V1", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                Message msg = session.createTextMessage(new Gson().toJson(user));
                msg.setJMSDeliveryMode(2);
                return msg;
            }
        });

        URI uri = URI.create("/users/" + user.id.toString());
        return ResponseEntity.created(uri).build();
    }

Artemis logs

This log is obtained only when launching an AMQP message consumer

artemis_1        | [1061124102:0] <- SASL
artemis_1        | [1726943890:0] <- SASL
artemis_1        | [1061124102:0] -> SASL
artemis_1        | [1726943890:0] -> SASL
artemis_1        | [1061124102:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1        | [1726943890:0] -> SaslMechanisms{saslServerMechanisms=[PLAIN, ANONYMOUS]}
artemis_1        | [1726943890:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='null'}
artemis_1        | [1061124102:0] <- SaslInit{mechanism=PLAIN, initialResponse=\x00quarkus\x00quarkus, hostname='null'}
artemis_1        | [1726943890:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1        | [1061124102:0] -> SaslOutcome{_code=OK, _additionalData=null}
artemis_1        | [1061124102:0] -> AMQP
artemis_1        | [1726943890:0] -> AMQP
artemis_1        | [1061124102:0] <- AMQP
artemis_1        | [1726943890:0] <- AMQP
artemis_1        | [1061124102:0] <- Open{ containerId='vert.x-9e3aeafc-bb1e-4923-90ea-37b5d7364275', hostname='localhost', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.1}}
artemis_1        | [1726943890:0] <- Open{ containerId='vert.x-43491d82-a575-4213-a78a-b1810559f534', hostname='localhost', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.1}}
artemis_1        | [1726943890:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1        | [1061124102:0] -> Open{ containerId='44e0c3a3086f', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.14.0}}
artemis_1        | [1726943890:0] <- Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1061124102:0] <- Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1061124102:0] <- Attach{name='orders-doc', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCORDU001.DOC.ORDER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1726943890:0] <- Attach{name='users-doc', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCUSRU001.DOC.USER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1726943890:0] -> Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1726943890:0] -> Attach{name='users-doc', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='POCUSRU001.DOC.USER.PP.V1', durable=UNSETTLED_STATE, expiryPolicy=NEVER, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=Released{}, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
artemis_1        | [1726943890:0] <- Flow{nextIncomingId=1, incomingWindow=65535, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}

Message seen from JMSToolbox

Unable to retrieve it yet

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment