API: Flows¶
Note
Before reading this page, you should be familiar with the key concepts of Flows.
Contents
An example flow¶
Before we discuss the API offered by the flow, let’s consider what a standard flow may look like.
Imagine a flow for agreeing a basic ledger update between Alice and Bob. This flow will have two sides:
- An
Initiator
side, that will initiate the request to update the ledger - A
Responder
side, that will respond to the request to update the ledger
Initiator¶
In our flow, the Initiator flow class will be doing the majority of the work:
Part 1 - Build the transaction
- Choose a notary for the transaction
- Create a transaction builder
- Extract any input states from the vault and add them to the builder
- Create any output states and add them to the builder
- Add any commands, attachments and timestamps to the builder
Part 2 - Sign the transaction
- Sign the transaction builder
- Convert the builder to a signed transaction
Part 3 - Verify the transaction
- Verify the transaction by running its contracts
Part 4 - Gather the counterparty’s signature
- Send the transaction to the counterparty
- Wait to receive back the counterparty’s signature
- Add the counterparty’s signature to the transaction
- Verify the transaction’s signatures
Part 5 - Finalize the transaction
- Send the transaction to the notary
- Wait to receive back the notarised transaction
- Record the transaction locally
- Store any relevant states in the vault
- Send the transaction to the counterparty for recording
We can visualize the work performed by initiator as follows:
Responder¶
To respond to these actions, the responder takes the following steps:
Part 1 - Sign the transaction
- Receive the transaction from the counterparty
- Verify the transaction’s existing signatures
- Verify the transaction by running its contracts
- Generate a signature over the transaction
- Send the signature back to the counterparty
Part 2 - Record the transaction
- Receive the notarised transaction from the counterparty
- Record the transaction locally
- Store any relevant states in the vault
FlowLogic¶
In practice, a flow is implemented as one or more communicating FlowLogic
subclasses. The FlowLogic
subclass’s constructor can take any number of arguments of any type. The generic of FlowLogic
(e.g.
FlowLogic<SignedTransaction>
) indicates the flow’s return type.
class Initiator(val arg1: Boolean,
val arg2: Int,
val counterparty: Party): FlowLogic<SignedTransaction>() { }
class Responder(val otherParty: Party) : FlowLogic<Unit>() { }
public static class Initiator extends FlowLogic<SignedTransaction> {
private final boolean arg1;
private final int arg2;
private final Party counterparty;
public Initiator(boolean arg1, int arg2, Party counterparty) {
this.arg1 = arg1;
this.arg2 = arg2;
this.counterparty = counterparty;
}
}
public static class Responder extends FlowLogic<Void> { }
FlowLogic annotations¶
Any flow that you wish to start either directly via RPC or as a subflow must be annotated with the
@InitiatingFlow
annotation. Additionally, if you wish to start the flow via RPC, you must annotate it with the
@StartableByRPC
annotation:
@InitiatingFlow
@StartableByRPC
class Initiator(): FlowLogic<Unit>() { }
@InitiatingFlow
@StartableByRPC
public static class Initiator extends FlowLogic<Unit> { }
Meanwhile, any flow that responds to a message from another flow must be annotated with the @InitiatedBy
annotation.
@InitiatedBy
takes the class of the flow it is responding to as its single parameter:
@InitiatedBy(Initiator::class)
class Responder(val otherParty: Party) : FlowLogic<Unit>() { }
@InitiatedBy(Initiator.class)
public static class Responder extends FlowLogic<Void> { }
Additionally, any flow that is started by a SchedulableState
must be annotated with the @SchedulableFlow
annotation.
Call¶
Each FlowLogic
subclass must override FlowLogic.call()
, which describes the actions it will take as part of
the flow. For example, the actions of the initiator’s side of the flow would be defined in Initiator.call
, and the
actions of the responder’s side of the flow would be defined in Responder.call
.
In order for nodes to be able to run multiple flows concurrently, and to allow flows to survive node upgrades and
restarts, flows need to be checkpointable and serializable to disk. This is achieved by marking FlowLogic.call()
,
as well as any function invoked from within FlowLogic.call()
, with an @Suspendable
annotation.
class Initiator(val counterparty: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() { }
}
public static class InitiatorFlow extends FlowLogic<Void> {
private final Party counterparty;
public Initiator(Party counterparty) {
this.counterparty = counterparty;
}
@Suspendable
@Override
public Void call() throws FlowException { }
}
ServiceHub¶
Within FlowLogic.call
, the flow developer has access to the node’s ServiceHub
, which provides access to the
various services the node provides. We will use the ServiceHub
extensively in the examples that follow. You can
also see API: ServiceHub for information about the services the ServiceHub
offers.
Common flow tasks¶
There are a number of common tasks that you will need to perform within FlowLogic.call
in order to agree ledger
updates. This section details the API for common tasks.
Transaction building¶
The majority of the work performed during a flow will be to build, verify and sign a transaction. We cover this in API: Transactions.
Retrieving information about other nodes¶
We can retrieve information about other nodes on the network and the services they offer using
ServiceHub.networkMapCache
.
Notaries¶
Remember that a transaction generally needs a notary to:
- Prevent double-spends if the transaction has inputs
- Serve as a timestamping authority if the transaction has a time-window
There are several ways to retrieve a notary from the network map:
progressTracker.currentStep = ID_OTHER_NODES
progressTracker.setCurrentStep(ID_OTHER_NODES);
Specific counterparties¶
We can also use the network map to retrieve a specific counterparty:
val namedCounterparty: Party? = serviceHub.networkMapCache.getNodeByLegalName(X500Name("CN=NodeA,O=NodeA,L=London,C=UK"))?.legalIdentity
val keyedCounterparty: Party? = serviceHub.networkMapCache.getNodeByLegalIdentityKey(dummyPubKey)?.legalIdentity
val firstCounterparty: Party = serviceHub.networkMapCache.partyNodes[0].legalIdentity
Party namedCounterparty = getServiceHub().getNetworkMapCache().getNodeByLegalName(new X500Name("CN=NodeA,O=NodeA,L=London,C=UK")).getLegalIdentity();
Party keyedCounterparty = getServiceHub().getNetworkMapCache().getNodeByLegalIdentityKey(dummyPubKey).getLegalIdentity();
Party firstCounterparty = getServiceHub().getNetworkMapCache().getPartyNodes().get(0).getLegalIdentity();
Specific services¶
Finally, we can use the map to identify nodes providing a specific service (e.g. a regulator or an oracle):
val regulator: Party = serviceHub.networkMapCache.getNodesWithService(ServiceType.regulator)[0].legalIdentity
Party regulator = getServiceHub().getNetworkMapCache().getNodesWithService(ServiceType.Companion.getRegulator()).get(0).getLegalIdentity();
Communication between parties¶
FlowLogic
instances communicate using three functions:
send(otherParty: Party, payload: Any)
- Sends the
payload
object to theotherParty
- Sends the
receive(receiveType: Class<R>, otherParty: Party)
- Receives an object of type
receiveType
from theotherParty
- Receives an object of type
sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any)
- Sends the
payload
object to theotherParty
, and receives an object of typereceiveType
back
- Sends the
Send¶
We can send arbitrary data to a counterparty:
send(counterparty, Any())
send(counterparty, new Object());
If this is the first send
, the counterparty will either:
- Ignore the message if they are not registered to respond to messages from this flow.
- Start the flow they have registered to respond to this flow, and run the flow until the first call to
receive
, at which point they process the message. In other words, we are assuming that the counterparty is registered to respond to this flow, and has a correspondingreceive
call.
Receive¶
We can also wait to receive arbitrary data of a specific type from a counterparty. Again, this implies a corresponding
send
call in the counterparty’s flow. A few scenarios:
- We never receive a message back. In the current design, the flow is paused until the node’s owner kills the flow.
- Instead of sending a message back, the counterparty throws a
FlowException
. This exception is propagated back to us, and we can use the error message to establish what happened. - We receive a message back, but it’s of the wrong type. In this case, a
FlowException
is thrown. - We receive back a message of the correct type. All is good.
Upon calling receive
(or sendAndReceive
), the FlowLogic
is suspended until it receives a response.
We receive the data wrapped in an UntrustworthyData
instance. This is a reminder that the data we receive may not
be what it appears to be! We must unwrap the UntrustworthyData
using a lambda:
val packet1: UntrustworthyData<Int> = receive<Int>(counterparty)
val int: Int = packet1.unwrap { data ->
// Perform checking on the object received.
// T O D O: Check the received object.
// Return the object.
data
}
UntrustworthyData<Integer> packet1 = receive(Integer.class, counterparty);
Integer integer = packet1.unwrap(data -> {
// Perform checking on the object received.
// T O D O: Check the received object.
// Return the object.
return data;
});
We’re not limited to sending to and receiving from a single counterparty. A flow can send messages to as many parties as it likes, and each party can invoke a different response flow:
send(regulator, Any())
val packet3: UntrustworthyData<Any> = receive<Any>(regulator)
send(regulator, new Object());
UntrustworthyData<Object> packet3 = receive(Object.class, regulator);
SendAndReceive¶
We can also use a single call to send data to a counterparty and wait to receive data of a specific type back. The type of data sent doesn’t need to match the type of the data received back:
val packet2: UntrustworthyData<Boolean> = sendAndReceive<Boolean>(counterparty, "You can send and receive any class!")
val boolean: Boolean = packet2.unwrap { data ->
// Perform checking on the object received.
// T O D O: Check the received object.
// Return the object.
data
}
UntrustworthyData<Boolean> packet2 = sendAndReceive(Boolean.class, counterparty, "You can send and receive any class!");
Boolean bool = packet2.unwrap(data -> {
// Perform checking on the object received.
// T O D O: Check the received object.
// Return the object.
return data;
});
Counterparty response¶
Suppose we’re now on the Responder
side of the flow. We just received the following series of messages from the
Initiator
:
- They sent us an
Any
instance - They waited to receive an
Integer
instance back - They sent a
String
instance and waited to receive aBoolean
instance back
Our side of the flow must mirror these calls. We could do this as follows:
val any: Any = receive<Any>(counterparty).unwrap { data -> data }
val string: String = sendAndReceive<String>(counterparty, 99).unwrap { data -> data }
send(counterparty, true)
Object obj = receive(Object.class, counterparty).unwrap(data -> data);
String string = sendAndReceive(String.class, counterparty, 99).unwrap(data -> data);
send(counterparty, true);
Subflows¶
Corda provides a number of built-in flows that should be used for handling common tasks. The most important are:
CollectSignaturesFlow
, which should be used to collect a transaction’s required signaturesFinalityFlow
, which should be used to notarise and record a transactionResolveTransactionsFlow
, which should be used to verify the chain of inputs to a transactionContractUpgradeFlow
, which should be used to change a state’s contractNotaryChangeFlow
, which should be used to change a state’s notary
These flows are designed to be used as building blocks in your own flows. You invoke them by calling
FlowLogic.subFlow
from within your flow’s call
method. Let’s look at three very common examples.
FinalityFlow¶
FinalityFlow
allows us to notarise the transaction and get it recorded in the vault of the participants of all
the transaction’s states:
val notarisedTx1: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, FINALISATION.childProgressTracker())).single()
SignedTransaction notarisedTx1 = subFlow(new FinalityFlow(fullySignedTx, FINALISATION.childProgressTracker())).get(0);
We can also choose to send the transaction to additional parties who aren’t one of the state’s participants:
val additionalParties: Set<Party> = setOf(regulator)
val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(listOf(fullySignedTx), additionalParties, FINALISATION.childProgressTracker())).single()
Set<Party> additionalParties = ImmutableSet.of(regulator, regulator);
SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(ImmutableList.of(fullySignedTx), additionalParties, FINALISATION.childProgressTracker())).get(0);
Only one party has to call FinalityFlow
for a given transaction to be recorded by all participants. It does
not need to be called by each participant individually.
CollectSignaturesFlow/SignTransactionFlow¶
The list of parties who need to sign a transaction is dictated by the transaction’s commands. Once we’ve signed a
transaction ourselves, we can automatically gather the signatures of the other required signers using
CollectSignaturesFlow
:
val fullySignedTx: SignedTransaction = subFlow(CollectSignaturesFlow(twiceSignedTx, SIGS_GATHERING.childProgressTracker()))
SignedTransaction fullySignedTx = subFlow(new CollectSignaturesFlow(twiceSignedTx, SIGS_GATHERING.childProgressTracker()));
Each required signer will need to respond by invoking its own SignTransactionFlow
subclass to check the
transaction and provide their signature if they are satisfied:
val signTransactionFlow: SignTransactionFlow = object : SignTransactionFlow(counterparty) {
override fun checkTransaction(stx: SignedTransaction) = requireThat {
// Any additional checking we see fit...
val outputState = stx.tx.outputsOfType<DummyState>().single()
assert(outputState.magicNumber == 777)
}
}
subFlow(signTransactionFlow)
class SignTxFlow extends SignTransactionFlow {
private SignTxFlow(Party otherParty, ProgressTracker progressTracker) {
super(otherParty, progressTracker);
}
@Override
protected void checkTransaction(SignedTransaction stx) {
requireThat(require -> {
// Any additional checking we see fit...
DummyState outputState = (DummyState) stx.getTx().getOutputs().get(0).getData();
assert (outputState.getMagicNumber() == 777);
return null;
});
}
}
subFlow(new SignTxFlow(counterparty, SignTransactionFlow.Companion.tracker()));
ResolveTransactionsFlow¶
Verifying a transaction will also verify every transaction in the transaction’s dependency chain. So if we receive a transaction from a counterparty and it has any dependencies, we’d need to download all of these dependencies using``ResolveTransactionsFlow`` before verifying it:
subFlow(ResolveTransactionsFlow(twiceSignedTx, counterparty))
subFlow(new ResolveTransactionsFlow(twiceSignedTx, counterparty));
We can also resolve a StateRef dependency chain:
subFlow(ResolveTransactionsFlow(setOf(ourStateRef.txhash), counterparty))
subFlow(new ResolveTransactionsFlow(ImmutableSet.of(ourStateRef.getTxhash()), counterparty));
FlowException¶
Suppose a node throws an exception while running a flow. Any counterparty flows waiting for a message from the node
(i.e. as part of a call to receive
or sendAndReceive
) will be notified that the flow has unexpectedly
ended and will themselves end. However, the exception thrown will not be propagated back to the counterparties.
If you wish to notify any waiting counterparties of the cause of the exception, you can do so by throwing a
FlowException
:
/**
* Exception which can be thrown by a [FlowLogic] at any point in its logic to unexpectedly bring it to a permanent end.
* The exception will propagate to all counterparty flows and will be thrown on their end the next time they wait on a
* [FlowLogic.receive] or [FlowLogic.sendAndReceive]. Any flow which no longer needs to do a receive, or has already ended,
* will not receive the exception (if this is required then have them wait for a confirmation message).
*
* [FlowException] (or a subclass) can be a valid expected response from a flow, particularly ones which act as a service.
* It is recommended a [FlowLogic] document the [FlowException] types it can throw.
*/
open class FlowException(message: String?, cause: Throwable?) : CordaException(message, cause) {
constructor(message: String?) : this(message, null)
constructor(cause: Throwable?) : this(cause?.toString(), cause)
constructor() : this(null, null)
}
The flow framework will automatically propagate the FlowException
back to the waiting counterparties.
There are many scenarios in which throwing a FlowException
would be appropriate:
- A transaction doesn’t
verify()
- A transaction’s signatures are invalid
- The transaction does not match the parameters of the deal as discussed
- You are reneging on a deal
ProgressTracker¶
We can give our flow a progress tracker. This allows us to see the flow’s progress visually in our node’s CRaSH shell.
To provide a progress tracker, we have to override FlowLogic.progressTracker
in our flow:
companion object {
object ID_OTHER_NODES : Step("Identifying other nodes on the network.")
object SENDING_AND_RECEIVING_DATA : Step("Sending data between parties.")
object EXTRACTING_VAULT_STATES : Step("Extracting states from the vault.")
object OTHER_TX_COMPONENTS : Step("Gathering a transaction's other components.")
object TX_BUILDING : Step("Building a transaction.")
object TX_SIGNING : Step("Signing a transaction.")
object TX_VERIFICATION : Step("Verifying a transaction.")
object SIGS_GATHERING : Step("Gathering a transaction's signatures.") {
// Wiring up a child progress tracker allows us to see the
// subflow's progress steps in our flow's progress tracker.
override fun childProgressTracker() = CollectSignaturesFlow.tracker()
}
object VERIFYING_SIGS : Step("Verifying a transaction's signatures.")
object FINALISATION : Step("Finalising a transaction.") {
override fun childProgressTracker() = FinalityFlow.tracker()
}
fun tracker() = ProgressTracker(
ID_OTHER_NODES,
SENDING_AND_RECEIVING_DATA,
EXTRACTING_VAULT_STATES,
OTHER_TX_COMPONENTS,
TX_BUILDING,
TX_SIGNING,
TX_VERIFICATION,
SIGS_GATHERING,
VERIFYING_SIGS,
FINALISATION
)
}
private static final Step ID_OTHER_NODES = new Step("Identifying other nodes on the network.");
private static final Step SENDING_AND_RECEIVING_DATA = new Step("Sending data between parties.");
private static final Step EXTRACTING_VAULT_STATES = new Step("Extracting states from the vault.");
private static final Step OTHER_TX_COMPONENTS = new Step("Gathering a transaction's other components.");
private static final Step TX_BUILDING = new Step("Building a transaction.");
private static final Step TX_SIGNING = new Step("Signing a transaction.");
private static final Step TX_VERIFICATION = new Step("Verifying a transaction.");
private static final Step SIGS_GATHERING = new Step("Gathering a transaction's signatures.") {
// Wiring up a child progress tracker allows us to see the
// subflow's progress steps in our flow's progress tracker.
@Override public ProgressTracker childProgressTracker() {
return CollectSignaturesFlow.Companion.tracker();
}
};
private static final Step VERIFYING_SIGS = new Step("Verifying a transaction's signatures.");
private static final Step FINALISATION = new Step("Finalising a transaction.") {
@Override public ProgressTracker childProgressTracker() {
return FinalityFlow.Companion.tracker();
}
};
private final ProgressTracker progressTracker = new ProgressTracker(
ID_OTHER_NODES,
SENDING_AND_RECEIVING_DATA,
EXTRACTING_VAULT_STATES,
OTHER_TX_COMPONENTS,
TX_BUILDING,
TX_SIGNING,
TX_VERIFICATION,
SIGS_GATHERING,
FINALISATION
);
We then update the progress tracker’s current step as we progress through the flow as follows:
progressTracker.currentStep = ID_OTHER_NODES
progressTracker.setCurrentStep(ID_OTHER_NODES);