AWS SQS Connector
The AWS SQS connector provides Akka Stream sources and sinks for AWS SQS queues.
For more information about AWS SQS please visit the official documentation.
Artifacts
- sbt
-
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.9"
- Maven
-
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-sqs_2.12</artifactId> <version>0.9</version> </dependency>
- Gradle
-
dependencies { compile group: "com.lightbend.akka", name: "akka-stream-alpakka-sqs_2.12", version: "0.9" }
Usage
Sources, Flows and Sinks provided by this connector need a prepared AmazonSQSAsync
to load messages from a queue.
- Scala
-
val client: AmazonSQSAsync = AmazonSQSAsyncClientBuilder .standard() .withCredentials(credentialsProvider) .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build()
- Java
-
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build();
We will also need an ActorSystem and an ActorMaterializer.
- Scala
-
implicit val system = ActorSystem() implicit val mat = ActorMaterializer()
- Java
-
system = ActorSystem.create(); materializer = ActorMaterializer.create(system);
This is all preparation that we are going to need.
Stream messages from a SQS queue
Now we can stream AWS Java SDK SQS Message
objects from any SQS queue where we have access to by providing the queue URL to the SqsSource factory.
- Scala
-
SqsSource(queue, sqsSourceSettings).take(100).runWith(Sink.seq).map(_ should have size 100)
- Java
-
final CompletionStage<String> cs = SqsSource.create(queueUrl, sqsSourceSettings, sqsClient) .map(m -> m.getBody()) .runWith(Sink.head(), materializer); final CompletionStage<String> cs = SqsSource.create(queueUrl, sqsSourceSettings, customSqsClient) .map(m -> m.getBody()) .take(1) .runWith(Sink.head(), materializer);
As you have seen we take the first 100 elements from the stream. The reason for this is, that reading messages from SQS queues never finishes because there is no direct way to determine the end of a queue.
Source configuration
- Scala
-
final case class SqsSourceSettings( waitTimeSeconds: Int, maxBufferSize: Int, maxBatchSize: Int, attributeNames: Seq[AttributeName] = Seq(), messageAttributeNames: Seq[MessageAttributeName] = Seq() ) { require(maxBatchSize <= maxBufferSize, "maxBatchSize must be lower or equal than maxBufferSize") // SQS requirements require(0 <= waitTimeSeconds && waitTimeSeconds <= 20, s"Invalid value ($waitTimeSeconds) for waitTimeSeconds. Requirement: 0 <= waitTimeSeconds <= 20 ") require(1 <= maxBatchSize && maxBatchSize <= 10, s"Invalid value ($maxBatchSize) for maxBatchSize. Requirement: 1 <= maxBatchSize <= 10 ") }
Options:
maxBatchSize
- the maximum number of messages to return (seeMaxNumberOfMessages
in AWS docs). Default: 10maxBufferSize
- internal buffer size used by theSource
. Default: 100 messageswaitTimeSeconds
- the duration for which the call waits for a message to arrive in the queue before returning (seeWaitTimeSeconds
in AWS docs). Default: 20 seconds
Be aware that the SqsSource
runs multiple requests to Amazon SQS in parallel. The maximum number of concurrent requests is limited by parallelism = maxBufferSize / maxBatchSize
. E.g.: By default maxBatchSize
is set to 10 and maxBufferSize
is set to 100 so at the maximum, SqsSource
will run 10 concurrent requests to Amazon SQS. AmazonSQSAsyncClient
uses a fixed thread pool with 50 threads by default. To tune the thread pool used by AmazonSQSAsyncClient
you can supply a custom ExecutorService
on client creation.
- Scala
-
val customSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder .standard() .withCredentials(credentialsProvider) .withExecutorFactory(new ExecutorFactory { override def newExecutor() = Executors.newFixedThreadPool(10) }) .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build()
- Java
-
AmazonSQSAsync customSqsClient = AmazonSQSAsyncClientBuilder .standard() .withCredentials(credentialsProvider) .withExecutorFactory(() -> Executors.newFixedThreadPool(10)) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build();
Please make sure to configure a big enough thread pool to avoid resource starvation. This is especially important if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources the sum of all parallelism
(Source) and maxInFlight
(Sink) must be less than or equal to the thread pool size.
Stream messages to a SQS queue
Create a sink, that forwards String
to the SQS queue.
- Scala
-
val future = Source.single("alpakka").runWith(SqsSink(queue)) Await.ready(future, 1.second)
- Java
-
CompletionStage<Done> done = Source .single("alpakka") .runWith(SqsSink.create(queueUrl, sqsClient), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS);
Sink configuration
- Scala
-
final case class SqsSinkSettings(maxInFlight: Int) { require(maxInFlight > 0) }
Options:
maxInFlight
- maximum number of messages being processed byAmazonSQSAsync
at the same time. Default: 10
Message processing with acknowledgement
SqsAckSink
provides possibility to acknowledge (delete) or requeue a message.
Your flow must decide which action to take and push it with message:
Ack
- delete message from the queueRequeueWithDelay(delaySeconds: Int)
- schedule a retry
- Scala (ack)
-
val future = SqsSource(queue)(awsSqsClient) .take(1) .map { m: Message => (m, Ack()) } .runWith(SqsAckSink(queue)(awsSqsClient))
- Scala (requeue)
-
val future = SqsSource(queue)(awsSqsClient) .take(1) .map { m: Message => (m, RequeueWithDelay(5)) } .runWith(SqsAckSink(queue)(awsSqsClient))
- Java (ack)
-
Tuple2<Message, MessageAction> pair = new Tuple2<>( new Message().withBody("test"), new Ack() ); CompletionStage<Done> done = Source .single(pair) .runWith(SqsAckSink.create(queueUrl, awsClient), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS);
- Java (requeue)
-
Tuple2<Message, MessageAction> pair = new Tuple2<>( new Message().withBody("test"), new RequeueWithDelay(12) ); CompletionStage<Done> done = Source .single(pair) .runWith(SqsAckSink.create(queueUrl, awsClient), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS);
SqsAckSink configuration
Same as the normal SqsSink
:
- Scala
-
final case class SqsAckSinkSettings(maxInFlight: Int) { require(maxInFlight > 0) }
Options:
maxInFlight
- maximum number of messages being processed byAmazonSQSAsync
at the same time. Default: 10
Using SQS as a Flow
You can also build flow stages which put or acknowledge messages in SQS, backpressure on queue response and then forward responses further down the stream. The API is similar to creating Sinks.
- Scala (flow)
-
val future = Source.single("alpakka").via(SqsFlow(queue)).runWith(Sink.ignore)
- Java (flow)
-
CompletionStage<Done> done = Source .single("alpakka-flow") .via(SqsFlow.create(queueUrl, sqsClient)) .runWith(Sink.ignore(), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS); CompletionStage<Done> done = Source .single("alpakka-flow") .via(SqsFlow.create(queueUrl, sqsClient)) .runWith(Sink.ignore(), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS);
- Scala (flow with ack)
-
val future = SqsSource(queue)(awsSqsClient) .take(1) .map { m: Message => (m, Ack()) } .via(SqsAckFlow(queue)(awsSqsClient)) .runWith(Sink.ignore)
- Java (flow with ack)
-
Tuple2<Message, MessageAction> pair = new Tuple2<>( new Message().withBody("test-ack-flow"), new Ack() ); CompletionStage<Done> done = Source .single(pair) .via(SqsAckFlow.create(queueUrl, awsClient)) .runWith(Sink.ignore(), materializer); done.toCompletableFuture().get(1, TimeUnit.SECONDS);
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
The test code uses embedded ElasticMQ as queuing service which serves an AWS SQS compatible API.
- Scala
-
sbt > sqs/testOnly *.SqsSourceSpec
- Java
-
sbt > sqs/testOnly *.SqsSourceTest