Cluster
For an introduction to Akka Cluster concepts, see Cluster Specification. This documentation shows how to use the typed Cluster API.
This module is currently marked as may change, in the sense that it is the subject of active research. The API or semantics can change without warning or a deprecation period, and using this module in production is not yet recommended.
Dependency
To use Akka Cluster Typed, add the module to your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5.11"
- Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.5.11</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.5.11'
}
Examples
All of the examples below assume the following imports:
- Scala
-
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.cluster.typed._
- Java
-
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.*;
The minimum configuration required is to set a host and port for remoting and to set akka.actor.provider = "cluster".
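As a rough illustration of that minimum configuration, the fragment below sets the provider plus a host and port. It is a sketch that assumes classic Netty TCP remoting (the `akka.remote.netty.tcp` keys of the Akka 2.5 series); the host and port values are placeholders, not values taken from this page.

```hocon
akka {
  actor {
    # Enables the cluster-aware actor provider
    provider = "cluster"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1" # placeholder host for a local node
      port = 2551            # placeholder port
    }
  }
}
```

Each node needs its own host/port pair; nodes started on the same machine would have to use different ports.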
Cluster API extension
The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription to cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc.). These are exposed as two different actor references, that is, it is a message-based API.
The references are on the Cluster extension:
- Scala
-
val cluster1 = Cluster(system)
- Java
-
Cluster cluster = Cluster.get(system);
The Cluster extension gives you access to:
- manager: An ActorRef[ClusterCommand], where a ClusterCommand is a command such as Join, Leave, and Down
- subscriptions: An ActorRef[ClusterStateSubscription], where a ClusterStateSubscription is one of GetCurrentState, or Subscribe and Unsubscribe to cluster events like MemberRemoved
- state: The current CurrentClusterState
Cluster Management
If you are not using configuration to specify seed nodes, joining the cluster can be done programmatically via the manager.
- Scala
-
cluster1.manager ! Join(cluster1.selfMember.address)
- Java
-
cluster.manager().tell(Join.create(cluster.selfMember().address()));
Leaving and downing are similar, for example:
- Scala
-
cluster2.manager ! Leave(cluster2.selfMember.address)
- Java
-
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
Cluster subscriptions
Cluster subscriptions can be used to receive messages when the cluster state changes. For example, registering for all MemberEvents, then using the manager to have a node leave the cluster, will result in events for the node going through the lifecycle described in Cluster Specification.
This example subscribes with a TestProbe, but in a real application it would be an Actor:
- Scala
-
val probe1 = TestProbe[MemberEvent]()(system1)
cluster1.subscriptions ! Subscribe(probe1.ref, classOf[MemberEvent])
- Java
-
TestProbe<ClusterEvent.MemberEvent> testProbe = TestProbe.create(system);
cluster.subscriptions().tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class));
Then asking a node to leave:
- Scala
-
cluster1.manager ! Leave(cluster2.selfMember.address)
probe1.within(10.seconds) {
  probe1.expectMessageType[MemberLeft].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMessageType[MemberExited].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMessageType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address
}
- Java
-
cluster.manager().tell(Leave.create(cluster2.selfMember().address()));
testProbe.expectMessageClass(ClusterEvent.MemberLeft.class);
testProbe.expectMessageClass(ClusterEvent.MemberExited.class);
testProbe.expectMessageClass(ClusterEvent.MemberRemoved.class);
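The order of events the probe expects can be modelled without Akka. The following is a library-free sketch, not part of the Cluster API: `LeaveLifecycle`, its `Status` enum, and both methods are hypothetical names introduced only to illustrate that a graceful leave moves a member through Leaving, Exiting, and Removed, with one membership event published per step.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of a graceful leave; no Akka types are used here.
final class LeaveLifecycle {
    // Statuses a member passes through after a Leave command, in order.
    enum Status { LEAVING, EXITING, REMOVED }

    // Name of the membership event published when a member reaches the status.
    static String eventFor(Status status) {
        switch (status) {
            case LEAVING: return "MemberLeft";
            case EXITING: return "MemberExited";
            case REMOVED: return "MemberRemoved";
            default: throw new IllegalArgumentException("Unexpected status: " + status);
        }
    }

    // Events a MemberEvent subscriber would see, in order, for one leaving node.
    static List<String> eventsAfterLeave() {
        List<String> events = new ArrayList<>();
        for (Status status : Status.values()) {
            events.add(eventFor(status));
        }
        return events;
    }
}
```

A subscriber that registered for all MemberEvents may also receive events for other members, so a real actor would typically match on the member address as the Scala example does.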
Serialization
See serialization for how messages are sent between ActorSystems. Actor references are typically included in the messages, since there is no sender. To serialize actor references to and from their string representation you will use the ActorRefResolver. For example, here’s how a serializer could look for the Ping and Pong messages above:
- Scala
-
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.adapter._ // provides system.toTyped
import akka.serialization.SerializerWithStringManifest

class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
  // Resolves typed actor references to and from their string representation
  private val actorRefResolver = ActorRefResolver(system.toTyped)

  private val PingManifest = "a"
  private val PongManifest = "b"

  override def identifier = 41

  override def manifest(msg: AnyRef) = msg match {
    case _: Ping ⇒ PingManifest
    case Pong    ⇒ PongManifest
  }

  override def toBinary(msg: AnyRef) = msg match {
    case Ping(who) ⇒
      // Reuse the resolver created above instead of looking it up again
      actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
    case Pong ⇒
      Array.emptyByteArray
  }

  override def fromBinary(bytes: Array[Byte], manifest: String) = {
    manifest match {
      case PingManifest ⇒
        val str = new String(bytes, StandardCharsets.UTF_8)
        val ref = actorRefResolver.resolveActorRef[Pong.type](str)
        Ping(ref)
      case PongManifest ⇒
        Pong
    }
  }
}
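The manifest-based round trip that the serializer performs can be tried out without Akka. The sketch below is a stdlib-only model, not the Akka serialization API: `PingPongCodec`, its nested `Ping` and `Pong` classes, and the `replyToPath` field are hypothetical, and a plain string stands in for the serialized form that an ActorRefResolver would produce for an actor reference.

```java
import java.nio.charset.StandardCharsets;

// Library-free model of a string-manifest serializer; no Akka types are used.
final class PingPongCodec {
    static final String PING_MANIFEST = "a";
    static final String PONG_MANIFEST = "b";

    // Hypothetical message carrying the string form of a reply-to reference.
    static final class Ping {
        final String replyToPath;
        Ping(String replyToPath) { this.replyToPath = replyToPath; }
    }

    // Hypothetical singleton message without any payload.
    static final class Pong {
        static final Pong INSTANCE = new Pong();
        private Pong() {}
    }

    // The manifest tells the receiving side which message type the bytes hold.
    static String manifest(Object msg) {
        if (msg instanceof Ping) return PING_MANIFEST;
        if (msg instanceof Pong) return PONG_MANIFEST;
        throw new IllegalArgumentException("Unknown message: " + msg.getClass().getName());
    }

    // Ping is encoded as the UTF-8 bytes of its reference string, Pong as no bytes.
    static byte[] toBinary(Object msg) {
        if (msg instanceof Ping) return ((Ping) msg).replyToPath.getBytes(StandardCharsets.UTF_8);
        if (msg instanceof Pong) return new byte[0];
        throw new IllegalArgumentException("Unknown message: " + msg.getClass().getName());
    }

    // Dispatches on the manifest to rebuild the original message.
    static Object fromBinary(byte[] bytes, String manifest) {
        switch (manifest) {
            case PING_MANIFEST: return new Ping(new String(bytes, StandardCharsets.UTF_8));
            case PONG_MANIFEST: return Pong.INSTANCE;
            default: throw new IllegalArgumentException("Unknown manifest: " + manifest);
        }
    }
}
```

Keeping the manifest separate from the payload is what lets Pong serialize to an empty byte array: the receiver learns the type from the manifest alone.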