-
Create a separate namespace and set it as the default
$ kubectl create namespace rocketmq
$ kubectl config set-context --current --namespace rocketmq
-
Install the RocketMQ Operator into the rocketmq namespace
$ git clone https://github.com/apache/rocketmq-operator.git
$ cd rocketmq-operator
$ ./install-operator.sh
$ kubectl get pods
NAME                               READY   STATUS    RESTARTS   AGE
rocketmq-operator-867c4955-mnbnt   1/1     Running   0          26h
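Before moving on, it helps to confirm that every pod in the listing is actually in the Running state. The following is a minimal sketch, not part of the operator tooling: `all_running` is a hypothetical helper name, and it assumes the default `kubectl get pods` column layout shown in this step (STATUS in the third column).

```shell
#!/bin/sh
# all_running (hypothetical helper): reads `kubectl get pods` output on stdin
# and succeeds only if every pod row after the header has STATUS "Running".
all_running() {
  awk 'BEGIN { bad = 0 } NR > 1 && $3 != "Running" { bad = 1 } END { exit bad }'
}

# Demo on the listing from this step, so the sketch runs without a cluster.
if all_running <<'EOF'
NAME                               READY   STATUS    RESTARTS   AGE
rocketmq-operator-867c4955-mnbnt   1/1     Running   0          26h
EOF
then
  echo "all pods running"
else
  echo "some pods are not running yet"
fi
```

Against a live cluster the same check would be `kubectl get pods | all_running`.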
-
Install the RocketMQ Name Service, Broker and Console
$ sed "/namespace: default\|storageClassName: rocketmq-storage/d; s/EmptyDir/StorageClass/g" example/rocketmq_v1alpha1_rocketmq_cluster.yaml | kubectl apply -f -
$ kubectl get pods
NAME                               READY   STATUS    RESTARTS   AGE
rocketmq-operator-867c4955-mnbnt   1/1     Running   0          28h
console-747fb7988c-jtddj           1/1     Running   0          53m
name-service-0                     1/1     Running   0          53m
broker-0-master-0                  1/1     Running   0          53m
broker-0-replica-1-0               1/1     Running   0          53m
Note: the sed command adjusts the namespace and storage class of the example manifest. It deletes the hard-coded "namespace: default" and "storageClassName: rocketmq-storage" lines and replaces EmptyDir with StorageClass, so the resources are created in the current namespace and no specific storage class is requested.
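To see what the sed program does before piping a manifest into kubectl, it can be run on a small fragment first. The sketch below uses a hypothetical, trimmed-down YAML fragment (the field layout is an assumption, not a copy of the example file) and the same sed program as in this step. Note that `\|` as alternation is GNU sed syntax.

```shell
#!/bin/sh
# Hypothetical fragment standing in for the example cluster manifest.
sample='metadata:
  name: broker
  namespace: default
spec:
  storageMode: EmptyDir
  storageClassName: rocketmq-storage'

# Same sed program as in the install step: delete the two hard-coded lines,
# then switch the storage mode from EmptyDir to StorageClass.
adjust_manifest() {
  sed "/namespace: default\|storageClassName: rocketmq-storage/d; s/EmptyDir/StorageClass/g"
}

printf '%s\n' "$sample" | adjust_manifest
```

On this fragment the namespace and storageClassName lines disappear and the storageMode line reads StorageClass; everything else passes through unchanged.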
-
Apply Service and visit the RocketMQ Console
$ kubectl apply -f example/rocketmq_v1alpha1_cluster_service.yaml
Go to http://192.168.1.160:30000 and you should see the RocketMQ Console.
Note: this should be run on the Kubernetes master node.
-
Prerequisites
- Maven
- JDK 8
- Git
- jq
-
Clone RocketMQ and compile the distribution
$ git clone https://github.com/apache/rocketmq.git
$ cd rocketmq
$ mvn -Prelease-all -DskipTests clean install -U
$ cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
-
Find the name server IP address
$ kubectl get pods name-service-0 -o json | jq '.status.podIP'
"192.168.1.160"
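The jq filter prints the IP as a JSON string, quotes included, while NAMESRV_ADDR needs a bare host:port value. A minimal sketch of that conversion follows; `namesrv_addr` is a hypothetical helper name, and the default port 9876 is taken from the export commands used in the next step. Running jq with `-r` (raw output) is another way to drop the quotes.

```shell
#!/bin/sh
# namesrv_addr (hypothetical helper): turn jq's quoted output, e.g.
# "192.168.1.160", into host:port form.
# $1 = jq output, $2 = port (defaults to 9876).
namesrv_addr() {
  ip=${1#\"}   # drop a leading double quote, if present
  ip=${ip%\"}  # drop a trailing double quote, if present
  printf '%s:%s\n' "$ip" "${2:-9876}"
}

# Demo with the value printed in this step, so it runs without a cluster.
namesrv_addr '"192.168.1.160"'

# Against a live cluster (not executed here):
#   export NAMESRV_ADDR=$(namesrv_addr "$(kubectl get pods name-service-0 -o json | jq '.status.podIP')")
```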
-
Start the JMS Consumer and Producer
Run the JMS Consumer
$ export NAMESRV_ADDR=192.168.1.160:9876
$ bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Run the JMS Producer in a new console
$ export NAMESRV_ADDR=192.168.1.160:9876
$ bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
JMS Consumer Sample Output:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=broker-0, queueId=3, storeSize=180, queueOffset=1249, sysFlag=0, bornTimestamp=1618301792292, bornHost=/192.168.1.160:41008, storeTimestamp=1618301792292, storeHost=/10.1.77.63:10911, msgId=0A014D3F00002A9F00000000000DB812, commitLogOffset=899090, bodyCRC=1359908749, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1250, CONSUME_START_TIME=1618301792294, UNIQ_KEY=7F000001548A2C7B84DE414A5C2403E6, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 56], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [brokerName=broker-0, queueId=0, storeSize=180, queueOffset=1249, sysFlag=0, bornTimestamp=1618301792293, bornHost=/192.168.1.160:41008, storeTimestamp=1618301792293, storeHost=/10.1.77.63:10911, msgId=0A014D3F00002A9F00000000000DB8C6, commitLogOffset=899270, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1250, CONSUME_START_TIME=1618301792294, UNIQ_KEY=7F000001548A2C7B84DE414A5C2503E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 56], transactionId='null'}]]
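The body field in the consumer output is printed as a list of decimal byte values rather than as text. As a small sketch for reading it, the hypothetical helper `decode_body` below converts such a list back to characters, assuming the payload is plain ASCII as in this sample.

```shell
#!/bin/sh
# decode_body (hypothetical helper): print the text for a comma-separated
# list of decimal byte values, as shown in the body=[...] field (pass the
# list without the surrounding brackets).
decode_body() {
  for b in $(printf '%s' "$1" | tr ',' ' '); do
    # Convert each decimal value to a 3-digit octal escape and print it.
    printf "\\$(printf '%03o' "$b")"
  done
  printf '\n'
}

# Body of the first message in the sample output above.
decode_body "72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 56"
# prints: Hello RocketMQ 998
```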
JMS Producer Sample Output:
SendResult [sendStatus=SEND_OK, msgId=7F000001548A2C7B84DE414A5C2403E6, offsetMsgId=0A014D3F00002A9F00000000000DB812, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-0, queueId=3], queueOffset=1249]
SendResult [sendStatus=SEND_OK, msgId=7F000001548A2C7B84DE414A5C2503E7, offsetMsgId=0A014D3F00002A9F00000000000DB8C6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-0, queueId=0], queueOffset=1249]
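In this sample output, the producer and consumer lines can be matched up by their IDs: the producer's msgId equals the consumer's UNIQ_KEY property, and the producer's offsetMsgId equals the consumer's msgId. The sketch below checks this on shortened excerpts of the first message from each log; `field` is a hypothetical helper, and the excerpts keep only the relevant fields.

```shell
#!/bin/sh
# Excerpts of the first producer and consumer lines from the sample output
# (only the ID fields are kept).
producer_line='SendResult [sendStatus=SEND_OK, msgId=7F000001548A2C7B84DE414A5C2403E6, offsetMsgId=0A014D3F00002A9F00000000000DB812, queueOffset=1249]'
consumer_line='MessageExt [queueId=3, msgId=0A014D3F00002A9F00000000000DB812, properties={UNIQ_KEY=7F000001548A2C7B84DE414A5C2403E6, TAGS=TagA}]'

# field (hypothetical helper): print the hex value of key $1 found in line $2.
field() {
  printf '%s\n' "$2" | sed -n "s/.*$1=\([0-9A-F]*\).*/\1/p"
}

if [ "$(field msgId "$producer_line")" = "$(field UNIQ_KEY "$consumer_line")" ] &&
   [ "$(field offsetMsgId "$producer_line")" = "$(field msgId "$consumer_line")" ]; then
  echo "producer and consumer lines refer to the same message"
else
  echo "IDs do not match"
fi
```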