kotlinblockchaincorda

Calling subFlow inside a main flow to update the same state in corda


Here in the main flow which is updating the ContainerState, and I just tried to call a subFlow of another UserDefined flow which is also updating attribute of same state. (I know, it can be done in main flow itself without calling another flow, but this is just a sample created for asking here)

So Before creating builder of main flow, I am calling the subFlow and update it with some parameter. Both are referring to ContainerState of same id. I am getting

[WARN] 13:07:03,441 [Mock network] interceptors.DumpHistoryOnErrorInterceptor. - Flow [fb6a42fd-b9bf-45bb-ab98-2f2510235f91] error {fiber-id=10000008, flow-id=fb6a42fd-b9bf-45bb-ab98-2f2510235f91, invocation_id=38c29575-637a-459b-b8e9-81a960e93c7c, invocation_timestamp=2022-12-07T13:07:03.064Z, origin=O=Mock Company 1, L=London, C=GB, session_id=38c29575-637a-459b-b8e9-81a960e93c7c, session_timestamp=2022-12-07T13:07:03.064Z, thread-id=406, tx_id=EB1EA4EEA934C9C874461A6DDE35892057EBC472F405AF95241784CEDE3D6C92}

net.corda.core.flows.UnexpectedFlowEndException: Counter-flow errored at Received unexpected counter-flow exception from peer O=Mock Company 1, L=London, C=GB.() ~[?:?]

Doesn't subflow suspends the main flow and the finalityFlow in subflow will complete all the required steps there?

UpdateFlow:

@InitiatingFlow
@StartableByRPC
class UpdateContainerStateFlow(private val containerId: UUID,
val status: String, val comments: String) : FlowLogic<SignedTransaction>() {
    override val progressTracker = ProgressTracker()

    @Suspendable
    override fun call(): SignedTransaction {

        val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(UniqueIdentifier(id = containerId)))
        val currentContainer = serviceHub.vaultService.queryBy<ContainerState>(queryCriteria).states.single()

        val notary = serviceHub.networkMapCache.getNotary( CordaX500Name.parse("O=Notary,L=London,C=GB"))

        val updateContainer = currentContainer.state.data.copy(
           status = status,
           commentsFromStakeHolders = comments
        )

        val abc = subFlow(UpdateContainerStateFlow2(
            containerId = containerId,
            status = "subflowOne",
            comments = "test"
        ))

        val builder = TransactionBuilder(notary)
                .addCommand(ContainerContract.Commands.Update(), listOf(updateContainer.logisticCompany.owningKey, updateContainer.partnerCompany.owningKey))
                .addInputState(currentContainer)
                .addOutputState(updateContainer)

        // Step 4. Verify and sign it with our KeyPair.
        builder.verify(serviceHub)
        val ptx = serviceHub.signInitialTransaction(builder)

        val sessions = (updateContainer.participants - ourIdentity).map { initiateFlow(it) }.toSet()
        val stx = subFlow(CollectSignaturesFlow(ptx, sessions))

        // Step 7. Assuming no exceptions, we can now finalise the transaction
        return subFlow(FinalityFlow(stx, sessions))
    }
}

@InitiatedBy(UpdateContainerStateFlow::class)
class UpdateContainerStateFlowResponder(val counterpartySession: FlowSession) : FlowLogic<SignedTransaction>() {
    @Suspendable
    override fun call(): SignedTransaction {
        val signTransactionFlow = object : SignTransactionFlow(counterpartySession) {
            override fun checkTransaction(stx: SignedTransaction) = requireThat {
               //Addition checks
            }
        }
        val txId = subFlow(signTransactionFlow).id
        return subFlow(ReceiveFinalityFlow(counterpartySession, expectedTxId = txId))
    }
}

UpdateContainerStateFlow2() is nothing but the same.

I think, there is something wrong in my understanding. Please help me to correct it. Thanks.


Solution

  • I got the issue, it was because I was using the same currentContainer

    val currentContainer = serviceHub.vaultService.queryBy<ContainerState>(queryCriteria).states.single()
    

    in the transaction builder after the subflow is executed. Therefore Corda will throw error that the input state used is already consumed.