Last active
April 2, 2023 10:14
-
-
Save dacr/2be9cbe9eb38c8a893008a9197cf20e2 to your computer and use it in GitHub Desktop.
learning rabbitmq through java api. / published by https://github.com/dacr/code-examples-manager #93f22a3a-e4e4-4f16-96a0-b78f3d7ae916/3aaab1e7b068aa9eff811df539670fd7c745592
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : learning rabbitmq through java api. | |
// keywords : scala, scalatest, fixture, rabbitmq, rabbitmq-java-api, learning, docker, async, @testable | |
// publish : gist | |
// authors : David Crosson | |
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
// id : 93f22a3a-e4e4-4f16-96a0-b78f3d7ae916 | |
// created-on : 2020-08-14T20:16:33Z | |
// managed-by : https://github.com/dacr/code-examples-manager | |
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
// Inspired from https://www.rabbitmq.com/api-guide.html | |
import $ivy.`org.scalatest::scalatest:3.2.6` | |
import $ivy.`com.whisk::docker-testkit-impl-spotify:0.9.9` | |
import $ivy.`org.json4s::json4s-jackson:3.6.10` | |
import $ivy.`com.rabbitmq:amqp-client:5.9.0` | |
import $ivy.`org.slf4j:slf4j-nop:1.7.30` | |
import $ivy.`javax.activation:activation:1.1.1` | |
import org.scalatest._ | |
import flatspec._ | |
import matchers._ | |
import OptionValues._ | |
import com.whisk.docker._ | |
import org.json4s.JValue | |
import org.json4s.jackson.Serialization | |
import com.rabbitmq.client._ | |
import scala.concurrent.{Future, Promise} | |
import scala.concurrent.duration._ | |
import scala.util.{Failure, Success, Try, Using} | |
// ================================================================================================== | |
// A locally provided DockerTestKit gives us control over which scalatest release is used
/** ScalaTest lifecycle glue for docker-testkit.
  *
  * Mixed into a [[Suite]], it calls `startAllOrFail()` once before the suite's tests and
  * `stopAllQuietly()` once after them (both are inherited from `DockerKit`).
  */
trait DockerTestKit extends BeforeAndAfterAll with org.scalatest.concurrent.ScalaFutures with DockerKit {
  self: Suite =>

  import org.scalatest.time.{Millis, Seconds, Span}

  /** Patience configuration (20 s timeout, 10 ms polling, both scaled) intended for docker initialisation.
    * Not referenced elsewhere in this script.
    */
  def dockerInitPatienceInterval: PatienceConfig =
    PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(10, Millis)))

  /** Patience configuration (1200 s timeout, 250 ms polling, both scaled) intended for image pulls.
    * Not referenced elsewhere in this script.
    */
  def dockerPullImagesPatienceInterval: PatienceConfig =
    PatienceConfig(timeout = scaled(Span(1200, Seconds)), interval = scaled(Span(250, Millis)))

  /** Lets the other mixed-in traits initialise first, then starts the containers. */
  override def beforeAll(): Unit = {
    super.beforeAll()
    startAllOrFail()
  }

  /** Stops the containers first, then lets the other mixed-in traits clean up. */
  override def afterAll(): Unit = {
    stopAllQuietly()
    super.afterAll()
  }
}
// ================================================================================================== | |
// docker run -it --rm -p 5672:5672 -e "RABBITMQ_DEFAULT_USER=rabbit" -e "RABBITMQ_DEFAULT_PASS=bunny" -e "RABBITMQ_DEFAULT_VHOST=my-vhost" rabbitmq:3.8.6 | |
/** Describes the RabbitMQ docker container used by the tests.
  *
  * The broker is configured through `RABBITMQ_DEFAULT_*` environment variables and its AMQP port
  * is published on 127.0.0.1 only.
  */
trait DockerRabbitService extends com.whisk.docker.impl.spotify.DockerKitSpotify {
  val rabbitPort: Int = 5672
  val rabbitUser: String = "rabbit"
  val rabbitPass: String = "bunny"
  val rabbitVirtualHost: String = "my-vhost"
  val rabbitHostname: String = "my-rabbit"

