API: Flows

Note

Before reading this page, you should be familiar with the key concepts of Flows.

An example flow

Before we discuss the API offered by the flow, let’s consider what a standard flow may look like.

Imagine a flow for agreeing a basic ledger update between Alice and Bob. This flow will have two sides:

  • An Initiator side, that will initiate the request to update the ledger
  • A Responder side, that will respond to the request to update the ledger

Initiator

In our flow, the Initiator flow class will be doing the majority of the work:

Part 1 - Build the transaction

  1. Choose a notary for the transaction
  2. Create a transaction builder
  3. Extract any input states from the vault and add them to the builder
  4. Create any output states and add them to the builder
  5. Add any commands, attachments and timestamps to the builder

Part 2 - Sign the transaction

  1. Sign the transaction builder
  2. Convert the builder to a signed transaction

Part 3 - Verify the transaction

  1. Verify the transaction by running its contracts

Part 4 - Gather the counterparty’s signature

  1. Send the transaction to the counterparty
  2. Wait to receive back the counterparty’s signature
  3. Add the counterparty’s signature to the transaction
  4. Verify the transaction’s signatures

Part 5 - Finalize the transaction

  1. Send the transaction to the notary
  2. Wait to receive back the notarised transaction
  3. Record the transaction locally
  4. Store any relevant states in the vault
  5. Send the transaction to the counterparty for recording

We can visualize the work performed by initiator as follows:

_images/flow-overview.png

Responder

To respond to these actions, the responder takes the following steps:

Part 1 - Sign the transaction

  1. Receive the transaction from the counterparty
  2. Verify the transaction’s existing signatures
  3. Verify the transaction by running its contracts
  4. Generate a signature over the transaction
  5. Send the signature back to the counterparty

Part 2 - Record the transaction

  1. Receive the notarised transaction from the counterparty
  2. Record the transaction locally
  3. Store any relevant states in the vault

FlowLogic

In practice, a flow is implemented as one or more communicating FlowLogic subclasses. The FlowLogic subclass’s constructor can take any number of arguments of any type. The generic of FlowLogic (e.g. FlowLogic<SignedTransaction>) indicates the flow’s return type.

class Initiator(val arg1: Boolean,
                val arg2: Int,
                val counterparty: Party): FlowLogic<SignedTransaction>() { }

class Responder(val otherParty: Party) : FlowLogic<Unit>() { }
public static class Initiator extends FlowLogic<SignedTransaction> {
    private final boolean arg1;
    private final int arg2;
    private final Party counterparty;

    public Initiator(boolean arg1, int arg2, Party counterparty) {
        this.arg1 = arg1;
        this.arg2 = arg2;
        this.counterparty = counterparty;
    }

}

public static class Responder extends FlowLogic<Void> { }

FlowLogic annotations

Any flow from which you want to initiate other flows must be annotated with the @InitiatingFlow annotation. Additionally, if you wish to start the flow via RPC, you must annotate it with the @StartableByRPC annotation:

@InitiatingFlow
@StartableByRPC
class Initiator(): FlowLogic<Unit>() { }
@InitiatingFlow
@StartableByRPC
public static class Initiator extends FlowLogic<Unit> { }

Meanwhile, any flow that responds to a message from another flow must be annotated with the @InitiatedBy annotation. @InitiatedBy takes the class of the flow it is responding to as its single parameter:

@InitiatedBy(Initiator::class)
class Responder(val otherSideSession: FlowSession) : FlowLogic<Unit>() { }
@InitiatedBy(Initiator.class)
public static class Responder extends FlowLogic<Void> { }

Additionally, any flow that is started by a SchedulableState must be annotated with the @SchedulableFlow annotation.

Call

Each FlowLogic subclass must override FlowLogic.call(), which describes the actions it will take as part of the flow. For example, the actions of the initiator’s side of the flow would be defined in Initiator.call, and the actions of the responder’s side of the flow would be defined in Responder.call.

In order for nodes to be able to run multiple flows concurrently, and to allow flows to survive node upgrades and restarts, flows need to be checkpointable and serializable to disk. This is achieved by marking FlowLogic.call(), as well as any function invoked from within FlowLogic.call(), with an @Suspendable annotation.

class Initiator(val counterparty: Party): FlowLogic<Unit>() {
    @Suspendable
    override fun call() { }
}
public static class InitiatorFlow extends FlowLogic<Void> {
    private final Party counterparty;

    public Initiator(Party counterparty) {
        this.counterparty = counterparty;
    }

