Cluster Sharding
For an introduction to Sharding concepts, see Cluster Sharding. This documentation shows how to use the typed Cluster Sharding API.
This module is currently marked as may change, in the sense of being the subject of active research. This means that the API or semantics can change without warning or a deprecation period, and it is not recommended to use this module in production yet.
Dependency
To use Akka Cluster Sharding Typed, add the module to your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.5.11"
- Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-sharding-typed_2.12</artifactId>
  <version>2.5.11</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-sharding-typed_2.12', version: '2.5.11'
}
Basic example
Sharding is accessed via the ClusterSharding extension:
- Scala
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EntityRef

val sharding = ClusterSharding(system)
- Java
import akka.cluster.sharding.typed.ClusterShardingSettings;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;

ClusterSharding sharding = ClusterSharding.get(system);
Sharding is commonly used with persistence, but any Behavior can be sharded, for example a basic counter:
- Scala
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand

def counter(entityId: String, value: Int): Behavior[CounterCommand] =
  Behaviors.immutable[CounterCommand] {
    case (ctx, Increment) ⇒
      counter(entityId, value + 1)
    case (ctx, GetValue(replyTo)) ⇒
      replyTo ! value
      Behaviors.same
    // GoodByeCounter is the hand-off stop message: stop the entity instead of failing the match
    case (ctx, GoodByeCounter) ⇒
      Behaviors.stopped
  }
- Java
interface CounterCommand {}

public static class Increment implements CounterCommand { }

public static class GoodByeCounter implements CounterCommand { }

public static class GetValue implements CounterCommand {
  private final ActorRef<Integer> replyTo;

  public GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}

public static Behavior<CounterCommand> counter(String entityId, Integer value) {
  return Behaviors.immutable(CounterCommand.class)
    .onMessage(Increment.class, (ctx, msg) -> {
      return counter(entityId, value + 1);
    })
    .onMessage(GetValue.class, (ctx, msg) -> {
      msg.replyTo.tell(value);
      return Behaviors.same();
    })
    // GoodByeCounter is the hand-off stop message: stop the entity when it arrives
    .onMessage(GoodByeCounter.class, (ctx, msg) -> {
      return Behaviors.stopped();
    })
    .build();
}
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
- Scala
val TypeKey = EntityTypeKey[CounterCommand]("Counter")

// if an extractor is defined then the type would be ActorRef[CounterCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn(
  behavior = entityId ⇒ counter(entityId, 0),
  props = Props.empty,
  typeKey = TypeKey,
  settings = ClusterShardingSettings(system),
  maxNumberOfShards = 10,
  handOffStopMessage = GoodByeCounter)
- Java
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");

ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
  entityId -> counter(entityId, 0),
  Props.empty(),
  typeKey,
  ClusterShardingSettings.create(system),
  10,
  new GoodByeCounter());
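The maxNumberOfShards argument bounds how many shards the entity identifiers are spread over. The following is a minimal plain-Java sketch of that idea, under the assumption that the shard is derived from the hash code of the entity identifier modulo maxNumberOfShards. ShardIdSketch and its shardId method are hypothetical names for illustration and are not part of the Akka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShardIdSketch {

    // Hypothetical helper, not an Akka API: maps an entity id to one of
    // maxNumberOfShards shard ids. Assumes hash-based allocation.
    static String shardId(String entityId, int maxNumberOfShards) {
        // Taking the remainder first keeps the value in (-max, max),
        // so Math.abs cannot overflow.
        return Integer.toString(Math.abs(entityId.hashCode() % maxNumberOfShards));
    }

    public static void main(String[] args) {
        int maxNumberOfShards = 10; // same value as in the spawn call above

        // Group 20 illustrative entity ids by the shard they map to.
        Map<String, List<String>> shards = new TreeMap<>();
        for (int i = 1; i <= 20; i++) {
            String entityId = "counter-" + i;
            shards.computeIfAbsent(shardId(entityId, maxNumberOfShards), k -> new ArrayList<>())
                  .add(entityId);
        }
        shards.forEach((shard, entities) ->
            System.out.println("shard " + shard + " -> " + entities));
    }
}
```

The same entity identifier always maps to the same shard, which is what lets any node locate an entity without a lookup table per entity. A larger maxNumberOfShards gives finer-grained units to move between nodes.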
Messages to a specific entity are then sent via an EntityRef. It is also possible to wrap messages in a ShardingEnvelope or define extractor functions and send messages directly to the shard region.
- Scala
// With an EntityRef
val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1")
counterOne ! Increment

// Entity id is specified via a `ShardingEnvelope`
shardRegion ! ShardingEnvelope("counter-1", Increment)
- Java
// With an EntityRef
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-1");
counterOne.tell(new Increment());

// Entity id is specified via a ShardingEnvelope
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
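The two ways of addressing an entity reach the same destination. The plain-Java model below is a sketch of that relationship, not Akka code: Envelope, Region and Ref are hypothetical stand-ins for ShardingEnvelope, the shard region and EntityRef, and messages are simplified to strings. It assumes that an entity reference simply pairs each message with its entity identifier before handing it to the region.

```java
import java.util.HashMap;
import java.util.Map;

final class EnvelopeRoutingSketch {

    // Hypothetical stand-in for ShardingEnvelope: an entity id plus the wrapped message.
    static final class Envelope {
        final String entityId;
        final String message;

        Envelope(String entityId, String message) {
            this.entityId = entityId;
            this.message = message;
        }
    }

    // Hypothetical stand-in for a shard region: keeps one counter value per entity id.
    static final class Region {
        private final Map<String, Integer> counters = new HashMap<>();

        void tell(Envelope envelope) {
            if ("Increment".equals(envelope.message)) {
                // The entity state is created on first use, starting from 0.
                counters.merge(envelope.entityId, 1, Integer::sum);
            }
        }

        int valueOf(String entityId) {
            return counters.getOrDefault(entityId, 0);
        }
    }

    // Hypothetical stand-in for EntityRef: remembers the entity id and wraps each message.
    static final class Ref {
        private final Region region;
        private final String entityId;

        Ref(Region region, String entityId) {
            this.region = region;
            this.entityId = entityId;
        }

        void tell(String message) {
            region.tell(new Envelope(entityId, message));
        }
    }

    public static void main(String[] args) {
        Region region = new Region();

        // Via a reference bound to one entity id
        Ref counterOne = new Ref(region, "counter-1");
        counterOne.tell("Increment");

        // Via an explicit envelope sent to the region
        region.tell(new Envelope("counter-1", "Increment"));
        region.tell(new Envelope("counter-2", "Increment"));

        System.out.println(region.valueOf("counter-1")); // prints 2
        System.out.println(region.valueOf("counter-2")); // prints 1
    }
}
```

In this model both sends to "counter-1" update the same state, which mirrors why the EntityRef form is usually the more convenient of the two when the entity identifier is known up front.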
Persistence example
When using sharding, entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of an actor after it has moved. Currently Akka Typed only has a Scala API for persistence; you can track the progress of the Java API here.
Taking the larger example from the persistence documentation and making it into a sharded entity works the same way as for a non-persistent behavior. The behavior:
- Scala
def behavior(entityId: String): Behavior[BlogCommand] =
  PersistentBehaviors.immutable[BlogCommand, BlogEvent, BlogState](
    persistenceId = "Blog-" + entityId,
    initialState = BlogState.empty,
    commandHandler,
    eventHandler)
To create the entity:
- Scala
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")

ClusterSharding(system).spawn[BlogCommand](
  behavior = entityId ⇒ InDepthPersistentBehaviorSpec.behavior(entityId),
  props = Props.empty,
  typeKey = ShardingTypeName,
  settings = ClusterShardingSettings(system),
  maxNumberOfShards = 100,
  handOffStopMessage = PassivatePost)
Sending messages to entities is the same as in the example above. The only difference is that when an entity is moved, its state is restored. See persistence for more details.
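How state survives a move can be illustrated without Akka. The sketch below assumes an event-sourced entity whose state is rebuilt by replaying stored events when it starts. ReplaySketch, Entity, the list used as a journal, and the event names PostAdded and Published are hypothetical and chosen only for illustration; a real setup would use a persistence journal shared by the cluster nodes.

```java
import java.util.ArrayList;
import java.util.List;

final class ReplaySketch {

    // Hypothetical event-sourced entity: its state is derived only from stored events.
    static final class Entity {
        private final List<String> journal;
        private String title = "";
        private boolean published = false;

        Entity(List<String> journal) {
            this.journal = journal;
            // Recovery: replay every stored event to rebuild the state.
            for (String event : journal) {
                applyEvent(event);
            }
        }

        // Store the event first, then update the in-memory state from it.
        void persist(String event) {
            journal.add(event);
            applyEvent(event);
        }

        private void applyEvent(String event) {
            if (event.startsWith("PostAdded:")) {
                title = event.substring("PostAdded:".length());
            } else if (event.equals("Published")) {
                published = true;
            }
        }

        String describe() {
            return title + (published ? " (published)" : " (draft)");
        }
    }

    public static void main(String[] args) {
        // Stand-in for a journal that outlives any single entity instance.
        List<String> journal = new ArrayList<>();

        Entity onNodeA = new Entity(journal);
        onNodeA.persist("PostAdded:Hello");
        onNodeA.persist("Published");

        // Simulate a move: a fresh instance starts elsewhere with the same journal.
        Entity onNodeB = new Entity(journal);
        System.out.println(onNodeB.describe()); // prints "Hello (published)"
    }
}
```

The second instance never receives the original commands, yet it ends up in the same state, because the state is a function of the stored events rather than of the instance that produced them.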