File Connectors
The File connectors provide additional connectors for filesystems, complementing the sources and sinks for files already included in core Akka Streams (akka.stream.javadsl.FileIO for Java, akka.stream.scaladsl.FileIO for Scala).
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "0.15"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>0.15</version>
</dependency>
- Gradle
dependencies { compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '0.15' }
Tailing a file into a stream
The FileTailSource starts at a given offset in a file and emits chunks of bytes until it reaches the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).
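To make the offset-then-poll behaviour concrete without pulling in Akka, here is a minimal JDK-only sketch. PollingTailer, its constructor parameters and the chunk size are hypothetical names chosen for illustration, not Alpakka API. The real source also schedules the polls itself (the pollingInterval in the samples) and only reads when downstream demands data; this sketch leaves both to the caller.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Path;

// Hypothetical helper, not part of Alpakka: remembers a byte offset into a file
// and, on every poll, returns bytes that were appended since the last poll.
class PollingTailer {
    private final Path path;
    private final int maxChunkSize;
    private long offset;

    PollingTailer(Path path, long startingOffset, int maxChunkSize) {
        this.path = path;
        this.offset = startingOffset;
        this.maxChunkSize = maxChunkSize;
    }

    /** Returns the next chunk of at most maxChunkSize bytes, or an empty array at end of file. */
    byte[] poll() {
        try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
            long available = file.length() - offset;
            if (available <= 0) {
                return new byte[0]; // nothing new yet: the caller should wait and poll again
            }
            byte[] chunk = new byte[(int) Math.min(available, maxChunkSize)];
            file.seek(offset);
            file.readFully(chunk);
            offset += chunk.length; // the next poll continues where this one stopped
            return chunk;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling poll() in a loop, sleeping whenever it returns an empty array, gives the tailing behaviour described above.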
A very common use case is to read the bytes and parse them into lines, so FileTailSource provides a few factory methods that create a source which parses the bytes into lines and emits those.
In this sample we simply tail the lines of a file and print them to standard out:
- Scala
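import java.nio.file.FileSystems
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.Source
import akka.stream.alpakka.file.scaladsl

val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))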
- Java
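import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;
import akka.NotUsed;
import akka.stream.javadsl.Source;

final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
    akka.stream.alpakka.file.javadsl.FileTailSource.createLines(fs.getPath(path), maxLineSize, pollingInterval);

// runs the stream, printing each line; `materializer` is assumed to be in scope
lines.runForeach((line) -> System.out.println(line), materializer);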
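The maxLineSize parameter in the samples bounds how many bytes may accumulate before a line separator shows up. The following sketch, assuming '\n' as the separator and UTF-8 text, shows one way to turn byte chunks into lines: bytes are buffered until a separator arrives, so a line (or a multi-byte character) split across two chunks is still decoded correctly. LineFramer is a hypothetical helper, not the code FileTailSource uses; in Akka Streams this job is typically done by the Framing.delimiter stage.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Alpakka API: turns arbitrary byte chunks into complete
// lines, buffering a partial line until its '\n' arrives in a later chunk.
class LineFramer {
    private final int maxLineSize;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    LineFramer(int maxLineSize) {
        this.maxLineSize = maxLineSize;
    }

    /** Feeds one chunk and returns every line completed by it (without the '\n'). */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                // Decode only complete lines, so split multi-byte characters are safe.
                lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                if (pending.size() >= maxLineSize) {
                    // Fail loudly instead of buffering without bound.
                    throw new IllegalStateException("line longer than " + maxLineSize + " bytes");
                }
                pending.write(b);
            }
        }
        return lines;
    }
}
```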
Listing directory contents
Directory.ls(path) lists all files and directories directly in a given directory:
- Scala
val source: Source[Path, NotUsed] = Directory.ls(dir)
- Java
final Source<Path, NotUsed> source = Directory.ls(dir);
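For comparison, a listing with plain java.nio can be sketched as follows. ListDirectory is a hypothetical name, and the sketch assumes that eagerly collecting the entries into a list is acceptable, whereas Directory.ls emits them as a stream. Files.list makes no ordering guarantee, so the result is sorted here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ListDirectory {
    // Lists only the direct children of dir (no recursion into subdirectories).
    static List<Path> ls(Path dir) {
        // Files.list holds an open directory handle, so close the stream when done.
        try (Stream<Path> children = Files.list(dir)) {
            return children.sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```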
Directory.walk(path) traverses all subdirectories and lists files and directories depth first:
- Scala
val files: Source[Path, NotUsed] = Directory.walk(root)
- Java
final Source<Path, NotUsed> source = Directory.walk(root);

// alternatively, limit the traversal depth (here 1) and follow symbolic links
final Source<Path, NotUsed> sourceFollowingLinks = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
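The maxDepth argument in the Java sample appears to mirror the one of java.nio.file.Files.walk; assuming that, the following JDK-only sketch uses Files.walk directly (WalkDirectory is a hypothetical name). Note that Files.walk emits the starting directory itself first, and that a maxDepth of 1 adds only its direct children.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class WalkDirectory {
    // Depth-first traversal: maxDepth 0 yields only root, 1 adds its direct children,
    // Integer.MAX_VALUE visits the whole tree.
    static List<Path> walk(Path root, int maxDepth, FileVisitOption... options) {
        try (Stream<Path> paths = Files.walk(root, maxDepth, options)) {
            return paths.collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```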
Listening to changes in a directory
The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted change consists of the path that was changed and an enumeration value describing what kind of change it was.
In this sample we simply print each change to the directory to standard output:
- Scala
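import java.nio.file.FileSystems
import scala.concurrent.duration._
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

// requires an implicit Materializer in scope
val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (changedPath, change) => println("Path: " + changedPath + ", Change: " + change)
}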
- Java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(1, TimeUnit.SECONDS);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
    DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach((Pair<Path, DirectoryChange> pair) -> {
  final Path changedPath = pair.first();
  final DirectoryChange change = pair.second();
  System.out.println("Path: " + changedPath + ", Change: " + change);
}, materializer);
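The pollInterval parameter and the change kinds suggest a design around java.nio's WatchService; the sketch below assumes that and shows the core loop with the JDK alone. ChangeKind and DirectoryWatcher are hypothetical names (Alpakka's own type is DirectoryChange), and the buffering bounded by maxBufferSize in the real source is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical change kinds for this sketch; Alpakka's own type is DirectoryChange.
enum ChangeKind { CREATION, MODIFICATION, DELETION }

class DirectoryWatcher {
    // Registers dir for create, modify and delete events on a new WatchService.
    static WatchService watch(Path dir) {
        try {
            WatchService watcher = dir.getFileSystem().newWatchService();
            dir.register(watcher,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_MODIFY,
                StandardWatchEventKinds.ENTRY_DELETE);
            return watcher;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static ChangeKind toChangeKind(WatchEvent.Kind<?> kind) {
        if (kind == StandardWatchEventKinds.ENTRY_CREATE) return ChangeKind.CREATION;
        if (kind == StandardWatchEventKinds.ENTRY_MODIFY) return ChangeKind.MODIFICATION;
        if (kind == StandardWatchEventKinds.ENTRY_DELETE) return ChangeKind.DELETION;
        throw new IllegalArgumentException("unsupported event kind: " + kind);
    }

    // Waits up to `timeout` for a signalled key and returns (changed path, kind) pairs.
    // OVERFLOW events (signalling lost events) are skipped in this sketch.
    static List<Map.Entry<Path, ChangeKind>> pollOnce(WatchService watcher, Path dir, long timeout, TimeUnit unit)
            throws InterruptedException {
        List<Map.Entry<Path, ChangeKind>> changes = new ArrayList<>();
        WatchKey key = watcher.poll(timeout, unit);
        if (key == null) {
            return changes; // nothing happened within the timeout
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
                continue;
            }
            // The event context is the path relative to the watched directory.
            Path changed = dir.resolve((Path) event.context());
            changes.add(new AbstractMap.SimpleEntry<>(changed, toChangeKind(event.kind())));
        }
        key.reset(); // re-arm the key so further events are delivered
        return changes;
    }
}
```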
Rotating the file to stream into
The LogRotatorSink will create and write to multiple files.
This sink takes as a parameter a function that creates a ByteString => Option[Path] function (in Java, a Creator<Function<ByteString, Optional<Path>>>). The generated function is invoked for every incoming element. If it returns a path, the sink rotates the file output to this new path and the current ByteString is written to the new file as well; otherwise the element is written to the current file. With this approach the user can define a custom, stateful file generation implementation.
A small usage snippet:
- Scala
val pathGeneratorFunction: () => ByteString => Option[Path] = ???

val completion = Source(List("test1", "test2", "test3", "test4", "test5", "test6"))
  .map(ByteString(_))
  .runWith(LogRotatorSink(pathGeneratorFunction))
- Java
import akka.stream.alpakka.file.javadsl.LogRotatorSink;

Creator<Function<ByteString, Optional<Path>>> pathGeneratorCreator = ...;
CompletionStage<Done> completion =
    Source.from(Arrays.asList("test1", "test2", "test3", "test4", "test5", "test6"))
        .map(ByteString::fromString)
        .runWith(LogRotatorSink.createFromFunction(pathGeneratorCreator), materializer);
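The rotation semantics described above can be sketched synchronously with the JDK: ask the generated function about each element, open a new file when it returns a path, then write the element. RotatingWriter is a hypothetical class, and failing when the first element yields no path is this sketch's own choice, not documented LogRotatorSink behaviour. It also uses byte[] and java.util.function types in place of ByteString and akka.japi.function.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, synchronous stand-in for the rotation behaviour described above.
class RotatingWriter implements AutoCloseable {
    private final Function<byte[], Optional<Path>> pathGenerator;
    private OutputStream current; // null until the first path is produced

    RotatingWriter(Supplier<Function<byte[], Optional<Path>>> pathGeneratorCreator) {
        // Each writer gets its own, possibly stateful, function from the factory.
        this.pathGenerator = pathGeneratorCreator.get();
    }

    void write(byte[] element) {
        try {
            Optional<Path> next = pathGenerator.apply(element);
            if (next.isPresent()) {
                if (current != null) {
                    current.close(); // finish the previous file
                }
                current = Files.newOutputStream(next.get()); // creates or truncates the new file
            } else if (current == null) {
                throw new IllegalStateException("the first element must yield a path");
            }
            current.write(element); // goes to the current file (the new one if we just rotated)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (current != null) {
                current.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```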
Example: size-based rotation
- Scala
val fileSizeRotationFunction = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) => {
    if (size + element.size > max) {
      val path = Files.createTempFile("out-", ".log")
      size = element.size
      Some(path)
    } else {
      size += element.size
      None
    }
  }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] = LogRotatorSink(fileSizeRotationFunction)
- Java
Creator<Function<ByteString, Optional<Path>>> sizeBasedPathGenerator = () -> {
  long max = 10 * 1024 * 1024;
  final long[] size = new long[]{max};
  return (element) -> {
    if (size[0] + element.size() > max) {
      Path path = Files.createTempFile("out-", ".log");
      size[0] = element.size();
      return Optional.of(path);
    } else {
      size[0] += element.size();
      return Optional.empty();
    }
  };
};

Sink<ByteString, CompletionStage<Done>> sizeRotatorSink =
    LogRotatorSink.createFromFunction(sizeBasedPathGenerator);
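Because the size-based generator is plain stateful logic, it can be extracted and tested without any files. The sketch below is a JDK-only variant of the Java sample with two assumptions added for testability: the size limit is a parameter, and the next path comes from a caller-supplied Supplier instead of Files.createTempFile. SizeRotation is a hypothetical name.

```java
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

class SizeRotation {
    // Returns a fresh stateful generator: it asks for a new path whenever adding
    // the element would push the current file past maxBytes.
    static Function<byte[], Optional<Path>> create(long maxBytes, Supplier<Path> nextPath) {
        // Start "full" so that the first non-empty element opens a file.
        final long[] size = {maxBytes};
        return element -> {
            if (size[0] + element.length > maxBytes) {
                size[0] = element.length; // the element starts the new file
                return Optional.of(nextPath.get());
            } else {
                size[0] += element.length;
                return Optional.empty();
            }
        };
    }
}
```

With this logic an element larger than the limit is still written, alone, to a fresh file, so a single oversized element can produce a file above the limit.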
Example: time-based rotation
- Scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedRotationFunction = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) => {
    val newName = LocalDateTime.now().format(formatter)
    if (currentFilename.contains(newName)) {
      None
    } else {
      currentFilename = Some(newName)
      Some(destinationDir.resolve(newName))
    }
  }
}

val timeBasedSink: Sink[ByteString, Future[Done]] = LogRotatorSink(timeBasedRotationFunction)
- Java
final Path destinationDir = FileSystems.getDefault().getPath("/tmp");
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

Creator<Function<ByteString, Optional<Path>>> timeBasedPathCreator = () -> {
  final String[] currentFileName = new String[]{null};
  return (element) -> {
    String newName = LocalDateTime.now().format(formatter);
    if (newName.equals(currentFileName[0])) {
      return Optional.empty();
    } else {
      currentFileName[0] = newName;
      return Optional.of(destinationDir.resolve(newName));
    }
  };
};

Sink<ByteString, CompletionStage<Done>> timeBasedSink =
    LogRotatorSink.createFromFunction(timeBasedPathCreator);
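The time-based generator reads the wall clock, which makes it awkward to test. A common remedy, sketched here as an assumption rather than as part of the sample, is to inject the time source; TimeRotation and its Supplier<LocalDateTime> parameter are hypothetical. In production LocalDateTime::now would be passed in.

```java
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

class TimeRotation {
    private static final DateTimeFormatter FORMATTER =
        DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

    // Same logic as the Java sample, but the current time comes from `now`
    // (LocalDateTime::now in production, a controllable supplier in tests).
    static Function<byte[], Optional<Path>> create(Path destinationDir, Supplier<LocalDateTime> now) {
        final String[] currentFileName = {null};
        return element -> {
            String newName = now.get().format(FORMATTER);
            if (newName.equals(currentFileName[0])) {
                return Optional.empty(); // same hour: keep writing to the current file
            }
            currentFileName[0] = newName;
            return Optional.of(destinationDir.resolve(newName));
        };
    }
}
```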
Running the example code
The samples are contained in standalone runnable mains and can be run from sbt like this:
- Scala
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.FileTailSourceSpec /some/path/toa/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.DirectoryChangesSourceSpec /some/directory/path
// File rotator
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.LogRotatorSinkTest
- Java
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.FileTailSourceTest /some/path/toa/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.DirectoryChangesSourceTest /some/directory/path
// File rotator
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.LogRotatorSinkTest