    @Suspendable
    @Override
    public Void call() throws FlowException { }

}

ServiceHub

Within FlowLogic.call, the flow developer has access to the node’s ServiceHub, which provides access to the various services the node provides. We will use the ServiceHub extensively in the examples that follow. You can also see API: ServiceHub for information about the services the ServiceHub offers.

Common flow tasks

There are a number of common tasks that you will need to perform within FlowLogic.call in order to agree ledger updates. This section details the API for common tasks.

Transaction building

The majority of the work performed during a flow will be to build, verify and sign a transaction. This is covered in API: Transactions.

Extracting states from the vault

When building a transaction, you’ll often need to extract the states you wish to consume from the vault. This is covered in API: Vault Query.

Retrieving information about other nodes

We can retrieve information about other nodes on the network and the services they offer using ServiceHub.networkMapCache.

Notaries

Remember that a transaction generally needs a notary to:

  • Prevent double-spends if the transaction has inputs
  • Serve as a timestamping authority if the transaction has a time-window

There are several ways to retrieve a notary from the network map:


progressTracker.setCurrentStep(ID_OTHER_NODES);

Specific counterparties

We can also use the network map to retrieve a specific counterparty:


CordaX500Name counterPartyName = new CordaX500Name("NodeA", "London", "GB");
Party namedCounterparty = getServiceHub().getIdentityService().wellKnownPartyFromX500Name(counterPartyName);
Party keyedCounterparty = getServiceHub().getIdentityService().partyFromKey(dummyPubKey);

Specific services

Finally, we can use the map to identify nodes providing a specific service (e.g. a regulator or an oracle):


PublicKey otherKey = getServiceHub().getKeyManagementService().freshKey();
SignedTransaction onceSignedTx2 = getServiceHub().signInitialTransaction(txBuilder, otherKey);

Communication between parties

In order to create a communication session between your initiator flow and the receiver flow you must call initiateFlow(party: Party): FlowSession

FlowSession instances in turn provide three functions:

  • send(payload: Any)
    • Sends the payload object
  • receive(receiveType: Class<R>): R
    • Receives an object of type receiveType
  • sendAndReceive(receiveType: Class<R>, payload: Any): R
    • Sends the payload object and receives an object of type receiveType back

InitiateFlow

initiateFlow creates a communication session with the passed in Party.


FlowSession counterpartySession = initiateFlow(counterparty);

Note that at the time of call to this function no actual communication is done, this is deferred to the first send/receive, at which point the counterparty will either:

  1. Ignore the message if they are not registered to respond to messages from this flow.
  2. Start the flow they have registered to respond to this flow.

Send

Once we have a FlowSession object we can send arbitrary data to a counterparty:


counterpartySession.send(new Object());

The flow on the other side must eventually reach a corresponding receive call to get this message.

Receive

We can also wait to receive arbitrary data of a specific type from a counterparty. Again, this implies a corresponding send call in the counterparty’s flow. A few scenarios:

  • We never receive a message back. In the current design, the flow is paused until the node’s owner kills the flow.
  • Instead of sending a message back, the counterparty throws a FlowException. This exception is propagated back to us, and we can use the error message to establish what happened.
  • We receive a message back, but it’s of the wrong type. In this case, a FlowException is thrown.
  • We receive back a message of the correct type. All is good.

Upon calling receive (or sendAndReceive), the FlowLogic is suspended until it receives a response.

We receive the data wrapped in an UntrustworthyData instance. This is a reminder that the data we receive may not be what it appears to be! We must unwrap the UntrustworthyData using a lambda:


UntrustworthyData<Integer> packet1 = counterpartySession.receive(Integer.class);
Integer integer = packet1.unwrap(data -> {
    // Perform checking on the object received.
    // T O D O: Check the received object.
    // Return the object.
    return data;
});

We’re not limited to sending to and receiving from a single counterparty. A flow can send messages to as many parties as it likes, and each party can invoke a different response flow:


FlowSession regulatorSession = initiateFlow(regulator);
regulatorSession.send(new Object());
UntrustworthyData<Object> packet3 = regulatorSession.receive(Object.class);

Warning

If you initiate several flows from the same @InitiatingFlow flow then on the receiving side you must be prepared to be initiated by any of the corresponding initiateFlow() calls! A good way of handling this ambiguity is to send as a first message a “role” message to the initiated flow, indicating which part of the initiating flow the rest of the counter-flow should conform to. For example send an enum, and on the other side start with a switch statement.

SendAndReceive

