Correlate message for multi instance task

Hi,
Imagine a process like this:


I need to send a separate message to each instance of the Wait for message task. How can I do that? Currently I use the workaround demonstrated here, which means that my message name is dynamic, like MY_MESSAGE_dynamicPart_. I can easily send the message through the REST API using the message command, for example:

{
  "messageName" : "MY_MESSAGE_1"
}
Instead of individual message names, I’d like to address each task by one of its process variables (imagine that some transaction ID is generated as part of the Do something task) or by a loop index variable (e.g. send the message to the nth element of the collection). How can I achieve this? I tried playing with process variables, but it seems that they are global (e.g. the loopCounter variable), which means that all instances share the same set of values, so I cannot use them as correlationVariables because they don’t identify exactly one task instance.

Do you have more than one variable you want to use? What was the reason for not wanting to use a dynamic message name such as my-message-${dynamicVar}?

You can set an input mapping on your receive task that holds the dynamic variable values you need per instance. You will need to get the dynamic values from somewhere: since your multi-instance is a sub-process, you could use a collection of objects as the collection that drives the multi-instance, and access each object.

Currently I use dynamic message names because I see no other way. I dislike it because I now have an individual message type/name for each task. I would prefer one broad message type that is received by the tasks that match the correlation criteria. But this is a matter of taste. The current approach is useless when I do not know in advance a unique key that can be used to generate a message name. Imagine that my service task starts a transaction in an external system identified by some UUID. I’d like to use this transaction UUID in a message to finalize this concrete instance. Is it feasible?

@Kikol

Take a look at this example:

dynamicVariable.bpmn (6.8 KB)

You can correlate with a POST to /message in the API such as:

{
  "messageName" : "MyCustomMessage",
  "localCorrelationKeys" : {
    "customMatchVariable" : {"value" : "cat", "type": "String"}
  }
}
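
For anyone calling this from Java rather than a REST client, here is a minimal sketch that builds the same request with the JDK's java.net.http API (Java 11+). It only constructs the request and does not send it. The class name MessageCorrelationRequest, its helper methods, and the base URL http://localhost:8080/engine-rest are assumptions made for illustration; adjust them to your own deployment.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds (but does not send) a POST /message request.
// The class and method names are hypothetical helpers, not part of Camunda.
class MessageCorrelationRequest {

    // Assumption: REST API base URL of a local installation; change as needed.
    static final String BASE_URL = "http://localhost:8080/engine-rest";

    // Builds the JSON body with a single String-typed local correlation key.
    static String buildBody(String messageName, String key, String value) {
        return "{\n"
            + "  \"messageName\" : \"" + escape(messageName) + "\",\n"
            + "  \"localCorrelationKeys\" : {\n"
            + "    \"" + escape(key) + "\" : {\"value\" : \"" + escape(value)
            + "\", \"type\" : \"String\"}\n"
            + "  }\n"
            + "}";
    }

    // Minimal JSON string escaping for backslashes and double quotes.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Wraps the body in a POST request to <BASE_URL>/message.
    static HttpRequest buildRequest(String body) {
        return HttpRequest.newBuilder()
            .uri(URI.create(BASE_URL + "/message"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) {
        String body = buildBody("MyCustomMessage", "customMatchVariable", "cat");
        HttpRequest request = buildRequest(body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body);
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

Swapping "cat" for a value generated at runtime (for example a transaction UUID) keeps the message name constant while still addressing one waiting instance.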

Thanks, Stephen, for the valuable example. It helps me a lot. You assume that the values used in the wait for message task are known before the loop (cat, dog, horse). I am wondering about a more dynamic solution where the correlation variables are generated inside the loop, in some service task that precedes the Receive message task. Is it feasible to model something like this?

That’s fine. You store your variables as process variables inside the sub-process scope. Then pass them into the receive message task in the same way.
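
To make the idea concrete, here is a toy in-memory model in plain Java. It is NOT the Camunda engine and uses no Camunda API; LocalCorrelationModel, WaitingInstance, startWaiting and correlate are hypothetical names invented for this sketch. It only illustrates the principle: each loop iteration generates its own ID, stores it in its own scope, and a single message name is then matched against that per-instance value, with exactly one match required.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy model only, NOT the Camunda engine: all names here are hypothetical.
class LocalCorrelationModel {

    // One waiting receive-task instance with its own (local) variables.
    static final class WaitingInstance {
        final String messageName;
        final Map<String, String> localVariables = new HashMap<>();
        boolean completed = false;

        WaitingInstance(String messageName) {
            this.messageName = messageName;
        }
    }

    final List<WaitingInstance> waiting = new ArrayList<>();

    // Models "service task generates an ID inside the loop, then the instance waits".
    String startWaiting(String messageName) {
        String correlationId = UUID.randomUUID().toString();
        WaitingInstance instance = new WaitingInstance(messageName);
        instance.localVariables.put("correlationId", correlationId);
        waiting.add(instance);
        return correlationId;
    }

    // Completes the single open instance whose local variable matches;
    // throws if zero or more than one instance matches.
    WaitingInstance correlate(String messageName, String key, String value) {
        List<WaitingInstance> matches = new ArrayList<>();
        for (WaitingInstance instance : waiting) {
            if (!instance.completed
                    && instance.messageName.equals(messageName)
                    && value.equals(instance.localVariables.get(key))) {
                matches.add(instance);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one match but found " + matches.size());
        }
        matches.get(0).completed = true;
        return matches.get(0);
    }

    public static void main(String[] args) {
        LocalCorrelationModel model = new LocalCorrelationModel();
        String first = model.startWaiting("MyCustomMessage");
        String second = model.startWaiting("MyCustomMessage");
        model.correlate("MyCustomMessage", "correlationId", second);
        System.out.println("first (" + first + ") completed: " + model.waiting.get(0).completed);
        System.out.println("second (" + second + ") completed: " + model.waiting.get(1).completed);
    }
}
```

Both instances wait for the same message name; only the one whose generated ID is sent gets completed, and the other keeps waiting.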


This is exactly what I need. Thanks a lot.

I understand the proposed solution so far.
I was wondering, though, whether the correlation has to be made across all instances of the main process using just one value. Could the activityInstanceId be used to make the value unique within a Camunda deployment?
Actually, I wanted to create the ID in the corresponding send task (and pass it along with the outbound message) and then pass the value to the receive task for the correlation.
The activityInstanceId looks like this for the send task: SendTask_0725jaj:0c925f08-f312-11e8-a011-80000b689ff2. Are the last 36 characters of the string unique?
Any idea how to pass the value from the send task to the receive task?

Thanks for clarification!
Cheers Ingo
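
On the format of the ID quoted above, a minimal sketch that splits it and inspects the suffix with java.util.UUID. It assumes the ID always has the shape activityId:uuid, which is inferred from that single example and not confirmed anywhere in this thread. The class name ActivityInstanceIdParts is hypothetical.

```java
import java.util.UUID;

// Sketch only: assumes the shape "<activityId>:<uuid>" seen in the one example above.
class ActivityInstanceIdParts {
    final String activityId;
    final UUID uuid;

    ActivityInstanceIdParts(String activityInstanceId) {
        // Split at the last colon; everything after it is parsed as a UUID.
        int separator = activityInstanceId.lastIndexOf(':');
        if (separator < 0) {
            throw new IllegalArgumentException("No ':' separator in " + activityInstanceId);
        }
        this.activityId = activityInstanceId.substring(0, separator);
        this.uuid = UUID.fromString(activityInstanceId.substring(separator + 1));
    }

    public static void main(String[] args) {
        ActivityInstanceIdParts parts = new ActivityInstanceIdParts(
            "SendTask_0725jaj:0c925f08-f312-11e8-a011-80000b689ff2");
        System.out.println("activity id:  " + parts.activityId);
        System.out.println("uuid length:  " + parts.uuid.toString().length());
        System.out.println("uuid version: " + parts.uuid.version());
    }
}
```

For the example ID, the suffix is 36 characters long and parses as a version 1 (time-based) UUID. Whether it is a suitable correlation key is still the open question above.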

Is it possible to realize the same behavior for the following diagram?

Start waiting executes the following code:

public void wait(DelegateExecution execution) {
    UUID correlationId = UUID.randomUUID();
    runtimeService.setVariableLocal(execution.getId(), "correlationId", correlationId.toString());
    log.info("Start waiting correlationId = {}", correlationId);
}

And then I am trying to correlate with the following code:

public void correlate(String correlationId) {
    runtimeService.createMessageCorrelation("success")
            .localVariableEquals("correlationId", correlationId)
            .correlate();
}

When I call the correlate method I get:

org.camunda.bpm.engine.MismatchingMessageCorrelationException: Cannot correlate message 'success': No process definition or execution matches the parameters

Looks like it’s impossible to pass local variable to an intermediate catch message event.

@thedenische Can you look in cockpit and show a screenshot of the message waiting with the local variable in its scope? Just want to confirm that it’s there

@StephenOTT, thank you for the prompt response.

I can publish this project on GitLab, if that is more convenient.

Okay there we go. See the problem (and just tested it to confirm).

So when using an Event Based Gateway, the token is waiting on the Gateway rather than on the Events. So the local variables are not created yet.

This does not appear to be documented either…

So using the gateway is what seems to break your model as the local variables are not available when waiting at the gateway.

Created a docs change for this https://github.com/camunda/camunda-docs-manual/pull/240. Will see what CAM staff say.


Thank you for helping.
Looks like I should find another way to make asynchronous integration with external services.

Have you looked at using the external task pattern?

@thedenische the easiest fix to your model is to use Sub-Process scoped Process Variables: aka: Process Variables that are part of the embedded Sub-Process.

So you have vars such as “correlationid-success” and “correlationid-failure”, and then change localVariableEquals to processVariableEquals

Do you mean I should just move the whole multi-instance body to a separate process, switch to processInstanceVariables and use a multi-instance Call Activity?

No. When you create a sub process, it is its own process instance. Thus your embedded sub process has process variables. When you pass your variables into the sub process using the multi instance, you can use the multi-instance element variable to pass your correlation variable. Take a look at Pattern Review: DMN Looping for Array Input for an example.

I didn’t get how I should set such a variable
(from the method wait(DelegateExecution execution), I suppose).

In the process you showed above you have an init script: I would assume you are generating some array of correlation IDs there, since you need these in your multi-instance sub process? So when you configure your multi instance, you set your collection to that array of objects (each object would hold your success and failure correlation IDs, or it could just be an array of correlation IDs if it’s the same variable but different message names).

OK, now I understand, but I generate the correlationId inside the multi-instance (inside the Start waiting task).
In the real process, inside the wait() method, I generate the correlation ID and send a request to an external asynchronous REST API.

As far as I understand, I have several options

  1. Use Receive task
  • Replace the Event based gateway with a Receive task and add an Exclusive Gateway after the Receive task with two branches (isError == true and isError == false)
  • Use the local variable “correlationId” as the correlation ID
  • When a success callback is received, set variable isError = false and call
    runtimeService.createMessageCorrelation("callback").localVariableEquals("correlationId", correlationId).correlate()
  • When an error callback is received, set variable isError = true and call
    runtimeService.createMessageCorrelation("callback").localVariableEquals("correlationId", correlationId).correlate()
  2. Use AbstractBpmnActivityBehavior described here https://github.com/camunda/camunda-bpm-examples/tree/master/servicetask/service-invocation-asynchronous
  • Replace the multi-instance body with one delegate implementing AbstractBpmnActivityBehavior.
  • Use the executionId as the correlation ID
  • When a success callback is received, call
    runtimeService.signal(correlationId, "success", null, null)
  • When an error callback is received, call
    runtimeService.signal(correlationId, "error", null, null)
  • In the delegate, call leave(execution) if the signal is “success” or throw an exception if the signal is “error”.
  3. Use External Task Pattern
  • Replace the multi-instance body with an external task
  • Create some scheduled service (for example with @Scheduled) that will fetchAndLock the task, and use the taskId as the correlation ID
  • When a success callback is received, call
    externalTaskService.complete(correlationId, "workerId")
  • When an error callback is received, call
    externalTaskService.handleFailure(correlationId, "workerId", "errorMessage", 0, 0)
    or
    externalTaskService.handleBpmnError(correlationId, "workerId", "errorCode")