File Connectors
The File connectors provide additional connectors for filesystems, complementing the file sources and sinks already included in core Akka Streams (see akka.stream.javadsl.FileIO and akka.stream.scaladsl.FileIO).
Artifacts
- sbt
-
libraryDependencies += "com.typesafe.akka" %% "akka-stream-alpakka-file" % "0.1-RC1"
- Maven
-
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>0.1-RC1</version>
</dependency>
- Gradle
-
dependencies {
  compile group: "com.typesafe.akka", name: "akka-stream-alpakka-file_2.12", version: "0.1-RC1"
}
Usage
FileTailSource
The FileTailSource starts at a given offset in a file and emits chunks of bytes until it reaches the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).
A very common use case is to combine reading bytes with parsing them into lines, so FileTailSource provides a few factory methods that create a source which parses the bytes into lines and emits those.
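To make the idea of parsing bytes into lines concrete, here is a minimal sketch using only the Java standard library. It is not the connector's code: the class LineFramingSketch and its feed method are hypothetical names, and treating the maximum line size as a hard limit that fails when exceeded is an assumption made for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Alpakka: turns arbitrary byte chunks into lines.
class LineFramingSketch {
  private final int maxLineSize;
  // Bytes of the line that has been started but not yet terminated by '\n'.
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

  LineFramingSketch(int maxLineSize) {
    this.maxLineSize = maxLineSize;
  }

  // Feeds one chunk and returns every line completed by it (without the '\n').
  List<String> feed(byte[] chunk) {
    List<String> lines = new ArrayList<>();
    for (byte b : chunk) {
      if (b == '\n') {
        lines.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
        pending.reset();
      } else {
        // Assumption: an over-long line is an error rather than being truncated.
        if (pending.size() >= maxLineSize) {
          throw new IllegalStateException("line longer than " + maxLineSize + " bytes");
        }
        pending.write(b);
      }
    }
    return lines;
  }

  public static void main(String[] args) {
    LineFramingSketch framing = new LineFramingSketch(8192);
    // A chunk boundary may fall in the middle of a line.
    System.out.println(framing.feed("hello\nwor".getBytes(StandardCharsets.UTF_8))); // prints [hello]
    System.out.println(framing.feed("ld\n".getBytes(StandardCharsets.UTF_8)));       // prints [world]
  }
}
```

The point of the sketch is that chunk boundaries and line boundaries are independent, so a partial line has to be carried over from one chunk to the next.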
In this sample we simply tail the lines of a file and print them to standard output:
- Scala
-
val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = scaladsl.FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))
- Java
-
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;
final Source<String, NotUsed> lines =
  akka.stream.alpakka.file.javadsl.FileTailSource.createLines(fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach((line) -> System.out.println(line), materializer);
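The read-to-the-end-then-poll behaviour described for FileTailSource can be approximated with plain java.nio, without any streaming or backpressure. The following is only a sketch of the idea: TailSketch, readFrom and demo are hypothetical names, and nothing here reflects how the connector is actually implemented.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Alpakka: illustrates "read to the end, then poll again".
class TailSketch {

  // Copies everything between `offset` and the current end of the file into `sink`
  // and returns the offset to resume from on the next poll.
  static long readFrom(Path file, long offset, ByteArrayOutputStream sink) throws IOException {
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      channel.position(offset);
      ByteBuffer chunk = ByteBuffer.allocate(8192);
      while (channel.read(chunk) > 0) {
        chunk.flip();
        sink.write(chunk.array(), 0, chunk.limit());
        chunk.clear();
      }
      return channel.position();
    }
  }

  // Writes to a temporary file, polls, appends, polls again, and returns what each poll saw.
  static List<String> demo() {
    try {
      Path file = Files.createTempFile("tail-sketch", ".log");
      try {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        Files.write(file, "first\n".getBytes(StandardCharsets.UTF_8));
        long offset = readFrom(file, 0L, sink);
        String firstPoll = new String(sink.toByteArray(), StandardCharsets.UTF_8);
        sink.reset();

        // A later poll resumes at the remembered offset and only sees the appended bytes.
        Files.write(file, "second\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
        readFrom(file, offset, sink);
        String secondPoll = new String(sink.toByteArray(), StandardCharsets.UTF_8);

        return Arrays.asList(firstPoll, secondPoll);
      } finally {
        Files.delete(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In a real tailing loop the second call to readFrom would be repeated on a timer, which is the role the polling interval plays in the samples above.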
DirectoryChangesSource
The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted change consists of the path that was changed and an enumeration value describing what kind of change it was.
In this sample we simply print each change to the directory to standard output:
- Scala
-
val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
- Java
-
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(1, TimeUnit.SECONDS);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
  DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach((Pair<Path, DirectoryChange> pair) -> {
  final Path changedPath = pair.first();
  final DirectoryChange change = pair.second();
  System.out.println("Path: " + changedPath + ", Change: " + change);
}, materializer);
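To illustrate what "a path plus a kind of change" can look like, the sketch below detects directory changes by comparing two snapshots of a directory. This is a deliberately simple substitute technique (snapshot diffing), not a description of how DirectoryChangesSource works internally; the JDK also offers java.nio.file.WatchService for the same purpose. The class DirectoryDiffSketch, its methods, and the labels used for the kinds of change are placeholders chosen for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the connector's implementation.
class DirectoryDiffSketch {

  // Maps each entry name in `dir` to its last-modified time in milliseconds.
  static Map<String, Long> snapshot(Path dir) {
    Map<String, Long> result = new TreeMap<>();
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
      for (Path entry : entries) {
        result.put(entry.getFileName().toString(), Files.getLastModifiedTime(entry).toMillis());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }

  // Compares two snapshots and describes each difference as "<kind> <name>".
  static List<String> diff(Map<String, Long> before, Map<String, Long> after) {
    List<String> changes = new ArrayList<>();
    for (Map.Entry<String, Long> entry : after.entrySet()) {
      Long previous = before.get(entry.getKey());
      if (previous == null) {
        changes.add("Creation " + entry.getKey());
      } else if (!previous.equals(entry.getValue())) {
        changes.add("Modification " + entry.getKey());
      }
    }
    for (String name : before.keySet()) {
      if (!after.containsKey(name)) {
        changes.add("Deletion " + name);
      }
    }
    return changes;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("dir-diff-sketch");
    Map<String, Long> empty = snapshot(dir);

    Path file = Files.createFile(dir.resolve("a.txt"));
    Map<String, Long> withFile = snapshot(dir);
    System.out.println(diff(empty, withFile)); // prints [Creation a.txt]

    Files.delete(file);
    System.out.println(diff(withFile, snapshot(dir))); // prints [Deletion a.txt]
    Files.delete(dir);
  }
}
```

Taking a new snapshot on every polling interval and emitting the result of diff would give a stream of changes similar in shape to the pairs printed in the samples above.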
Running the example code
Both samples are contained in standalone runnable mains and can be run from sbt like this:
- Scala
-
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.FileTailSourceSpec /some/path/to/a/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.scaladsl.DirectoryChangesSourceSpec /some/directory/path
- Java
-
sbt
// tail source
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.FileTailSourceTest /some/path/to/a/file
// or directory changes
> akka-stream-alpakka-file/test:runMain akka.stream.alpakka.file.javadsl.DirectoryChangesSourceTest /some/directory/path