AWS Kinesis Connector
The AWS Kinesis connector provides an Akka Stream Source for consuming Kinesis Stream records.
For more information about Kinesis, please visit the official documentation.
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % "0.15"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-kinesis_2.12</artifactId>
  <version>0.15</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-kinesis_2.12', version: '0.15'
}
Usage
Sources and Flows provided by this connector need an AmazonKinesisAsync instance to consume messages from a shard. The AmazonKinesisAsync instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete.
- Scala
-
val amazonKinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
- Java
-
final AmazonKinesisAsync amazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient();
We will also need an ActorSystem and an ActorMaterializer.
- Scala
-
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
- Java
-
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);
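Because the stages never shut the shared client down, you should close it yourself once every stream using it has finished. A minimal sketch (assuming the client and actor system from the snippets above) ties the client's lifecycle to actor system termination:

```scala
// Close the shared Kinesis client when the ActorSystem terminates.
// AmazonKinesisAsync.shutdown() releases the client's internal resources.
system.registerOnTermination {
  amazonKinesisAsync.shutdown()
}
```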
Using the Source
The KinesisSource creates one GraphStage per shard. Reading from a shard requires an instance of ShardSettings.
- Scala
-
val settings = ShardSettings(
  streamName = "myStreamName",
  shardId = "shard-id",
  shardIteratorType = ShardIteratorType.TRIM_HORIZON,
  refreshInterval = 1.second,
  limit = 500)
- Java
-
final ShardSettings settings = ShardSettings.create(
    "streamName", "shard-id", ShardIteratorType.LATEST,
    FiniteDuration.apply(1L, TimeUnit.SECONDS), 500);
You have the choice of reading from a single shard, or reading from multiple shards. In the case of multiple shards, the results of running a separate GraphStage for each shard are merged together.
The GraphStage associated with a shard remains open until the graph is stopped, or until a GetRecords result returns an empty shard iterator, indicating that the shard has been closed. This means that if you wish to continue processing records after a merge or reshard, you will need to recreate the source with the results of a new DescribeStream request, which can be done by simply creating a new KinesisSource. You can read more about adapting to a reshard here.
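One way to rebuild the source after a reshard, sketched under the assumption that listing the stream's shards via DescribeStream is acceptable at (re)start time (the helper name is illustrative, and a production version would follow DescribeStream pagination for streams with many shards):

```scala
import scala.collection.JavaConverters._
import scala.concurrent.duration._

// Illustrative helper: describe the stream, build one ShardSettings per
// currently listed shard, and merge them into a single Source of Records.
def sourceForStream(streamName: String): Source[Record, NotUsed] = {
  val shards = amazonKinesisAsync
    .describeStream(streamName)
    .getStreamDescription
    .getShards
    .asScala

  val settingsPerShard = shards.map { shard =>
    ShardSettings(
      streamName = streamName,
      shardId = shard.getShardId,
      shardIteratorType = ShardIteratorType.TRIM_HORIZON,
      refreshInterval = 1.second,
      limit = 500)
  }.toList

  KinesisSource.basicMerge(settingsPerShard, amazonKinesisAsync)
}
```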
For a single shard you simply provide the settings for a single shard.
- Scala
-
KinesisSource.basic(settings, amazonKinesisAsync)
- Java
-
final Source<Record, NotUsed> single = KinesisSource.basic(settings, amazonKinesisAsync);
You can merge multiple shards by providing a list of settings.
- Scala
-
val mergeSettings = List(
  ShardSettings("myStreamName", "shard-id-1", ShardIteratorType.AT_SEQUENCE_NUMBER,
    startingSequenceNumber = Some("sequence"), refreshInterval = 1.second, limit = 500),
  ShardSettings("myStreamName", "shard-id-2", ShardIteratorType.AT_TIMESTAMP,
    atTimestamp = Some(new Date()), refreshInterval = 1.second, limit = 500))
KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)
- Java
-
final Source<Record, NotUsed> two = KinesisSource.basicMerge(Arrays.asList(settings), amazonKinesisAsync);
The constructed Source will emit Record objects by calling GetRecords at the specified interval and according to downstream demand.
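For instance, the emitted Records could be decoded and printed, a sketch assuming UTF-8 payloads and the settings, client, and materializer defined earlier:

```scala
import akka.util.ByteString

// Record.getData returns a ByteBuffer; wrap it in a ByteString to decode it.
KinesisSource
  .basic(settings, amazonKinesisAsync)
  .map(record => record.getPartitionKey -> ByteString(record.getData).utf8String)
  .runForeach { case (key, data) => println(s"$key: $data") }
```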
Using the Put Flow/Sink
The KinesisFlow (or KinesisSink) publishes messages into a Kinesis stream using its partition key and message body. It batches records dynamically, can perform several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow emits the result of publishing each record.
Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. This also means that the order of the Flow's output may not match the order of its input.
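Records that still fail after the configured retries surface through the Flow's output, so a downstream stage can inspect each PutRecordsResultEntry. A sketch (the stream name, flowSettings, and entries are illustrative):

```scala
// A non-null error code marks an entry that was not published.
Source(entries)
  .via(KinesisFlow("myStreamName", flowSettings))
  .runForeach { result =>
    Option(result.getErrorCode).foreach { code =>
      println(s"failed with $code: ${result.getErrorMessage}")
    }
  }
```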
Publishing to a Kinesis stream requires an instance of KinesisFlowSettings, although a default instance with sane values, and a method that derives settings from the stream's shard count, are also available:
- Scala
-
val flowSettings = KinesisFlowSettings(
  parallelism = 1,
  maxBatchSize = 500,
  maxRecordsPerSecond = 1000,
  maxBytesPerSecond = 1000000,
  maxRetries = 5,
  backoffStrategy = KinesisFlowSettings.Exponential,
  retryInitialTimeout = 100.millis)
val defaultFlowSettings = KinesisFlowSettings.defaultInstance
val fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4)
- Java
-
final KinesisFlowSettings flowSettings = KinesisFlowSettings.apply(
    1, 500, 1000, 1000000, 5,
    KinesisFlowSettings.exponential(),
    FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
final KinesisFlowSettings defaultFlowSettings = KinesisFlowSettings.defaultInstance();
final KinesisFlowSettings fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4);
Note that the throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital for minimizing server errors (like ProvisionedThroughputExceededException) and retries, and thus for achieving a higher publication rate.
The Flow/Sink can now be created.
- Scala
-
implicit val kinesisClient: AmazonKinesisAsync = amazonKinesisAsync
Source.empty[PutRecordsRequestEntry].via(KinesisFlow("myStreamName")).to(Sink.ignore)
Source.empty[PutRecordsRequestEntry].via(KinesisFlow("myStreamName", flowSettings)).to(Sink.ignore)
Source.empty[(String, ByteString)].via(KinesisFlow.byParititonAndBytes("myStreamName")).to(Sink.ignore)
Source.empty[(String, ByteBuffer)].via(KinesisFlow.byPartitionAndData("myStreamName")).to(Sink.ignore)
Source.empty[PutRecordsRequestEntry].to(KinesisSink("myStreamName"))
Source.empty[PutRecordsRequestEntry].to(KinesisSink("myStreamName", flowSettings))
Source.empty[(String, ByteString)].to(KinesisSink.byParititonAndBytes("myStreamName"))
Source.empty[(String, ByteBuffer)].to(KinesisSink.byPartitionAndData("myStreamName"))
- Java
-
final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> flow =
    KinesisFlow.apply("streamName", flowSettings, amazonKinesisAsync);
final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> defaultSettingsFlow =
    KinesisFlow.apply("streamName", amazonKinesisAsync);
final Sink<PutRecordsRequestEntry, NotUsed> sink =
    KinesisSink.apply("streamName", flowSettings, amazonKinesisAsync);
final Sink<PutRecordsRequestEntry, NotUsed> defaultSettingsSink =
    KinesisSink.apply("streamName", amazonKinesisAsync);
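Putting it together, a small publishing pipeline using the byPartitionAndData variant can be sketched as follows (the stream name and payloads are illustrative, and an implicit AmazonKinesisAsync as shown above is assumed in scope):

```scala
import java.nio.ByteBuffer

// Publish (partition key, payload) pairs to the stream.
Source(
  List(
    "partition-1" -> ByteBuffer.wrap("hello".getBytes("UTF-8")),
    "partition-2" -> ByteBuffer.wrap("world".getBytes("UTF-8"))))
  .runWith(KinesisSink.byPartitionAndData("myStreamName"))
```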