AMQP Connector
The AMQP connector provides Akka Stream sources and sinks to connect to AMQP servers.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-amqp" % "0.9"
- Maven
-
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-amqp_2.12</artifactId>
  <version>0.9</version>
</dependency>
- Gradle
-
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-amqp_2.12", version: "0.9"
}
Usage
Sending messages to an AMQP server
First define a queue name and the declaration of the queue that the messages will be sent to.
- Scala
-
val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
val queueDeclaration = QueueDeclaration(queueName)
- Java
-
final String queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis();
final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName);
Here we used the QueueDeclaration configuration class to create a queue declaration. All of the configuration classes, as well as the connector factories, are in the akka.stream.alpakka.amqp package.
Create a sink that accepts ByteStrings and forwards them to the AMQP server.
- Scala
-
val amqpSink = AmqpSink.simple(
  AmqpSinkSettings(connectionSettings)
    .withRoutingKey(queueName)
    .withDeclarations(queueDeclaration)
)
- Java
-
final Sink<ByteString, CompletionStage<Done>> amqpSink = AmqpSink.createSimple(
  AmqpSinkSettings.create(amqpConnectionDetails)
    .withRoutingKey(queueName)
    .withDeclarations(queueDeclaration)
);
AmqpSink is a collection of factory methods that facilitate the creation of sinks. Here we created a simple sink, which means that we can pass ByteStrings to the sink directly instead of wrapping the data in OutgoingMessages.
The last step is to materialize and run the sink we have created.
- Scala
-
val input = Vector("one", "two", "three", "four", "five")
Source(input).map(s => ByteString(s)).runWith(amqpSink)
- Java
-
final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);
Receiving messages from an AMQP server
Create a source using the same queue declaration as before.
- Scala
-
val amqpSource = AmqpSource(
  NamedQueueSourceSettings(connectionSettings, queueName).withDeclarations(queueDeclaration),
  bufferSize = 10
)
- Java
-
final Integer bufferSize = 10;
final Source<IncomingMessage, NotUsed> amqpSource = AmqpSource.create(
  NamedQueueSourceSettings.create(
    DefaultAmqpConnection.getInstance(),
    queueName
  ).withDeclarations(queueDeclaration),
  bufferSize
);
The bufferSize parameter controls the maximum number of messages to prefetch from the AMQP server.
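To make the effect of a prefetch limit concrete, here is a minimal in-memory sketch in plain Java. It is not the connector's implementation and uses no AMQP library: `PrefetchSketch`, `take`, `prefetched`, and `waitingOnBroker` are hypothetical names, and the "broker" is just a queue in the same process. The sketch assumes the simplest possible model, in which the broker tops up the consumer's buffer whenever a slot becomes free.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// In-memory model of prefetching: the "broker" hands messages to the consumer
// only while fewer than bufferSize messages sit in the consumer's buffer.
final class PrefetchSketch {
    private final Deque<String> brokerQueue = new ArrayDeque<>();
    private final Deque<String> consumerBuffer = new ArrayDeque<>();
    private final int bufferSize;

    PrefetchSketch(int bufferSize, List<String> messages) {
        this.bufferSize = bufferSize;
        brokerQueue.addAll(messages);
        fill();
    }

    // Move messages from the broker to the consumer until the buffer is full
    // or the broker has nothing left.
    private void fill() {
        while (consumerBuffer.size() < bufferSize && !brokerQueue.isEmpty()) {
            consumerBuffer.addLast(brokerQueue.pollFirst());
        }
    }

    // Number of messages currently held in the consumer's buffer.
    int prefetched() {
        return consumerBuffer.size();
    }

    // Number of messages still waiting on the broker side.
    int waitingOnBroker() {
        return brokerQueue.size();
    }

    // Consume one message (null if none is available); the freed slot is
    // refilled from the broker immediately.
    String take() {
        String next = consumerBuffer.pollFirst();
        fill();
        return next;
    }

    public static void main(String[] args) {
        PrefetchSketch sketch =
            new PrefetchSketch(2, Arrays.asList("one", "two", "three", "four", "five"));
        System.out.println(sketch.prefetched() + " prefetched, " + sketch.waitingOnBroker() + " on broker");
        System.out.println("took " + sketch.take());
        System.out.println(sketch.prefetched() + " prefetched, " + sketch.waitingOnBroker() + " on broker");
    }
}
```

In this model a larger buffer means fewer messages remain on the broker at any moment, at the cost of holding more of them in the consumer's memory.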
Run the source and take the same number of messages as we previously sent to it.
- Scala
-
val result = amqpSource.map(_.bytes.utf8String).take(input.size).runWith(Sink.seq)
- Java
-
final CompletionStage<List<String>> result =
  amqpSource
    .map(m -> m.bytes().utf8String())
    .take(input.size())
    .runWith(Sink.seq(), materializer);
This is how you send messages to and receive messages from an AMQP server using this connector.
Using Pub/Sub with an AMQP server
Instead of sending messages directly to queues, it is possible to send messages to an exchange and then tell the AMQP server what to do with the messages that arrive at that exchange. We are going to use the fanout exchange type, which broadcasts messages to multiple consumers. To do that, we use the same exchange declaration for the sink and for all of the sources.
- Scala
-
val exchangeName = "amqp-conn-it-spec-pub-sub-" + System.currentTimeMillis()
val exchangeDeclaration = ExchangeDeclaration(exchangeName, "fanout")
- Java
-
final String exchangeName = "amqp-conn-it-spec-pub-sub-" + System.currentTimeMillis();
final ExchangeDeclaration exchangeDeclaration = ExchangeDeclaration.create(exchangeName, "fanout");
The sink for the exchange is created in a very similar way.
- Scala
-
val amqpSink = AmqpSink.simple(
  AmqpSinkSettings(DefaultAmqpConnection)
    .withExchange(exchangeName)
    .withDeclarations(exchangeDeclaration)
)
- Java
-
final Sink<ByteString, CompletionStage<Done>> amqpSink = AmqpSink.createSimple(
  AmqpSinkSettings.create()
    .withExchange(exchangeName)
    .withDeclarations(exchangeDeclaration)
);
On the receiving side, we are going to create multiple sources and merge them using the Akka Streams API.
- Scala
-
val fanoutSize = 4
val mergedSources = (0 until fanoutSize).foldLeft(Source.empty[(Int, String)]) {
  case (source, fanoutBranch) =>
    source.merge(
      AmqpSource(
        TemporaryQueueSourceSettings(
          DefaultAmqpConnection,
          exchangeName
        ).withDeclarations(exchangeDeclaration),
        bufferSize = 1
      ).map(msg => (fanoutBranch, msg.bytes.utf8String))
    )
}
- Java
-
final Integer fanoutSize = 4;
final Integer bufferSize = 1;
Source<Pair<Integer, String>, NotUsed> mergedSources = Source.empty();
for (Integer i = 0; i < fanoutSize; i++) {
  final Integer fanoutBranch = i;
  mergedSources = mergedSources.merge(
    AmqpSource.create(
      TemporaryQueueSourceSettings.create(
        DefaultAmqpConnection.getInstance(),
        exchangeName
      ).withDeclarations(exchangeDeclaration),
      bufferSize
    )
    .map(msg -> Pair.create(fanoutBranch, msg.bytes().utf8String()))
  );
}
We merge all sources into one and add the index of the source to all incoming messages, so we can distinguish which source the incoming message came from.
The sink and the merged source can be started the same way as in the previous example.
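The fanout behaviour described in this section can be illustrated without a broker. The following is a minimal in-memory sketch in plain Java, assuming a simplified model in which each bound queue is a list; `FanoutSketch`, `bind`, `publish`, and `merged` are hypothetical names and not part of the connector. Unlike a real stream merge, which interleaves elements as they arrive, `merged` simply concatenates the queues in index order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// In-memory model of a fanout exchange with several bound queues.
final class FanoutSketch {
    // One list per bound queue; the list index plays the role of fanoutBranch.
    private final List<List<String>> queues = new ArrayList<>();

    // Bind a new, empty queue to the exchange and return its index.
    int bind() {
        queues.add(new ArrayList<>());
        return queues.size() - 1;
    }

    // Fanout semantics: every bound queue receives its own copy of the message.
    void publish(String message) {
        for (List<String> queue : queues) {
            queue.add(message);
        }
    }

    // Tag each message with the index of the queue it came from and collect
    // everything into one list (concatenated per queue, not interleaved).
    List<Map.Entry<Integer, String>> merged() {
        List<Map.Entry<Integer, String>> out = new ArrayList<>();
        for (int i = 0; i < queues.size(); i++) {
            for (String message : queues.get(i)) {
                out.add(new SimpleEntry<>(i, message));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        for (int i = 0; i < 4; i++) {
            exchange.bind();
        }
        exchange.publish("one");
        exchange.publish("two");
        // Each of the 4 queues holds both messages, so 8 tagged entries are printed.
        for (Map.Entry<Integer, String> entry : exchange.merged()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

The point of the tagging is visible in the output: the same payload appears once per branch, and only the index tells the copies apart.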
Using RabbitMQ as an RPC mechanism
If you have remote workers that you want to incorporate into a stream, you can do so using the RabbitMQ RPC workflow.
- Scala
-
val amqpRpcFlow = AmqpRpcFlow.simple(
  AmqpSinkSettings(DefaultAmqpConnection)
    .withRoutingKey(queueName)
    .withDeclarations(queueDeclaration)
)
- Java
-
final Flow<ByteString, ByteString, CompletionStage<String>> ampqRpcFlow = AmqpRpcFlow.createSimple(
  AmqpSinkSettings.create()
    .withRoutingKey(queueName)
    .withDeclarations(queueDeclaration),
  1
);
- Scala
-
val (rpcQueueF, probe) =
  Source(input)
    .map(s => ByteString(s))
    .viaMat(amqpRpcFlow)(Keep.right)
    .toMat(TestSink.probe)(Keep.both)
    .run
- Java
-
final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
TestSubscriber.Probe<ByteString> probe =
  Source.from(input)
    .map(ByteString::fromString)
    .via(ampqRpcFlow)
    .runWith(TestSink.probe(system), materializer);
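For readers unfamiliar with the RabbitMQ RPC workflow, the general pattern is that each request names a reply queue and carries a correlation id, and the worker publishes its answer to that reply queue with the same id. The following plain-Java sketch models only that pattern in memory; it is not how the connector is implemented, and `RpcSketch`, `Message`, `request`, `serveOne`, and `nextReply` are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// In-memory model of request/reply over named queues.
final class RpcSketch {
    // A message with the two properties the RPC pattern relies on.
    static final class Message {
        final String body;
        final String replyTo;        // queue the reply should be sent to (null on replies)
        final String correlationId;  // lets the client match a reply to its request

        Message(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, Queue<Message>> queues = new HashMap<>();

    // Look up a queue by name, creating it on first use.
    private Queue<Message> queue(String name) {
        return queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    // Client side: publish a request that names the reply queue and a correlation id.
    void request(String requestQueue, String replyQueue, String correlationId, String body) {
        queue(requestQueue).add(new Message(body, replyQueue, correlationId));
    }

    // Worker side: handle one pending request and publish the result to its
    // reply queue, copying the correlation id. Returns false if nothing was pending.
    boolean serveOne(String requestQueue, Function<String, String> handler) {
        Message req = queue(requestQueue).poll();
        if (req == null) {
            return false;
        }
        queue(req.replyTo).add(new Message(handler.apply(req.body), null, req.correlationId));
        return true;
    }

    // Client side: fetch the next reply, or null if none has arrived yet.
    Message nextReply(String replyQueue) {
        return queue(replyQueue).poll();
    }

    public static void main(String[] args) {
        RpcSketch rpc = new RpcSketch();
        rpc.request("rpc-queue", "reply-queue", "id-1", "one");
        rpc.serveOne("rpc-queue", String::toUpperCase);
        Message reply = rpc.nextReply("reply-queue");
        System.out.println(reply.correlationId + " -> " + reply.body);
    }
}
```

Seen from the stream, a flow built on this pattern takes one request element in and emits the matching reply element out, which is why it can sit between a source and a sink like any other flow.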
Running the example code
The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.
The test code requires an AMQP server running in the background. You can start one quickly using Docker:
docker run --rm -p 5672:5672 rabbitmq:3
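Before running the tests, it can be useful to confirm that something is actually listening on the published port. The following is a small plain-Java sketch of such a check; `BrokerPortCheck` and `isReachable` are hypothetical names, and the host `localhost` and the one-second timeout are assumptions. It only verifies that a TCP connection can be opened, not that the peer speaks AMQP.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 is the port published by the docker command above.
        System.out.println("AMQP port reachable: " + isReachable("localhost", 5672, 1000));
    }
}
```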
- Scala
-
sbt
> amqp/testOnly *.AmqpConnectorsSpec
- Java
-
sbt
> amqp/testOnly *.AmqpConnectorsTest