Using the client RPC API¶
In this tutorial we will build a simple command line utility that connects to a node, creates some Cash transactions and meanwhile dumps the transaction graph to the standard output. We will then put some simple visualisation on top. For an explanation on how the RPC works see Client RPC.
We start off by connecting to the node itself. For the purposes of the tutorial we will use the Driver to start up a notary and a node that issues/exits and moves Cash around for herself. To authenticate we will use the certificates of the nodes directly.
Note how we configure the node to create a user that has permission to start the CashFlow.
enum class PrintOrVisualise {
Print,
Visualise
}
fun main(args: Array<String>) {
require(args.isNotEmpty()) { "Usage: <binary> [Print|Visualise]" }
val printOrVisualise = PrintOrVisualise.valueOf(args[0])
val baseDirectory = Paths.get("build/rpc-api-tutorial")
val user = User("user", "password", permissions = setOf(startFlowPermission<CashIssueFlow>(),
startFlowPermission<CashPaymentFlow>(),
startFlowPermission<CashExitFlow>()))
driver(driverDirectory = baseDirectory) {
startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type)))
val node = startNode(ALICE.name, rpcUsers = listOf(user)).get()
Now we can connect to the node itself using a valid RPC login. We login using the configured user.
val client = node.rpcClientToNode()
val proxy = client.start("user", "password").proxy
thread {
generateTransactions(proxy)
}
We start generating transactions in a different thread (generateTransactions
to be defined later) using proxy
,
which exposes the full RPC interface of the node:
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
* to be present.
*/
override val protocolVersion: Int get() = nodeIdentity().platformVersion
/**
* Returns a data feed of currently in-progress state machine infos and an observable of future state machine adds/removes.
*/
@RPCReturnsObservables
fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate>
@Deprecated("This function will be removed in a future milestone", ReplaceWith("stateMachinesFeed()"))
fun stateMachinesAndUpdates() = stateMachinesFeed()
/**
* Returns a snapshot of vault states for a given query criteria (and optional order and paging specification)
*
* Generic vault query function which takes a [QueryCriteria] object to define filters,
* optional [PageSpecification] and optional [Sort] modification criteria (default unsorted),
* and returns a [Vault.Page] object containing the following:
* 1. states as a List of <StateAndRef> (page number and size defined by [PageSpecification])
* 2. states metadata as a List of [Vault.StateMetadata] held in the Vault States table.
* 3. total number of results available if [PageSpecification] supplied (otherwise returns -1)
* 4. status types used in this query: UNCONSUMED, CONSUMED, ALL
* 5. other results (aggregate functions with/without using value groups)
*
* @throws VaultQueryException if the query cannot be executed for any reason
* (missing criteria or parsing error, paging errors, unsupported query, underlying database error)
*
* Notes
* If no [PageSpecification] is provided, a maximum of [DEFAULT_PAGE_SIZE] results will be returned.
* API users must specify a [PageSpecification] if they are expecting more than [DEFAULT_PAGE_SIZE] results,
* otherwise a [VaultQueryException] will be thrown alerting to this condition.
* It is the responsibility of the API user to request further pages and/or specify a more suitable [PageSpecification].
*/
// DOCSTART VaultQueryByAPI
@RPCReturnsObservables
fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria,
paging: PageSpecification,
sorting: Sort,
contractType: Class<out T>): Vault.Page<T>
// DOCEND VaultQueryByAPI
// Note: cannot apply @JvmOverloads to interfaces nor interface implementations
// Java Helpers
// DOCSTART VaultQueryAPIHelpers
fun <T : ContractState> vaultQuery(contractType: Class<out T>): Vault.Page<T> {
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractType)
Warning
This API is evolving and will continue to grow as new functionality and features added to Corda are made available to RPC clients.
The one we need in order to dump the transaction graph is verifiedTransactions
. The type signature tells us that the
RPC will return a list of transactions and an Observable stream. This is a general pattern, we query some data and the
node will return the current snapshot and future updates done to it. Observables are described in further detail in
Client RPC
val (transactions: List<SignedTransaction>, futureTransactions: Observable<SignedTransaction>) = proxy.verifiedTransactions()
The graph will be defined by nodes and edges between them. Each node represents a transaction and edges represent
output-input relations. For now let’s just print NODE <txhash>
for the former and EDGE <txhash> <txhash>
for the
latter.
when (printOrVisualise) {
PrintOrVisualise.Print -> {
futureTransactions.startWith(transactions).subscribe { transaction ->
println("NODE ${transaction.id}")
transaction.tx.inputs.forEach { input ->
println("EDGE ${input.txhash} ${transaction.id}")
}
}
}
Now we just need to create the transactions themselves!
fun generateTransactions(proxy: CordaRPCOps) {
val (vault, vaultUpdates) = proxy.vaultAndUpdates()
vaultUpdates.notUsed()
var ownedQuantity = vault.fold(0L) { sum, state ->
sum + (state.state.data as Cash.State).amount.quantity
}
val issueRef = OpaqueBytes.of(0)
val (parties, partyUpdates) = proxy.networkMapUpdates()
partyUpdates.notUsed()
val notary = parties.first { it.advertisedServices.any { it.info.type.isNotary() } }.notaryIdentity
val me = proxy.nodeIdentity().legalIdentity
while (true) {
Thread.sleep(1000)
val random = SplittableRandom()
val n = random.nextDouble()
if (ownedQuantity > 10000 && n > 0.8) {
val quantity = Math.abs(random.nextLong()) % 2000
proxy.startFlow(::CashExitFlow, Amount(quantity, USD), issueRef)
ownedQuantity -= quantity
} else if (ownedQuantity > 1000 && n < 0.7) {
val quantity = Math.abs(random.nextLong() % Math.min(ownedQuantity, 2000))
proxy.startFlow(::CashPaymentFlow, Amount(quantity, USD), me)
} else {
val quantity = Math.abs(random.nextLong() % 1000)
proxy.startFlow(::CashIssueFlow, Amount(quantity, USD), issueRef, me, notary)
ownedQuantity += quantity
}
}
}
We utilise several RPC functions here to query things like the notaries in the node cluster or our own vault. These RPC
functions also return Observable
objects so that the node can send us updated values. However, we don’t need updates
here and so we mark these observables as notUsed
. (As a rule, you should always either subscribe to an Observable
or mark it as not used. Failing to do this will leak resources in the node.)
Then in a loop we generate randomly either an Issue, a Pay or an Exit transaction.
The RPC we need to initiate a Cash transaction is startFlowDynamic
which may start an arbitrary flow, given sufficient
permissions to do so. We won’t use this function directly, but rather a type-safe wrapper around it startFlow
that
type-checks the arguments for us.
Finally we have everything in place: we start a couple of nodes, connect to them, and start creating transactions while listening on successfully created ones, which are dumped to the console. We just need to run it!:
# Build the example
./gradlew docs/source/example-code:installDist
# Start it
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print
Now let’s try to visualise the transaction graph. We will use a graph drawing library called graphstream
PrintOrVisualise.Visualise -> {
val graph = MultiGraph("transactions")
transactions.forEach { transaction ->
graph.addNode<Node>("${transaction.id}")
}
transactions.forEach { transaction ->
transaction.tx.inputs.forEach { ref ->
graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
}
}
futureTransactions.subscribe { transaction ->
graph.addNode<Node>("${transaction.id}")
transaction.tx.inputs.forEach { ref ->
graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
}
}
graph.display()
}
}
waitForAllNodesToFinish()
}
}
If we run the client with Visualise
we should see a simple random graph being drawn as new transactions are being created.
Whitelisting classes from your CorDapp with the Corda node¶
As described in Client RPC, you have to whitelist any additional classes you add that are needed in RPC requests or responses with the Corda node. Here’s an example of both ways you can do this for a couple of example classes.
// Not annotated, so need to whitelist manually.
data class ExampleRPCValue(val foo: String)
// Annotated, so no need to whitelist manually.
@CordaSerializable
data class ExampleRPCValue2(val bar: Int)
class ExampleRPCCordaPluginRegistry : CordaPluginRegistry() {
override fun customizeSerialization(custom: SerializationCustomization): Boolean {
// Add classes like this.
custom.addToWhitelist(ExampleRPCValue::class.java)
// You should return true, otherwise your plugin will be ignored for registering classes with Kryo.
return true
}
}
See more on plugins in Running a node.
Warning
We will be replacing the use of Kryo in the serialization framework and so additional changes here are likely.
Security¶
RPC credentials associated with a Client must match the permission set configured on the server Node. This refers to both authentication (username and password) and role-based authorisation (a permissioned set of RPC operations an authenticated user is entitled to run).
Note
Permissions are represented as String’s to allow RPC implementations to add their own permissioning. Currently
the only permission type defined is StartFlow, which defines a list of whitelisted flows an authenticated use may
execute. An administrator user (or a developer) may also be assigned the ALL
permission, which grants access to
any flow.
In the instructions above the server node permissions are configured programmatically in the driver code:
driver(driverDirectory = baseDirectory) {
val user = User("user", "password", permissions = setOf(startFlowPermission<CashFlow>()))
val node = startNode("CN=Alice Corp,O=Alice Corp,L=London,C=GB", rpcUsers = listOf(user)).get()
When starting a standalone node using a configuration file we must supply the RPC credentials as follows:
rpcUsers : [
{ username=user, password=password, permissions=[ StartFlow.net.corda.flows.CashFlow ] }
]
When using the gradle Cordformation plugin to configure and deploy a node you must supply the RPC credentials in a similar manner:
rpcUsers = [
['username' : "user",
'password' : "password",
'permissions' : ["StartFlow.net.corda.flows.CashFlow"]]
]
You can then deploy and launch the nodes (Notary and Alice) as follows:
# to create a set of configs and installs under ``docs/source/example-code/build/nodes`` run
./gradlew docs/source/example-code:deployNodes
# to open up two new terminals with the two nodes run
./docs/source/example-code/build/nodes/runnodes
# followed by the same commands as before:
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Print
./docs/source/example-code/build/install/docs/source/example-code/bin/client-rpc-tutorial Visualise
With regards to the start flow RPCs, there is an extra layer of security whereby the flow to be executed has to be
annotated with @StartableByRPC
. Flows without this annotation cannot execute using RPC.
See more on security in Secure coding guidelines, node configuration in Node configuration and Cordformation in Running a node.