public interface MessagingService
A MessagingService sits at the boundary between a message routing / networking layer and the core platform code.
A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the membership of which is defined elsewhere. Messages are atomic, and the system guarantees that a sent message will eventually arrive in the exact form it was sent; however, messages can be arbitrarily re-ordered or delayed.
Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer is reliable and as such messages may be stored to disk once queued.
Modifier and Type | Method and Description |
---|---|
MessageHandlerRegistration | addMessageHandler(java.lang.String topic, long sessionID, kotlin.jvm.functions.Function2<? super net.corda.node.services.messaging.ReceivedMessage,? super net.corda.node.services.messaging.MessageHandlerRegistration,kotlin.Unit> callback) The provided function will be invoked for each received message whose topic matches the given string. The callback will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result. |
MessageHandlerRegistration | addMessageHandler(TopicSession topicSession, kotlin.jvm.functions.Function2<? super net.corda.node.services.messaging.ReceivedMessage,? super net.corda.node.services.messaging.MessageHandlerRegistration,kotlin.Unit> callback) The provided function will be invoked for each received message whose topic and session match. The callback will run on the main server thread provided when the messaging service is constructed, and a database transaction is set up for you automatically. |
void | cancelRedelivery(long retryId) Cancels the scheduled message redelivery for the specified retryId. |
Message | createMessage(TopicSession topicSession, byte[] data, java.util.UUID uuid) Returns an initialised Message with the current time, etc., already filled in. |
MessageRecipients | getAddressOfParty(PartyInfo partyInfo) Given information about either a specific node or a service, returns its corresponding address. |
SingleMessageRecipient | getMyAddress() Returns an address that refers to this node. |
void | removeMessageHandler(MessageHandlerRegistration registration) Removes a handler given the object returned from addMessageHandler. The callback will no longer be invoked once this method has returned, although executions that are currently in flight will not be interrupted. |
void | send(Message message, MessageRecipients target, java.lang.Long retryId) Sends a message to the given receiver. The details of how receivers are identified are up to the messaging implementation: the type system provides an opaque high level view, with more fine grained control being available via type casting. Once this function returns the message is queued for delivery but not necessarily delivered: if the recipients are offline, the message may remain queued for hours or days. |
void | stop() Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor then this will block until all in-flight messages have finished being handled and acknowledged. If called from a thread that's a part of the AffinityExecutor given to the constructor, it returns immediately and shutdown is asynchronous. |
MessageHandlerRegistration addMessageHandler(java.lang.String topic, long sessionID, kotlin.jvm.functions.Function2<? super net.corda.node.services.messaging.ReceivedMessage,? super net.corda.node.services.messaging.MessageHandlerRegistration,kotlin.Unit> callback)
The provided function will be invoked for each received message whose topic matches the given string. The callback will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
The returned object is an opaque handle that may be used to un-register handlers later with MessagingService.removeMessageHandler.
The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister itself and yet addMessageHandler hasn't returned the handle yet.
topic
- identifier for the general subject of the message, for example "platform.network_map.fetch". The topic can be the empty string to match all messages (session ID must be DEFAULT_SESSION_ID).
sessionID
- identifier for the session the message is part of. For services listening before a session is established, use DEFAULT_SESSION_ID.
See also: MessagingService.removeMessageHandler
MessageHandlerRegistration addMessageHandler(TopicSession topicSession, kotlin.jvm.functions.Function2<? super net.corda.node.services.messaging.ReceivedMessage,? super net.corda.node.services.messaging.MessageHandlerRegistration,kotlin.Unit> callback)
The provided function will be invoked for each received message whose topic and session match. The callback will run on the main server thread provided when the messaging service is constructed, and a database transaction is set up for you automatically.
The returned object is an opaque handle that may be used to un-register handlers later with MessagingService.removeMessageHandler.
The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister itself and yet addMessageHandler hasn't returned the handle yet.
topicSession
- identifier for the topic and session to listen for messages arriving on.
See also: MessagingService.removeMessageHandler
void removeMessageHandler(MessageHandlerRegistration registration)
Removes a handler given the object returned from addMessageHandler. The callback will no longer be invoked once this method has returned, although executions that are currently in flight will not be interrupted.
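The reason the registration handle is also passed to the callback can be illustrated with a small, self-contained sketch. It does not use the real Corda types: `HandlerRegistrationSketch`, `Registration`, and `deliver` are hypothetical stand-ins, and messages are plain strings. It shows a one-shot handler that unregisters itself from inside the callback, without needing the value returned by addMessageHandler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for a messaging service; not the real Corda types.
class HandlerRegistrationSketch {
    /** Opaque handle, playing the role of MessageHandlerRegistration. */
    static final class Registration {
        final String topic;
        final BiConsumer<String, Registration> callback;

        Registration(String topic, BiConsumer<String, Registration> callback) {
            this.topic = topic;
            this.callback = callback;
        }
    }

    // Copy-on-write so a handler can be removed while a delivery is iterating.
    private final List<Registration> handlers = new CopyOnWriteArrayList<>();

    Registration addMessageHandler(String topic, BiConsumer<String, Registration> callback) {
        Registration registration = new Registration(topic, callback);
        handlers.add(registration);
        return registration;
    }

    void removeMessageHandler(Registration registration) {
        handlers.remove(registration);
    }

    /** Simulates a message arriving: invokes every handler whose topic matches. */
    void deliver(String topic, String payload) {
        for (Registration registration : handlers) {
            // An empty topic matches all messages, as described for the topic parameter.
            if (registration.topic.isEmpty() || registration.topic.equals(topic)) {
                registration.callback.accept(payload, registration);
            }
        }
    }

    /** Registers a one-shot handler, delivers two messages, and returns what it saw. */
    static List<String> demo() {
        HandlerRegistrationSketch service = new HandlerRegistrationSketch();
        List<String> seen = new CopyOnWriteArrayList<>();
        service.addMessageHandler("platform.network_map.fetch", (payload, self) -> {
            seen.add(payload);
            // The handle arrives as an argument, so no captured return value is needed.
            service.removeMessageHandler(self);
        });
        service.deliver("platform.network_map.fetch", "first");
        service.deliver("platform.network_map.fetch", "second");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch only the first message reaches the handler, because the handler removes itself before the second delivery starts.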
void send(Message message, MessageRecipients target, java.lang.Long retryId)
Sends a message to the given receiver. The details of how receivers are identified are up to the messaging implementation: the type system provides an opaque high level view, with more fine grained control being available via type casting. Once this function returns the message is queued for delivery but not necessarily delivered: if the recipients are offline, the message may remain queued for hours or days.
There is no way to know if a message has been received. If your flow requires this, you need the recipient to send an ACK message back.
retryId
- if provided, the message will be scheduled for redelivery until MessagingService.cancelRedelivery is called for this id. Note that this feature should only be used when the target is an idempotent distributed service, e.g. a notary.
void cancelRedelivery(long retryId)
Cancels the scheduled message redelivery for the specified retryId.
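The interaction between retryId, cancelRedelivery, and an idempotent recipient can be sketched as follows. This is an in-memory model under stated assumptions, not the actual implementation: `RedeliverySketch`, `IdempotentService`, and `redeliverPending` (which stands in for a retry timer firing) are hypothetical names, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical in-memory model of redelivery; not the real Corda implementation.
class RedeliverySketch {
    /** Messages still scheduled for redelivery, keyed by retryId. */
    private final Map<Long, String> pending = new LinkedHashMap<>();
    private final Consumer<String> recipient;

    RedeliverySketch(Consumer<String> recipient) {
        this.recipient = recipient;
    }

    /** Delivers once; a non-null retryId also schedules the message for redelivery. */
    void send(String message, Long retryId) {
        recipient.accept(message);
        if (retryId != null) {
            pending.put(retryId, message);
        }
    }

    void cancelRedelivery(long retryId) {
        pending.remove(retryId);
    }

    /** Stands in for the retry timer firing: resends everything still pending. */
    void redeliverPending() {
        for (String message : new ArrayList<>(pending.values())) {
            recipient.accept(message);
        }
    }

    /** Idempotent recipient: handling the same request twice has no additional effect. */
    static final class IdempotentService implements Consumer<String> {
        final Set<String> processed = new HashSet<>();
        int deliveries = 0;

        @Override
        public void accept(String requestId) {
            deliveries++;
            processed.add(requestId);
        }
    }

    /** Returns {number of deliveries, number of distinct requests processed}. */
    static int[] demo() {
        IdempotentService notary = new IdempotentService();
        RedeliverySketch service = new RedeliverySketch(notary);
        service.send("request-1", 42L);
        service.redeliverPending();      // no response seen yet, so it is delivered again
        service.cancelRedelivery(42L);   // the caller has now received the recipient's ACK
        service.redeliverPending();      // nothing is pending, so nothing is sent
        return new int[] { notary.deliveries, notary.processed.size() };
    }
}
```

The recipient sees the request twice but records it once, which is why redelivery is only appropriate for idempotent targets.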
Message createMessage(TopicSession topicSession, byte[] data, java.util.UUID uuid)
Returns an initialised Message with the current time, etc., already filled in.
topicSession
- identifier for the topic and session the message is sent to.
See also: Message
MessageRecipients getAddressOfParty(PartyInfo partyInfo)
Given information about either a specific node or a service, returns its corresponding address.
SingleMessageRecipient getMyAddress()
Returns an address that refers to this node.
void stop()
Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor then this will block until all in-flight messages have finished being handled and acknowledged. If called from a thread that's a part of the AffinityExecutor given to the constructor, it returns immediately and shutdown is asynchronous.
See also: AffinityExecutor
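One way to picture the two behaviours of stop() is the sketch below. It is an illustration only: `StopSketch` and `SingleThreadAffinity` are hypothetical stand-ins built on `java.util.concurrent`, not Corda's AffinityExecutor, and "in-flight work" is modelled simply as tasks already queued on a single worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; SingleThreadAffinity is a stand-in, not Corda's AffinityExecutor.
class StopSketch {
    static final class SingleThreadAffinity {
        private volatile Thread worker;
        final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
            Thread thread = new Thread(task, "messaging-worker");
            worker = thread;
            return thread;
        });

        boolean isOnThread() {
            return Thread.currentThread() == worker;
        }
    }

    final SingleThreadAffinity affinity = new SingleThreadAffinity();
    final CountDownLatch drained = new CountDownLatch(1);

    /** Returns true if the call blocked until shutdown completed, false if it returned at once. */
    boolean stop() throws InterruptedException {
        // The shutdown marker is queued behind any work already submitted to the worker.
        affinity.executor.execute(drained::countDown);
        if (affinity.isOnThread()) {
            // Waiting here would block the only thread able to run the marker.
            return false;
        }
        drained.await();
        return true;
    }

    /** Returns {blocked when called from outside, blocked when called from the worker}. */
    static boolean[] demo() {
        try {
            StopSketch outside = new StopSketch();
            boolean blockedOutside = outside.stop();
            outside.affinity.executor.shutdown();

            StopSketch inside = new StopSketch();
            Callable<Boolean> stopFromWorker = inside::stop;
            boolean blockedInside = inside.affinity.executor.submit(stopFromWorker).get();
            inside.drained.await(); // shutdown finishes asynchronously after stop() returned
            inside.affinity.executor.shutdown();

            return new boolean[] { blockedOutside, blockedInside };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The asynchronous branch exists because a thread that waits for its own queue to drain can never make progress on that queue.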