Apache Geode connector
Apache Geode is a distributed data grid (formerly GemFire).
This connector provides a flow and a sink to put elements into Geode, and a source to retrieve elements from it.
Geode stores data as key-value pairs. Both keys and values must be serialized; see the Serialization section.
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-geode" % "0.15"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-geode_2.12</artifactId>
  <version>0.15</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-geode_2.12', version: '0.15'
}
Usage
Connection
First, connect to the Geode cache. In a client application, the connection is handled by a ClientCache, and a single ClientCache per application is enough. The ClientCache also holds a single PdxSerializer.
- scala
val reactiveGeode = new ReactiveGeode(geodeSettings)
- java
GeodeSettings settings = GeodeSettings.create(geodeDockerHostname, 10334)
    .withConfiguration(func(c -> c.setPoolIdleTimeout(10)));
return new ReactiveGeode(settings);
Apache Geode supports continuous queries. A continuous query relies on server events, so ReactiveGeode needs to listen for those events. Because this behaviour consumes more resources, it is isolated in a Scala trait (PoolSubscription) and a specialized Java class (ReactiveGeodeWithPoolSubscription).
- scala
val reactiveGeode = new ReactiveGeode(geodeSettings) with PoolSubscription
- java
return new ReactiveGeodeWithPoolSubscription(settings);
Region
Define region settings that name the region to access and provide the function used to extract the key from each element.
- scala
val personsRegionSettings = RegionSettings("persons", (p: Person) => p.id)
val animalsRegionSettings = RegionSettings("animals", (a: Animal) => a.id)
val complexesRegionSettings = RegionSettings("complexes", (a: Complex) => a.id)
- java
protected RegionSettings<Integer, Person> personRegionSettings =
    new RegionSettings<>("persons", func(Person::getId));
protected RegionSettings<Integer, Animal> animalRegionSettings =
    new RegionSettings<>("animals", func(Animal::getId));
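To make the role of the key extraction function concrete, here is a minimal plain-Java sketch that does not use the connector at all. `SimpleRegionSettings`, the nested `Person` class, and the in-memory map are hypothetical stand-ins introduced for illustration; they only show the idea that the key is derived from the element itself, not how `RegionSettings` is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class KeyExtractionSketch {

    // Hypothetical stand-in for RegionSettings: a region name plus a key extractor.
    static final class SimpleRegionSettings<K, V> {
        final String name;
        final Function<V, K> keyExtractor;

        SimpleRegionSettings(String name, Function<V, K> keyExtractor) {
            this.name = name;
            this.keyExtractor = keyExtractor;
        }
    }

    // Hypothetical element type, loosely mirroring the Person used on this page.
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }

        int getId() {
            return id;
        }
    }

    // Stores a value under the key derived from the value itself.
    static <K, V> void put(Map<K, V> region, SimpleRegionSettings<K, V> settings, V value) {
        region.put(settings.keyExtractor.apply(value), value);
    }

    public static void main(String[] args) {
        SimpleRegionSettings<Integer, Person> settings =
            new SimpleRegionSettings<>("persons", Person::getId);
        // In-memory stand-in for a region (assumption: a real region lives in Geode).
        Map<Integer, Person> region = new LinkedHashMap<>();

        put(region, settings, new Person(1, "Ada"));
        put(region, settings, new Person(2, "Alan"));
        put(region, settings, new Person(1, "Ada L.")); // same key: replaces the first entry

        System.out.println(region.keySet());    // prints [1, 2]
        System.out.println(region.get(1).name); // prints Ada L.
    }
}
```

Because the key comes from the element, writing two elements with the same id overwrites the earlier one in this sketch.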
Serialization
Objects must be serialized to be stored in a Geode region. Possible formats are:
- opaque format (e.g. JSON/XML)
- Java serialization
- Geode PDX format
Of these, PDX is the only format supported.
PDXEncoder supports many options; see gemfire_pdx_serialization.html.
A PdxSerializer must be provided to Geode when reading from or writing to a region.
- scala
object PersonPdxSerializer extends AkkaPdxSerializer[Person] {
  override def clazz: Class[Person] = classOf[Person]

  override def toData(o: scala.Any, out: PdxWriter): Boolean =
    if (o.isInstanceOf[Person]) {
      val p = o.asInstanceOf[Person]
      out.writeInt("id", p.id)
      out.writeString("name", p.name)
      out.writeDate("birthDate", p.birthDate)
      true
    } else false

  override def fromData(clazz: Class[_], in: PdxReader): AnyRef = {
    val id: Int = in.readInt("id")
    val name: String = in.readString("name")
    val birthDate: Date = in.readDate("birthDate")
    Person(id, name, birthDate)
  }
}
- java
public class PersonPdxSerializer implements AkkaPdxSerializer<Person> {

  @Override
  public Class<Person> clazz() {
    return Person.class;
  }

  @Override
  public boolean toData(Object o, PdxWriter out) {
    if (o instanceof Person) {
      Person p = (Person) o;
      out.writeInt("id", p.getId());
      out.writeString("name", p.getName());
      out.writeDate("birthDate", p.getBirthDate());
      return true;
    }
    return false;
  }

  @Override
  public Object fromData(Class<?> clazz, PdxReader in) {
    int id = in.readInt("id");
    String name = in.readString("name");
    Date birthDate = in.readDate("birthDate");
    return new Person(id, name, birthDate);
  }
}
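The serializers above depend on one invariant: every field written under a given name in toData is read back under the same name and type in fromData. The plain-Java sketch below illustrates that round trip without Geode. The `Map<String, Object>` is a hypothetical stand-in for PdxWriter and PdxReader, and the nested `Person` class is likewise an assumption for illustration; this is not the PDX wire format.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

final class RoundTripSketch {

    // Hypothetical element type with the same three fields as the Person above.
    static final class Person {
        final int id;
        final String name;
        final Date birthDate;

        Person(int id, String name, Date birthDate) {
            this.id = id;
            this.name = name;
            this.birthDate = birthDate;
        }
    }

    // Mirrors toData: writes named fields, returns false for objects it cannot handle.
    static boolean toData(Object o, Map<String, Object> out) {
        if (!(o instanceof Person)) {
            return false;
        }
        Person p = (Person) o;
        out.put("id", p.id);
        out.put("name", p.name);
        out.put("birthDate", p.birthDate);
        return true;
    }

    // Mirrors fromData: reads the same field names with the same types.
    static Person fromData(Map<String, Object> in) {
        return new Person(
            (Integer) in.get("id"),
            (String) in.get("name"),
            (Date) in.get("birthDate"));
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new HashMap<>(); // stand-in for the serialized form
        Person original = new Person(7, "Grace", new Date(0L));

        boolean handled = toData(original, fields);
        Person copy = fromData(fields);

        System.out.println(handled);                                   // prints true
        System.out.println(copy.id + " " + copy.name);                 // prints 7 Grace
        System.out.println(copy.birthDate.equals(original.birthDate)); // prints true
        System.out.println(toData("not a person", new HashMap<>()));   // prints false
    }
}
```

If a field name or type differs between the write side and the read side, the round trip breaks, which is the main thing to check when writing a serializer by hand.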
For Scala users, this project provides a generic solution based on shapeless: if no serializer is provided for a case class, one is generated at compile time. Java users need to write their custom serializers by hand.
Runtime reflection is also an option; see auto_serialization.html.
Flow usage
This sample stores instances of (case) classes in Geode.
- scala
val flow: Flow[Person, Person, NotUsed] = reactiveGeode.flow(personsRegionSettings)

val fut = source.via(flow).runWith(Sink.ignore)
- java
Flow<Person, Person, NotUsed> flow =
    reactiveGeode.flow(personRegionSettings, new PersonPdxSerializer());

CompletionStage<List<Person>> run =
    source
        .via(flow)
        .toMat(Sink.seq(), Keep.right())
        .run(materializer);
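The flow's type, `Flow<Person, Person, NotUsed>`, suggests that each element is stored and then passed downstream unchanged. The following plain-Java sketch illustrates that store-and-pass-through shape with `java.util.stream` and an in-memory map. It is an illustration under that assumption only: `storing` is a hypothetical helper, and nothing here reflects how the connector talks to Geode.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PassThroughSketch {

    // Hypothetical helper: returns a stage that stores each element under its
    // extracted key, then emits the same element unchanged.
    static <K, V> Function<V, V> storing(Map<K, V> region, Function<V, K> keyOf) {
        return value -> {
            region.put(keyOf.apply(value), value);
            return value;
        };
    }

    public static void main(String[] args) {
        // In-memory stand-in for a region, keyed by string length for brevity.
        Map<Integer, String> region = new ConcurrentHashMap<>();

        List<String> emitted =
            Stream.of("a", "bb", "ccc")
                .map(storing(region, String::length))
                .collect(Collectors.toList());

        System.out.println(emitted);       // prints [a, bb, ccc]
        System.out.println(region.size()); // prints 3
    }
}
```

A sink differs only in that nothing is emitted afterwards, which is why the sink examples materialize a completion signal instead of a list of elements.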
Sink usage
- scala
// Store animals (each snippet below comes from its own scope, with its own source)
val sink = reactiveGeode.sink(animalsRegionSettings)
val fut = source.runWith(sink)

// Store complexes
val sink = reactiveGeode.sink(complexesRegionSettings)
val fut = source.runWith(sink)
- java
Sink<Animal, CompletionStage<Done>> sink =
    reactiveGeode.sink(animalRegionSettings, new AnimalPdxSerializer());

RunnableGraph<CompletionStage<Done>> runnableGraph = source.toMat(sink, Keep.right());
Source usage
Simple query
Apache Geode supports simple queries.
- scala
val source = reactiveGeode
  .query[Person](s"select * from /persons order by id")
  .runWith(Sink.foreach(e => log.debug(s"$e")))
- java
CompletionStage<Done> personsDone =
    reactiveGeode
        .query("select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
            },
            materializer);
Continuous query
- scala
val source = reactiveGeode
  .continuousQuery[Person]('test, s"select * from /persons")
  .runWith(Sink.fold(0) { (c, p) =>
    log.debug(s"$p $c")
    if (c == 19) {
      reactiveGeode.closeContinuousQuery('test).foreach { _ =>
        log.debug("test cQuery is closed")
      }
    }
    c + 1
  })
- java
CompletionStage<Done> fut =
    reactiveGeode
        .continuousQuery("test", "select * from /persons", new PersonPdxSerializer())
        .runForeach(
            p -> {
              LOGGER.debug(p.toString());
              if (p.getId() == 120) {
                reactiveGeode.closeContinuousQuery("test");
              }
            },
            materializer);
Geode basic commands
Assuming Apache Geode is installed:
gfsh
From the Geode shell:
start locator --name=locator
configure pdx --read-serialized=true
start server --name=server
create region --name=animals --type=PARTITION_REDUNDANT --redundant-copies=2
create region --name=persons --type=PARTITION_REDUNDANT --redundant-copies=2
Run the test
Integration tests run against a Geode instance on localhost by default; set the IT_GEODE_HOSTNAME environment variable to change this:
export IT_GEODE_HOSTNAME=geode-host-locator
sbt
From the sbt shell:
project geode
test
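How the test suite itself reads IT_GEODE_HOSTNAME is not shown on this page. As a minimal sketch of the usual pattern (an environment variable with a localhost fallback), assuming nothing about the actual test code, the lookup could look like this; `GeodeHostSketch` and `resolveHost` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

final class GeodeHostSketch {

    static final String ENV_NAME = "IT_GEODE_HOSTNAME";
    static final String DEFAULT_HOST = "localhost";

    // Returns the configured hostname, or localhost when the variable is unset or blank.
    static String resolveHost(Map<String, String> env) {
        String value = env.get(ENV_NAME);
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT_HOST;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Uses the real process environment; the result depends on how the JVM was started.
        System.out.println(resolveHost(System.getenv()));

        // Uses an explicit map, which keeps the lookup easy to test.
        Map<String, String> env = Collections.singletonMap(ENV_NAME, "geode-host-locator");
        System.out.println(resolveHost(env)); // prints geode-host-locator
    }
}
```

Passing the environment in as a map, rather than calling `System.getenv` inside the method, is a design choice that lets the fallback be checked without changing the real environment.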