  // Optional extra setting, kept for reference :
  //   "RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit auth_mechanisms ['PLAIN', 'AMQPLAIN']"
  val env: Array[String] = Array(
    s"RABBITMQ_DEFAULT_USER=$rabbitUser",
    s"RABBITMQ_DEFAULT_PASS=$rabbitPass",
    s"RABBITMQ_DEFAULT_VHOST=$rabbitVirtualHost"
  )

  /** Container definition; considered ready once the expected startup line shows up in its logs. */
  lazy val rabbitContainer = {
    val amqpPortMapping = 5672 -> DockerPortMapping(Some(rabbitPort), "127.0.0.1")
    val readyChecker = DockerReadyChecker.LogLineContains("Server startup complete")
    DockerContainer("rabbitmq:3.8.6")
      .withPortMapping(amqpPortMapping)
      .withHostname(rabbitHostname)
      .withEnv(env: _*) // take care .withEnv overwrites any previous .withEnv ... in the current release
      .withReadyChecker(readyChecker)
  }

  override def dockerContainers: List[DockerContainer] = List(rabbitContainer)
}
// ================================================================================================== | |
/** Learning tests for the RabbitMQ java client, run against a dockerized broker.
  *
  * Each test receives a [[FixtureParam]] whose `createChannel` function opens channels on a
  * connection dedicated to that test. The connection and every channel created through the
  * fixture are released once the (asynchronous) test outcome is known.
  */
class LearningRabbitmqThroughTests extends FixtureAsyncFlatSpec with should.Matchers with DockerRabbitService with DockerTestKit {
  override def suiteName: String = "LearningRabbitmqThroughTests"

  implicit val serialization: org.json4s.jackson.Serialization.type = org.json4s.jackson.Serialization
  implicit val formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats
  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  /** Connection factory targeting the broker started by [[DockerRabbitService]]. */
  lazy val connectionFactory: ConnectionFactory = {
    val factory = new ConnectionFactory()
    factory.setUsername(rabbitUser)
    factory.setPassword(rabbitPass)
    factory.setVirtualHost(rabbitVirtualHost)
    factory.setHost("localhost")
    factory.setPort(rabbitPort)
    factory
  }

  /** @param createChannel opens a new channel on the connection dedicated to the current test */
  case class FixtureParam(createChannel: () => Channel)

  /** Opens one connection per test, loans a channel factory to the test body and releases
    * everything once the test outcome is available.
    *
    * The returned outcome is the one produced by `onCompletedThen`, so the test is only
    * reported as complete after the cleanup has run, and a cleanup failure is not lost.
    */
  def withFixture(test: OneArgAsyncTest): FutureOutcome = {
    val connection = connectionFactory.newConnection()
    var channels = List.empty[Channel]
    val channelFactory = () => {
      val channel = connection.createChannel
      channels ::= channel
      channel
    }

    def releaseResources(): Unit =
      try {
        // Best effort : closing the connection below closes any remaining channel anyway
        channels.filter(_.isOpen).foreach(channel => Try(channel.close()))
      } finally {
        // Always reached, even if a channel could not be closed properly
        if (connection.isOpen) connection.close()
      }

    val theFixture = FixtureParam(channelFactory)
    val futureOutcome =
      try withFixture(test.toNoArgAsyncTest(theFixture)) // "loan" the fixture to the test
      catch {
        case scala.util.control.NonFatal(error) =>
          // No future to attach the cleanup to, so release immediately and propagate the original error
          Try(releaseResources())
          throw error
      }
    futureOutcome.onCompletedThen(_ => releaseResources())
  }