We can also use a single call to send data to a counterparty and wait to receive data of a specific type back. The type of data sent doesn’t need to match the type of the data received back:


UntrustworthyData<Boolean> packet2 = counterpartySession.sendAndReceive(Boolean.class, "You can send and receive any class!");
Boolean bool = packet2.unwrap(data -> {
    // Perform checking on the object received.
    // T O D O: Check the received object.
    // Return the object.
    return data;
});

Counterparty response

Suppose we’re now on the Responder side of the flow. We just received the following series of messages from the Initiator:

  1. They sent us an Any instance
  2. They waited to receive an Integer instance back
  3. They sent a String instance and waited to receive a Boolean instance back

Our side of the flow must mirror these calls. We could do this as follows:


Object obj = counterpartySession.receive(Object.class).unwrap(data -> data);
String string = counterpartySession.sendAndReceive(String.class, 99).unwrap(data -> data);
counterpartySession.send(true);

Why sessions?

Before FlowSession s were introduced the send/receive API looked a bit different. They were functions on FlowLogic and took the address Party as argument. The platform internally maintained a mapping from Party to session, hiding sessions from the user completely.

Although this is a convenient API it introduces subtle issues where a message that was originally meant for a specific session may end up in another.

Consider the following contrived example using the old Party based API:

@InitiatingFlow
class LaunchSpaceshipFlow : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val shouldLaunchSpaceship = receive<Boolean>(getPresident()).unwrap { it }
        if (shouldLaunchSpaceship) {
            launchSpaceship()
        }
    }

    fun launchSpaceship() {
    }

    fun getPresident(): Party {
        TODO()
    }
}

@InitiatedBy(LaunchSpaceshipFlow::class)
@InitiatingFlow
class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val needCoffee = true
        send(getSecretary(), needCoffee)
        val shouldLaunchSpaceship = false
        send(launcher, shouldLaunchSpaceship)
    }

    fun getSecretary(): Party {
        TODO()
    }
}

@InitiatedBy(PresidentSpaceshipFlow::class)
class SecretaryFlow(val president: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        // ignore
    }
}

The intention of the flows is very clear: LaunchSpaceshipFlow asks the president whether a spaceship should be launched. It is expecting a boolean reply. The president in return first tells the secretary that they need coffee, which is also communicated with a boolean. Afterwards the president replies to the launcher that they don’t want to launch.

However the above can go horribly wrong when the launcher happens to be the same party getSecretary returns. In this case the boolean meant for the secretary will be received by the launcher!

This indicates that Party is not a good identifier for the communication sequence, and indeed the Party based API may introduce ways for an attacker to fish for information and even trigger unintended control flow like in the above case.

Hence we introduced FlowSession, which identifies the communication sequence. With FlowSession s the above set of flows would look like this:

@InitiatingFlow
class LaunchSpaceshipFlowCorrect : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val presidentSession = initiateFlow(getPresident())
        val shouldLaunchSpaceship = presidentSession.receive<Boolean>().unwrap { it }
        if (shouldLaunchSpaceship) {
            launchSpaceship()
        }
    }

    fun launchSpaceship() {
    }

    fun getPresident(): Party {
        TODO()
    }
}

@InitiatedBy(LaunchSpaceshipFlowCorrect::class)
@InitiatingFlow
class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val needCoffee = true
        val secretarySession = initiateFlow(getSecretary())
        secretarySession.send(needCoffee)
        val shouldLaunchSpaceship = false
        launcherSession.send(shouldLaunchSpaceship)
    }

    fun getSecretary(): Party {
        TODO()
    }
}

@InitiatedBy(PresidentSpaceshipFlowCorrect::class)
class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        // ignore
    }
}

Note how the president is now explicit about which session it wants to send to.

Porting from the old Party-based API

In the old API the first send or receive to a Party was the one kicking off the counter-flow. This is now explicit in the initiateFlow function call. To port existing code:


send(regulator, new Object()); // Old API
// becomes
FlowSession session = initiateFlow(regulator);
session.send(new Object());

Subflows

Subflows are pieces of reusable flows that may be run by calling FlowLogic.subFlow. There are two broad categories of subflows, inlined and initiating ones. The main difference lies in the counter-flow’s starting method, initiating ones initiate counter-flows automatically, while inlined ones expect some parent counter-flow to run the inlined counter-part.

Inlined subflows

