Google Cloud Pub/Sub
The Google Cloud Pub/Sub connector provides a way to connect to Google Cloud's managed Pub/Sub service: https://cloud.google.com/pubsub/.
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "0.15"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub_2.12</artifactId>
  <version>0.15</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub_2.12', version: '0.15'
}
Usage
Prepare your credentials for access to Google Cloud Pub/Sub.
- Scala
import java.security.{KeyFactory, PrivateKey}
import java.security.spec.PKCS8EncodedKeySpec
import java.util.Base64

// Decode a Base64-encoded PKCS#8 private key into a PrivateKey instance.
val privateKey: PrivateKey = {
  val pk = "MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCxwdLoCIviW0BsREeKzi" +
    "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" +
    "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" +
    "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" +
    "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" +
    "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" +
    "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" +
    "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" +
    "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" +
    "DwM2NQnSxQ22+/"
  val kf = KeyFactory.getInstance("RSA")
  val encodedPv = Base64.getDecoder.decode(pk)
  val keySpecPv = new PKCS8EncodedKeySpec(encodedPv)
  kf.generatePrivate(keySpecPv)
}
val clientEmail = "[email protected]"
val projectId = "test-XXXXX"
val apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE"
val topic = "topic1"
val subscription = "subscription1"
- Java
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

// Decode a Base64-encoded PKCS#8 private key into a PrivateKey instance.
String keyString = "MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCxwdLoCIviW0BsREeKzi" +
    "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" +
    "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" +
    "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" +
    "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" +
    "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" +
    "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" +
    "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" +
    "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" +
    "DwM2NQnSxQ22+/";
KeyFactory kf = KeyFactory.getInstance("RSA");
byte[] encodedPv = Base64.getDecoder().decode(keyString);
PKCS8EncodedKeySpec keySpecPv = new PKCS8EncodedKeySpec(encodedPv);
PrivateKey privateKey = kf.generatePrivate(keySpecPv);
String clientEmail = "[email protected]";
String projectId = "test-XXXXX";
String apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE";
String topic = "topic1";
String subscription = "subscription1";
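Instead of embedding the key as a string literal, the same decoding steps can be applied to a key stored in PEM form. The following is a minimal sketch, assuming an unencrypted PKCS#8 RSA key wrapped in `-----BEGIN PRIVATE KEY-----` armor; `PemPrivateKeys` and `parseRsaPkcs8` are hypothetical helper names and are not part of the connector.

```java
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

// Hypothetical helper, not part of the connector API.
final class PemPrivateKeys {
    private PemPrivateKeys() {}

    /**
     * Parses an unencrypted PKCS#8 RSA private key.
     * Accepts the key with or without PEM armor and line breaks.
     */
    static PrivateKey parseRsaPkcs8(String pem) throws GeneralSecurityException {
        // Drop the PEM header and footer, then all whitespace, leaving plain Base64.
        String base64 = pem
            .replace("-----BEGIN PRIVATE KEY-----", "")
            .replace("-----END PRIVATE KEY-----", "")
            .replaceAll("\\s", "");
        byte[] der = Base64.getDecoder().decode(base64);
        // Same steps as the inline example: PKCS#8 key spec fed to an RSA KeyFactory.
        return KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der));
    }
}
```

The resulting `PrivateKey` can be passed wherever the examples use `privateKey`. Reading the PEM text from a file or a secret store is left to the caller.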
And prepare the actor system and materializer.
- Scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
- Java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;

ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
To publish a single request, build the message with a Base64-encoded data payload and put it in a PublishRequest. Publishing creates a flow that takes the requests and returns the accepted message IDs.
- Scala
val publishMessage =
  PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(projectId, apiKey, clientEmail, privateKey, topic)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
- Java
PubSubMessage publishMessage =
    new PubSubMessage("1", new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(projectId, apiKey, clientEmail, privateKey, topic, 1, system, materializer);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), materializer);
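The Base64 step can be isolated from the stream code. The sketch below shows only how a text payload might be encoded for the message's data field and decoded again on the receiving side; `PayloadCodec` is a hypothetical helper, not a connector class. It fixes the charset to UTF-8, which is an assumption made here so that the result does not depend on the platform default.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the connector API.
final class PayloadCodec {
    private PayloadCodec() {}

    /** Encodes text as UTF-8 bytes and then as a Base64 string. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Reverses encode: Base64 string to UTF-8 text. */
    static String decode(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String data = encode("Hello Google!");
        System.out.println(data);         // prints SGVsbG8gR29vZ2xlIQ==
        System.out.println(decode(data)); // prints Hello Google!
    }
}
```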
For better throughput you can batch messages together. Here, a batch is sent once it holds 1000 messages or once 1 minute has elapsed, whichever comes first; which limit is reached depends on how fast the source emits.
- Scala
val messageSource: Source[PubSubMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(PublishRequest.apply)
  .via(publishFlow)
  .to(Sink.seq)
- Java
Source<PubSubMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, FiniteDuration.apply(1, "min"))
    .map(messages -> PublishRequest.of(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), materializer);
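To make the size-or-time rule concrete, here is a simplified, deterministic model of it in plain Java. It is not how Akka implements `groupedWithin` (which relies on a timer inside the stream) and it is not connector code; `BatchingModel` and its parameters are hypothetical. In this model a batch closes when it reaches `maxSize`, or when the next element arrives `windowMillis` or more after the batch was opened.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-or-time batching; illustrative only.
final class BatchingModel {
    private BatchingModel() {}

    /**
     * Groups elements using their arrival times (milliseconds, non-decreasing).
     * A batch is emitted when it holds maxSize elements, or when the next
     * element arrives windowMillis or more after the batch was opened.
     */
    static <T> List<List<T>> batch(List<T> elements, long[] arrivalMillis,
                                   int maxSize, long windowMillis) {
        if (elements.size() != arrivalMillis.length) {
            throw new IllegalArgumentException("one arrival time per element is required");
        }
        if (maxSize < 1 || windowMillis < 1) {
            throw new IllegalArgumentException("maxSize and windowMillis must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long openedAt = 0L;
        for (int i = 0; i < elements.size(); i++) {
            // Time limit: the open batch has expired, so emit it first.
            if (!current.isEmpty() && arrivalMillis[i] - openedAt >= windowMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                openedAt = arrivalMillis[i];
            }
            current.add(elements.get(i));
            // Size limit: emit as soon as the batch is full.
            if (current.size() == maxSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        // Emit whatever is left when the input ends.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

For example, with `maxSize = 2` and `windowMillis = 1000`, elements `a, b, c, d, e` arriving at 0, 10, 20, 5000 and 5010 ms are grouped as `[a, b]`, `[c]`, `[d, e]`: the first and last batches close on size, the middle one on time.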
To consume the messages from a subscription, you must subscribe and then acknowledge the received messages.
- Scala
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
  GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

subscriptionSource
  .map { message =>
    // do something fun
    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
- Java
Source<ReceivedMessage, NotUsed> subscriptionSource =
    GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription, system, materializer);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription, 1, system, materializer);

subscriptionSource
    .map(message -> {
      // do something fun
      return message.ackId();
    })
    .groupedWithin(1000, FiniteDuration.apply(1, "min"))
    .map(acks -> AcknowledgeRequest.of(acks))
    .to(ackSink);
If you want to acknowledge the messages automatically and send the ReceivedMessages to your own sink, you can create a graph.
- Scala
val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = ???
val processMessage: Sink[ReceivedMessage, NotUsed] = ???

// Collect ack ids in batches and send them to the acknowledge sink.
val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
- Java
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, FiniteDuration.apply(1, "minute"))
        .map(ids -> AcknowledgeRequest.of(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);
Running the examples
To run the example code, you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.
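One way to supply your own values without editing the example sources is to read them from the environment. The following is a minimal sketch under that assumption; the class `PubSubSettings` and the variable names `PUBSUB_PROJECT_ID`, `PUBSUB_API_KEY` and `PUBSUB_CLIENT_EMAIL` are hypothetical and are not defined by the connector.

```java
import java.util.Map;

// Hypothetical settings holder; names are illustrative, not connector API.
final class PubSubSettings {
    final String projectId;
    final String apiKey;
    final String clientEmail;

    private PubSubSettings(String projectId, String apiKey, String clientEmail) {
        this.projectId = projectId;
        this.apiKey = apiKey;
        this.clientEmail = clientEmail;
    }

    /** Builds settings from a map of variables, failing fast on a missing value. */
    static PubSubSettings fromEnv(Map<String, String> env) {
        return new PubSubSettings(
            require(env, "PUBSUB_PROJECT_ID"),
            require(env, "PUBSUB_API_KEY"),
            require(env, "PUBSUB_CLIENT_EMAIL"));
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value.trim();
    }
}
```

Calling `PubSubSettings.fromEnv(System.getenv())` would then yield the `projectId`, `apiKey` and `clientEmail` values used throughout the examples; taking a `Map` rather than reading `System.getenv()` directly keeps the helper easy to exercise in a test.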