Correlate message for multi instance task

Okay, there we go. I see the problem (and just tested it to confirm).

When you use an Event-Based Gateway, the token waits at the gateway rather than at the events, so the local variables have not been created yet.

This does not appear to be documented either…

So using the gateway is what seems to break your model, because the local variables are not available while the token is waiting at the gateway.

I created a docs change for this: https://github.com/camunda/camunda-docs-manual/pull/240 . We will see what the Camunda staff say.

Thank you for helping.
It looks like I should find another way to integrate asynchronously with external services.

Have you looked at using the external task pattern?

@thedenische the easiest fix to your model is to use Sub-Process-scoped Process Variables, that is, Process Variables that are part of the embedded Sub-Process.

So you have variables such as “correlationid-success” and “correlationid-failure”, and then you change localVariableEquals to processInstanceVariableEquals.

Do you mean I should just move the whole multi-instance body to a separate process, switch to processInstanceVariables, and use a multi-instance Call Activity?

No. When you create a sub-process, it is its own process instance, so your embedded sub-process has process variables. When you pass variables into your sub-process through the multi-instance configuration, you can use the multi-instance element variable to pass your correlation variable. Take a look at “Pattern Review: DMN Looping for Array Input” for an example.

I don't understand how I should set such a variable
(from the wait(DelegateExecution execution) method, I suppose?)

In the process you showed above you have an init script. I assume you are generating an array of correlation IDs there, since you need them in your multi-instance sub-process? When you configure the multi-instance, you set its collection to that array of objects (each object would hold your success and failure correlation IDs, or the array would just contain correlation IDs if it is the same variable with different message names).

OK, now I understand, but I generate the correlationId inside the multi-instance (inside the “Start waiting” task).
In the real process, I generate the correlation ID inside the wait() method and send a request to an external asynchronous REST API.

As far as I understand, I have several options:

  1. Use a Receive Task
  • Replace the Event-Based Gateway with a Receive Task and add an Exclusive Gateway after the Receive Task with two branches (isError == true and isError == false)
  • Use the local variable “correlationId” as the correlation ID
  • When a success callback is received, set the variable isError = false and call
    runtimeService.createMessageCorrelation("callback").localVariableEquals("correlationId", correlationId).correlate()
  • When an error callback is received, set the variable isError = true and call
    runtimeService.createMessageCorrelation("callback").localVariableEquals("correlationId", correlationId).correlate()
  2. Use AbstractBpmnActivityBehavior as described here: https://github.com/camunda/camunda-bpm-examples/tree/master/servicetask/service-invocation-asynchronous
  • Replace the multi-instance body with one delegate implementing AbstractBpmnActivityBehavior
  • Use the executionId as the correlation ID
  • When a success callback is received, call
    runtimeService.signal(correlationId, "success", null, null)
  • When an error callback is received, call
    runtimeService.signal(correlationId, "error", null, null)
  • In the delegate, call leave(execution) if the signal is “success”, or throw an exception if the signal is “error”
  3. Use the External Task Pattern
  • Replace the multi-instance body with an external task
  • Create a scheduled service (for example with @Scheduled) that will fetchAndLock the task, and use the taskId as the correlation ID
  • When a success callback is received, call
    externalTaskService.complete(correlationId, "workerId")
  • When an error callback is received, call
    externalTaskService.handleFailure(correlationId, "workerId", "errorMessage", 0, 0)
    or
    externalTaskService.handleBpmnError(correlationId, "workerId", "errorCode")

If you are generating the correlationId values inside each instance of the multi-instance, then just store those values as process variables within the Sub-Process, and everything should continue to work. You don't need to use Local Variables on the Catch Events.

If I use a process instance variable inside “Start waiting”, it will be overwritten by the other parallel subprocesses:

public void wait(DelegateExecution execution) {
    UUID correlationId = UUID.randomUUID();
    execution.setVariable("correlationId", correlationId.toString());
    log.info("Start waiting correlationId = {}", correlationId);
}

If I use a local variable inside “Start waiting”, it will not be available in the message events:

public void wait(DelegateExecution execution) {
    UUID correlationId = UUID.randomUUID();
    execution.setVariableLocal("correlationId", correlationId.toString());
    log.info("Start waiting correlationId = {}", correlationId);
}
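
The trade-off between the two snippets above can be illustrated with a small toy model. This is plain Java, not Camunda engine code: the Scope class and its set / setLocal methods are hypothetical stand-ins, and the sketch assumes that setVariable writes to the outermost scope unless an inner scope already holds the name, while setVariableLocal always writes to the current scope.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of nested variable scopes; NOT Camunda engine code. */
public class ScopeDemo {

    /** Hypothetical scope: owns its variables and can read its parent's. */
    static final class Scope {
        final Scope parent;
        final Map<String, Object> local = new HashMap<>();

        Scope(Scope parent) {
            this.parent = parent;
        }

        /** Stand-in for setVariableLocal: always writes to this scope. */
        void setLocal(String name, Object value) {
            local.put(name, value);
        }

        /** Stand-in for setVariable: writes to the nearest scope that already
         *  holds the name, otherwise to the root (assumed behaviour). */
        void set(String name, Object value) {
            for (Scope s = this; s != null; s = s.parent) {
                if (s.local.containsKey(name) || s.parent == null) {
                    s.local.put(name, value);
                    return;
                }
            }
        }

        /** Reads the name from this scope or the closest ancestor holding it. */
        Object get(String name) {
            for (Scope s = this; s != null; s = s.parent) {
                if (s.local.containsKey(name)) {
                    return s.local.get(name);
                }
            }
            return null;
        }
    }

    /** Creates three parallel child scopes, lets each store its own id,
     *  and returns how many distinct ids the children can still read. */
    static int distinctIds(boolean useLocal) {
        Scope processInstance = new Scope(null);
        Scope[] instances = new Scope[3];
        for (int i = 0; i < instances.length; i++) {
            instances[i] = new Scope(processInstance);
            String id = "id-" + i;
            if (useLocal) {
                instances[i].setLocal("correlationId", id);
            } else {
                instances[i].set("correlationId", id);
            }
        }
        Set<Object> seen = new HashSet<>();
        for (Scope s : instances) {
            seen.add(s.get("correlationId"));
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("set      -> distinct ids: " + distinctIds(false));
        System.out.println("setLocal -> distinct ids: " + distinctIds(true));
    }
}
```

In this model the non-local write leaves a single shared value that the last instance wins, while the local write keeps one value per instance. Whether the message events can then see that local value is a separate question, which is what the rest of the thread is about.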

You can do something like this:

In the SetVar script I have execution.setVariableLocal('dog','cat');

and you can then correlate using process variables by calling /message in the REST API:

{
  "messageName" : "m1",
  "processCorrelationKeys" : {
    "dog" : {"value" : "cat", "type": "String"}
  }
}

Yes, I see. It's the same as my option with the Receive Task. The main point is to remove the Event-Based Gateway.
Thank you a lot for your help.

Try it with the Event-Based Gateway. I believe it should still work.

If I set a local variable and then correlate with a process instance variable:

public void wait(DelegateExecution execution) {
    UUID correlationId = UUID.randomUUID();
    execution.setVariableLocal("correlationId", correlationId.toString());
    log.info("Start waiting correlationId = {}", correlationId);
}

public void correlate(String correlationId) {
    runtimeService.createMessageCorrelation("success")
            .processInstanceVariableEquals("correlationId", correlationId)
            .correlate();
}

I get the following error:

org.camunda.bpm.engine.MismatchingMessageCorrelationException:
ENGINE-13031 Cannot correlate a message with name 'success' to a single execution.
3 executions match the correlation keys:
CorrelationSet [businessKey=null, processInstanceId=null, processDefinitionId=null,
correlationKeys={
    correlationId => Untyped value 'aff42bd6-b333-428f-b311-7784fd1aec6d',
    isTransient = false
}, localCorrelationKeys=null, tenantId=null, isTenantIdSet=false]

That's because you are using correlate rather than correlateAll; see the other methods and the Javadoc. correlate only correlates to a single execution. You need to tell it to correlate to all matching executions.

Yes, but I have 3 parallel executions, and get 3 callbacks for each execution. So I should correlate only one execution per callback.

@thedenische can you double-check that the correlationId value is different in each of your sub-process instances?

Yes, correlationId has a different value in each of the three multi-instance subprocesses, and I get this error for any of those values when correlating the message.
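
One possible reading of the error, not confirmed anywhere in this thread, is that a process-instance-level correlation key selects every waiting execution of a process instance that holds the value in any of its scopes, so all three parallel branches match even though their IDs differ. The toy model below sketches that assumption in plain Java; Subscription, matchByProcessInstanceVariable and correlateSingle are hypothetical names, not Camunda API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of ambiguous message correlation; NOT Camunda engine code. */
public class CorrelationDemo {

    /** Hypothetical waiting message subscription of one parallel branch. */
    static final class Subscription {
        final String processInstanceId;
        final String messageName;
        final String correlationId; // value stored somewhere inside this branch

        Subscription(String processInstanceId, String messageName, String correlationId) {
            this.processInstanceId = processInstanceId;
            this.messageName = messageName;
            this.correlationId = correlationId;
        }
    }

    /**
     * ASSUMPTION: a subscription matches when its process instance contains
     * the value in ANY branch, not only in the subscription's own branch.
     */
    static List<Subscription> matchByProcessInstanceVariable(
            List<Subscription> all, String messageName, String value) {
        List<Subscription> matches = new ArrayList<>();
        for (Subscription candidate : all) {
            if (!candidate.messageName.equals(messageName)) {
                continue;
            }
            for (Subscription other : all) {
                if (other.processInstanceId.equals(candidate.processInstanceId)
                        && other.correlationId.equals(value)) {
                    matches.add(candidate);
                    break;
                }
            }
        }
        return matches;
    }

    /** Mirrors the single-result contract of correlate(): exactly one match. */
    static Subscription correlateSingle(List<Subscription> matches) {
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    matches.size() + " executions match the correlation keys");
        }
        return matches.get(0);
    }

    /** Three parallel branches of one process instance, each with its own id. */
    static List<Subscription> threeParallelBranches() {
        List<Subscription> subs = new ArrayList<>();
        subs.add(new Subscription("pi-1", "success", "id-a"));
        subs.add(new Subscription("pi-1", "success", "id-b"));
        subs.add(new Subscription("pi-1", "success", "id-c"));
        return subs;
    }

    public static void main(String[] args) {
        List<Subscription> matches =
                matchByProcessInstanceVariable(threeParallelBranches(), "success", "id-b");
        try {
            correlateSingle(matches);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If that assumption holds, correlateAll would deliver each callback to all three branches rather than to the one that owns the ID, which is why it does not fit the one-callback-per-execution requirement described above.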