Inlined subflows inherit their calling flow’s type when initiating a new session with a counterparty. For example, say we have flow A calling an inlined subflow B, which in turn initiates a session with a party. The FlowLogic type used to determine which counter-flow should be kicked off will be A, not B. Note that this means that the other side of this inlined flow must therefore be implemented explicitly in the kicked off flow as well. This may be done by calling a matching inlined counter-flow, or by implementing the other side explicitly in the kicked off parent flow.

An example of such a flow is CollectSignaturesFlow. It has a counter-flow SignTransactionFlow that isn’t annotated with InitiatedBy. This is because both of these flows are inlined; the kick-off relationship will be defined by the parent flows calling CollectSignaturesFlow and SignTransactionFlow.

In the code inlined subflows appear as regular FlowLogic instances, without either of the @InitiatingFlow or @InitiatedBy annotation.

Note

Inlined flows aren’t versioned; they inherit their parent flow’s version.

Initiating subflows

Initiating subflows are ones annotated with the @InitiatingFlow annotation. When such a flow initiates a session its type will be used to determine which @InitiatedBy flow to kick off on the counterparty.

An example is the @InitiatingFlow InitiatorFlow/@InitiatedBy ResponderFlow flow pair in the FlowCookbook.

Note

Initiating flows are versioned separately from their parents.

Core initiating subflows

Corda-provided initiating subflows are a little different to standard ones as they are versioned together with the platform, and their initiated counter-flows are registered explicitly, so there is no need for the InitiatedBy annotation.

An example is the FinalityFlow/FinalityHandler flow pair.

Built-in subflows

Corda provides a number of built-in flows that should be used for handling common tasks. The most important are:

  • CollectSignaturesFlow (inlined), which should be used to collect a transaction’s required signatures
  • FinalityFlow (initiating), which should be used to notarise and record a transaction as well as to broadcast it to all relevant parties
  • SendTransactionFlow (inlined), which should be used to send a signed transaction if it needed to be resolved on the other side.
  • ReceiveTransactionFlow (inlined), which should be used receive a signed transaction
  • ContractUpgradeFlow (initiating), which should be used to change a state’s contract
  • NotaryChangeFlow (initiating), which should be used to change a state’s notary

Let’s look at three very common examples.

FinalityFlow

FinalityFlow allows us to notarise the transaction and get it recorded in the vault of the participants of all the transaction’s states:


SignedTransaction notarisedTx1 = subFlow(new FinalityFlow(fullySignedTx, FINALISATION.childProgressTracker()));

We can also choose to send the transaction to additional parties who aren’t one of the state’s participants:


Set<Party> additionalParties = Collections.singleton(regulator);
SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()));

Only one party has to call FinalityFlow for a given transaction to be recorded by all participants. It does not need to be called by each participant individually.

CollectSignaturesFlow/SignTransactionFlow

The list of parties who need to sign a transaction is dictated by the transaction’s commands. Once we’ve signed a transaction ourselves, we can automatically gather the signatures of the other required signers using CollectSignaturesFlow:


SignedTransaction fullySignedTx = subFlow(new CollectSignaturesFlow(twiceSignedTx, Collections.emptySet(), SIGS_GATHERING.childProgressTracker()));

Each required signer will need to respond by invoking its own SignTransactionFlow subclass to check the transaction and provide their signature if they are satisfied:


class SignTxFlow extends SignTransactionFlow {
    private SignTxFlow(FlowSession otherSession, ProgressTracker progressTracker) {
        super(otherSession, progressTracker);
    }

    @Override
    protected void checkTransaction(SignedTransaction stx) {
        requireThat(require -> {
            // Any additional checking we see fit...
            DummyState outputState = (DummyState) stx.getTx().getOutputs().get(0).getData();
            assert (outputState.getMagicNumber() == 777);
            return null;
        });
    }
}

subFlow(new SignTxFlow(counterpartySession, SignTransactionFlow.tracker()));

SendTransactionFlow/ReceiveTransactionFlow

Verifying a transaction received from a counterparty also requires verification of every transaction in its dependency chain. This means the receiving party needs to be able to ask the sender all the details of the chain. The sender will use SendTransactionFlow for sending the transaction and then for processing all subsequent transaction data vending requests as the receiver walks the dependency chain using ReceiveTransactionFlow:


subFlow(new SendTransactionFlow(counterpartySession, twiceSignedTx));

// Optional request verification to further restrict data access.
subFlow(new SendTransactionFlow(counterpartySession, twiceSignedTx) {
    @Override
    protected void verifyDataRequest(@NotNull FetchDataFlow.Request.Data dataRequest) {
        // Extra request verification.
    }
});

We can receive the transaction using ReceiveTransactionFlow, which will automatically download all the dependencies and verify the transaction:


