Integration Patterns
Many Enterprise Integration Patterns can be implemented with Akka Streams (see Java documentation or Scala documentation).
Splitter
You can implement a Splitter, as described in EIP, using the out-of-the-box Akka Streams DSL.
Simple Splitter
Let’s say that we have a stream of strings, each containing a few numbers separated by “-”. We want to turn it into a stream that contains only the individual numbers.
- Scala
// Sample source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val ret = source
  .map(s => s.split("-").toList)
  .mapConcat(identity)
  // Sub-streams logic
  .map(s => s.toInt)
  .runWith(Sink.seq)

// Verify results
ret.futureValue should be(Vector(1, 2, 3, 2, 3, 3, 4))
- Java
// Sample source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));

CompletionStage<List<Integer>> ret = source
    .map(s -> Arrays.asList(s.split("-")))
    .mapConcat(f -> f)
    // Sub-streams logic
    .map(s -> Integer.valueOf(s))
    .runWith(Sink.seq(), materializer);

// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4));
Splitter + Aggregator
Sometimes it is useful to split a message and aggregate its “sub-messages” into a new message (a combination of Splitter and Aggregator).
Let’s say that we now want to create a new stream containing the sum of the numbers in each original string.
- Scala
// Sample source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val result = source
  .map(s => s.split("-").toList)
  // split all messages into sub-streams
  .splitWhen(a => true)
  // now split each collection
  .mapConcat(identity)
  // Sub-streams logic
  .map(s => s.toInt)
  // aggregate each sub-stream
  .reduce((a, b) => a + b)
  // and merge back the result into the original stream
  .mergeSubstreams
  .runWith(Sink.seq)

// Verify results
result.futureValue should be(Vector(6, 5, 7))
- Java
// Sample source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));

CompletionStage<List<Integer>> ret = source
    .map(s -> Arrays.asList(s.split("-")))
    // split all messages into sub-streams
    .splitWhen(a -> true)
    // now split each collection
    .mapConcat(f -> f)
    // Sub-streams logic
    .map(s -> Integer.valueOf(s))
    // aggregate each sub-stream
    .reduce((a, b) -> a + b)
    // and merge back the result into the original stream
    .mergeSubstreams()
    .runWith(Sink.seq(), materializer);

// Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(6, 5, 7));
While in real life this solution is overkill for such a simple problem (you can do everything in a single map), more complex scenarios, particularly those involving I/O, benefit from the fact that you can parallelize sub-streams and get back-pressure for “free”.
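For comparison, the "everything in a map" alternative mentioned above can be sketched in plain Java, without any Akka dependency. The class and method names (`SplitAndSum`, `sumOfParts`, `sumAll`) are hypothetical and exist only for illustration; they are not part of Akka Streams or Alpakka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class for illustration; not part of Akka Streams or Alpakka.
final class SplitAndSum {

    // Splits a message such as "1-2-3" on "-" and sums the resulting numbers.
    static int sumOfParts(String message) {
        return Arrays.stream(message.split("-"))
                .mapToInt(Integer::parseInt)
                .sum();
    }

    // Applies sumOfParts to every message, preserving the input order.
    static List<Integer> sumAll(List<String> messages) {
        return messages.stream()
                .map(SplitAndSum::sumOfParts)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same sample data as in the examples above.
        System.out.println(sumAll(Arrays.asList("1-2-3", "2-3", "3-4"))); // prints [6, 5, 7]
    }
}
```

In an Akka Streams pipeline, the same function could presumably be passed to a single `map` stage, for example `source.map(SplitAndSum::sumOfParts)`, in place of the `splitWhen`, `mapConcat`, `reduce` and `mergeSubstreams` chain. That shortcut gives up the per-message sub-streams, so it fits only when the per-message work is cheap and synchronous.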
TODO: Create documentation pages for typical integration patterns. Some of them might deserve a higher-level component implemented in Alpakka. Contributions are very welcome. Creating an issue for discussion is a good first step for such contributions.