RecordIO Framing
The codec parses a ByteString stream in the RecordIO format into distinct frames.
For instance, the response body:
128\n
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}20\n
{"type":"HEARTBEAT"}
is parsed into frames:
{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}
{"type":"HEARTBEAT"}
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-simple-codecs" % "0.15"
- Maven
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-simple-codecs_2.12</artifactId> <version>0.15</version> </dependency>
- Gradle
dependencies { compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-simple-codecs_2.12', version: '0.15' }
Usage
The helper object RecordIOFraming provides a scanner
factory method for a Flow[ByteString, ByteString, _]
which parses out RecordIO frames.
For instance, given the sample input:
- Scala
-
val FirstRecordData = """{"type": "SUBSCRIBED","subscribed": {"framework_id": {"value":"12220-3440-12532-2345"},"heartbeat_interval_seconds":15.0}""" val SecondRecordData = """{"type":"HEARTBEAT"}""" val FirstRecordWithPrefix = s"121\n$FirstRecordData" val SecondRecordWithPrefix = s"20\n$SecondRecordData" val basicSource = Source.single(ByteString(FirstRecordWithPrefix + SecondRecordWithPrefix))
Running it through the RecordIO framing flow:
- Scala
-
val result = basicSource via RecordIOFraming.scanner() runWith Sink.seq
We obtain:
- Scala
-
result.futureValue shouldBe Seq(ByteString(FirstRecordData), ByteString(SecondRecordData))
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt > simpleCodecs/testOnly *.RecordIOFramingSpec
The source code for this page can be found here.