SignedTransaction verifiedTransaction = subFlow(new ReceiveTransactionFlow(counterpartySession));

We can also send and receive a StateAndRef dependency chain and automatically resolve its dependencies:


subFlow(new SendStateAndRefFlow(counterpartySession, dummyStates));

// On the receive side ...
List<StateAndRef<DummyState>> resolvedStateAndRef = subFlow(new ReceiveStateAndRefFlow<DummyState>(counterpartySession));

Why inlined subflows?

Inlined subflows provide a way to share commonly used flow code while forcing users to create a parent flow. Take for example CollectSignaturesFlow. Say we made it an initiating flow that automatically kicks off SignTransactionFlow that signs the transaction. This would mean malicious nodes can just send any old transaction to us using CollectSignaturesFlow and we would automatically sign it!

By making this pair of flows inlined we provide control to the user over whether to sign the transaction or not by forcing them to nest it in their own parent flows.

In general if you’re writing a subflow the decision of whether you should make it initiating should depend on whether the counter-flow needs broader context to achieve its goal.

FlowException

Suppose a node throws an exception while running a flow. Any counterparty flows waiting for a message from the node (i.e. as part of a call to receive or sendAndReceive) will be notified that the flow has unexpectedly ended and will themselves end. However, the exception thrown will not be propagated back to the counterparties.

If you wish to notify any waiting counterparties of the cause of the exception, you can do so by throwing a FlowException:

/**
 * Exception which can be thrown by a [FlowLogic] at any point in its logic to unexpectedly bring it to a permanent end.
 * The exception will propagate to all counterparty flows and will be thrown on their end the next time they wait on a
 * [FlowSession.receive] or [FlowSession.sendAndReceive]. Any flow which no longer needs to do a receive, or has already ended,
 * will not receive the exception (if this is required then have them wait for a confirmation message).
 *
 * [FlowException] (or a subclass) can be a valid expected response from a flow, particularly ones which act as a service.
 * It is recommended a [FlowLogic] document the [FlowException] types it can throw.
 */
open class FlowException(message: String?, cause: Throwable?) : CordaException(message, cause) {
    constructor(message: String?) : this(message, null)
    constructor(cause: Throwable?) : this(cause?.toString(), cause)
    constructor() : this(null, null)
}

The flow framework will automatically propagate the FlowException back to the waiting counterparties.

There are many scenarios in which throwing a FlowException would be appropriate:

  • A transaction doesn’t verify()
  • A transaction’s signatures are invalid
  • The transaction does not match the parameters of the deal as discussed
  • You are reneging on a deal

ProgressTracker

We can give our flow a progress tracker. This allows us to see the flow’s progress visually in our node’s CRaSH shell.

To provide a progress tracker, we have to override FlowLogic.progressTracker in our flow:


private static final Step ID_OTHER_NODES = new Step("Identifying other nodes on the network.");
private static final Step SENDING_AND_RECEIVING_DATA = new Step("Sending data between parties.");
private static final Step EXTRACTING_VAULT_STATES = new Step("Extracting states from the vault.");
private static final Step OTHER_TX_COMPONENTS = new Step("Gathering a transaction's other components.");
private static final Step TX_BUILDING = new Step("Building a transaction.");
private static final Step TX_SIGNING = new Step("Signing a transaction.");
private static final Step TX_VERIFICATION = new Step("Verifying a transaction.");
private static final Step SIGS_GATHERING = new Step("Gathering a transaction's signatures.") {
    // Wiring up a child progress tracker allows us to see the
    // subflow's progress steps in our flow's progress tracker.
    @Override
    public ProgressTracker childProgressTracker() {
        return CollectSignaturesFlow.tracker();
    }
};
private static final Step VERIFYING_SIGS = new Step("Verifying a transaction's signatures.");
private static final Step FINALISATION = new Step("Finalising a transaction.") {
    @Override
    public ProgressTracker childProgressTracker() {
        return FinalityFlow.tracker();
    }
};

private final ProgressTracker progressTracker = new ProgressTracker(
        ID_OTHER_NODES,
        SENDING_AND_RECEIVING_DATA,
        EXTRACTING_VAULT_STATES,
        OTHER_TX_COMPONENTS,
        TX_BUILDING,
        TX_SIGNING,
        TX_VERIFICATION,
        SIGS_GATHERING,
        FINALISATION
);

We then update the progress tracker’s current step as we progress through the flow as follows:


progressTracker.setCurrentStep(ID_OTHER_NODES);