  // --------------------------------------------------------------------------------------------------
  "rabbitmq" should "be ready" in { fixture =>
    info("Take care rabbitmq channel are not thread safe")
    val channel = fixture.createChannel()
    channel.isOpen shouldBe true
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to create a default server-named queue" in { fixture =>
    note("queueDeclare() creates a server-named exclusive, autodelete, non-durable queue")
    note("This queue can only by used by the current connection")
    note("This queue is automatically deleted as soon as it is not longer in use")
    note("This queue will be automatically deleted on server stops")
    val channel = fixture.createChannel()
    val state = channel.queueDeclare()
    val queueName = state.getQueue
    note(s"A queue named $queueName has been created, it will be automatically deleted on connection close")
    succeed
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to create/delete a client-named durable queue" in { fixture =>
    val channel = fixture.createChannel()
    val queueName = "MyQueue"
    try {
      // arguments : queue name, durable, exclusive, autoDelete, extra arguments
      channel.queueDeclare(queueName, true, false, false, null)
      note("durable : true if we are declaring a durable queue (the queue will survive a server restart)")
      note("exclusive : true if we are declaring an exclusive queue (restricted to this connection)")
      note("autoDelete : true if we are declaring an autodelete queue (server will delete it when no longer in use)")
      note(s"A queue named $queueName has been created")
      succeed
    } finally {
      // A durable queue outlives the connection, so remove it explicitly
      channel.queueDelete(queueName)
    }
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to do basic publish/consume operations" in { fixture =>
    val utf8 = java.nio.charset.StandardCharsets.UTF_8 // explicit charset on both sides of the wire
    val promise = Promise[String]()
    note("queueDeclare dynamically create a queue whose name (see getQueue) is automatically generated")
    val consumerChannel = fixture.createChannel()
    val queueName = consumerChannel.queueDeclare().getQueue
    val consumer: DeliverCallback = (tag, delivery) => promise.success(new String(delivery.getBody, utf8))
    consumerChannel.basicConsume(queueName, true, consumer, (tag: String) => {})
    note("For very basic publishing, the routing key is the queueName, and this is only what you need")
    val publisherChannel = fixture.createChannel()
    publisherChannel.basicPublish("", queueName, null, "hello".getBytes(utf8))
    promise.future.map(result => result shouldBe "hello")
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to broadcast messages" in { fixture =>
    val utf8 = java.nio.charset.StandardCharsets.UTF_8 // explicit charset on both sides of the wire
    val exchangeName = "broadcast"
    val promiseA = Promise[String]()
    val promiseB = Promise[String]()
    note("To broadcast just create a dedicated exchange with fanout type and bind queues to it")
    note("Here as everything is going through the same connection, we can use default queueDeclare operation")
    val adminChannel = fixture.createChannel()
    val queueNameA = adminChannel.queueDeclare().getQueue
    val queueNameB = adminChannel.queueDeclare().getQueue
    adminChannel.exchangeDeclare(exchangeName, "fanout")
    adminChannel.queueBind(queueNameA, exchangeName, "")
    adminChannel.queueBind(queueNameB, exchangeName, "")
    note("java client rabbitmq channels only requires names for queues or exchanges")
    val consumer: Promise[String] => DeliverCallback =
      promise => (tag, delivery) => promise.success(new String(delivery.getBody, utf8))
    val consumerChannelA = fixture.createChannel()
    consumerChannelA.basicConsume(queueNameA, true, consumer(promiseA), (tag: String) => {})
    val consumerChannelB = fixture.createChannel()
    consumerChannelB.basicConsume(queueNameB, true, consumer(promiseB), (tag: String) => {})
    val publisherChannel = fixture.createChannel()
    publisherChannel.basicPublish(exchangeName, "", null, "ok".getBytes(utf8))
    Future
      .sequence(List(promiseA, promiseB).map(_.future))
      .map(results => results shouldBe List("ok", "ok"))
      .andThen { case _ =>
        // The exchange is not tied to the connection : delete it so nothing is left on the broker.
        // The fixture only closes the channels after this future has completed.
        adminChannel.exchangeDelete(exchangeName)
      }
  }
}
// Script entry point : run the suite through the ScalaTest runner.
// "-oDF" configures the standard output reporter (per the ScalaTest Runner documentation,
// D = show durations, F = show full stack traces); "-s" selects the suite class to run.
org.scalatest.tools.Runner.main(
  Array(
    "-oDF",
    "-s",
    classOf[LearningRabbitmqThroughTests].getName
  )
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment