Designing concurrent process start and correlations with Camunda

I am planning a simple workflow (shown in blue on the right) which should receive shipments belonging to the same order and continue the flow once all shipments have been collected.

Receiving shipments happen via REST and I plan to correlate to the right workflow instance using the order_id which is part of the request. The REST service is running in a cluster, so no in-VM locking is possible at this point.

I see two serious issues with this design:

  • concurrent inbound shipment requests will fail to correlate so they will start multiple workflow instances in the same time
  • even if the flow is started once successfully, Camunda will throw concurrent modification exception when simultanous shipment requests try to update the same workflow instance’s variables

How can I design this scenario to avoid these problems?

Thanks much in advance!

Hi @kjozsa,

Have you considered handling the receive shipments logic through an Event Sub-process? You can have an Event Sub-process with a non-interrupting Message Start Event. The sub-process can then increment a Process Variable that can be bound to a Intermediate Conditional Event. You can see a nice example in the documentation about Event Sub-process here: https://docs.camunda.org/manual/latest/reference/bpmn20/subprocesses/event-subprocess/.

Best,
Nikola

Hi Nikola,

I tried the following:

and sent 20 parallel REST request messages for correlation (so it should not yet end the flow as it waits for 100 messages). It receives a few according to the log but then I receive a bunch of errors from the engine:

16:21:01.726  WARN [nio-5001-exec-3] ExceptionHandler             : org.camunda.bpm.engine.OptimisticLockingException: ENGINE-03005 Execution of 'DELETE EventSubscriptionEntity[4c9ea600-3459-11e9-b717-0242ab803f11]' failed. Entity was updated by another transaction concurrently.
	at org.camunda.bpm.engine.impl.db.EnginePersistenceLogger.concurrentUpdateDbEntityException(EnginePersistenceLogger.java:130)
	at org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager.handleOptimisticLockingException(DbEntityManager.java:406)
	at org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager.checkFlushResults(DbEntityManager.java:365)
	at org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager.flushDbOperations(DbEntityManager.java:345)
	at org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager.flushDbOperationManager(DbEntityManager.java:314)
	at org.camunda.bpm.engine.impl.db.entitymanager.DbEntityManager.flush(DbEntityManager.java:286)
	at org.camunda.bpm.engine.impl.interceptor.CommandContext.flushSessions(CommandContext.java:203)
	at org.camunda.bpm.engine.impl.interceptor.CommandContext.close(CommandContext.java:132)
	at org.camunda.bpm.engine.impl.interceptor.CommandContextInterceptor.execute(CommandContextInterceptor.java:113)
	at org.camunda.bpm.engine.spring.SpringTransactionInterceptor$1.doInTransaction(SpringTransactionInterceptor.java:42)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
	at org.camunda.bpm.engine.spring.SpringTransactionInterceptor.execute(SpringTransactionInterceptor.java:40)
	at org.camunda.bpm.engine.impl.interceptor.ProcessApplicationContextInterceptor.execute(ProcessApplicationContextInterceptor.java:66)
	at org.camunda.bpm.engine.impl.interceptor.LogInterceptor.execute(LogInterceptor.java:30)
	at org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl.execute(MessageCorrelationBuilderImpl.java:240)
	at org.camunda.bpm.engine.impl.MessageCorrelationBuilderImpl.correlateWithResult(MessageCorrelationBuilderImpl.java:184)
	at org.camunda.bpm.engine.rest.impl.MessageRestServiceImpl.deliverMessage(MessageRestServiceImpl.java:56)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)

I do not try to update any variables yet from any part of the flows, it’s only logging at every point so far. What do I miss here?

We have also tried this solution. The problem is that if we increment the variable we encounter the dirty read problem.

Hi @kjozsa,

Sorry for the long delay.

That’s a strange behavior. The engine uses an Optimistic Concurrency control mechanism to handle concurrent operations. The OptimisticLockingException is expected and handled by the engine by retrying the operations. You should only see the exception if some operation isn’t handled properly. In your case, this shouldn’t be happening. Can you provide some environment information, like database (version), application server, java version etc?

Best,
Nikola

Hi @nikola.koevski

sure. We use camunda 7.8.0 in a Spring Boot application, running with Java 8, and Camunda is connected to PostgreSQL hosted on AWS RDS.

You say that OptimisticLockingException should get handled by the engine and it should not even appear in the logs?

I believe the root of our problem at the moment is now how to implement the workflow to parallely receive the external notifications (you helped a lot on this already, so it’s fine now), but rather to be able to count the shipments we receive and do it in a parallel way.

Can we somehow use the Camunda transaction management facilities to make the parallel counting working properly? Our current implementation when bombed with 100 parallel requests, and incrementing a counter for every request, ends up with a counter value of about 38 at the end. How shall we handle this properly?

thanks much,
Kristof

Hi @kjozsa,

Yes, the OptimisticLockingException is thrown when two (or more) concurrent transactions try to update the same entity. Since this is expected behavior, the exception is caught and the second operation is retried.

This may happen multiple times if there’s a larger number of concurrent transactions. In these cases, it is possible that the maximum number of retries is exceeded, and then an incident is raised that needs to be handled by an operator.

In your case, the exception is thrown because when two consecutive message correlations arrive, the first one tries to delete the EventSubscription entity since it has finished its operation. However, the second message correlation already updated the EventSubscription entity, so the first one has stale data and needs to be retried.

It is possible that executing 100 correlations in parallel on a single Message Start Event is not possible.

One alternative is to create an exclusive correlation. That way message correlation threads will acquire exclusive locks on the EventSubscription entity and get the correlations working properly:

MessageCorrelationBuilderImpl correlationBuilder = new MessageCorrelationBuilderImpl(commandContext, messageName);
        correlationBuilder.processInstanceId(processInstanceId);
        correlationBuilder.correlateExclusively(); 

Another thing that you should check is that the transaction isolation level on your PostgreSQL database is READ_COMMITTED, since this may lead to incorrect throws of OptimisticLockingException.

Best,
Nikola

Hi,

I’m having an issue that is quite similar to the one described in this thread.
The difference is that instead of correlating messages to an already-active process instance,
I want an instance to be created upon the first correlation. Consider a process like the following:

Just like in @kjozsa’s case, the messages arrive concurrently and there’s no straightforward
way of synchronization outside of the process application. correlateExclusively won’t work here either, as there won’t be an EventSubscription Entity when no process instance exists yet.

Hints of any kind are appreciated.

@nscuro we could never create a setup which have handled a loadtest correctly without engine errors, exceptions or 100% correct end results. Please tell us if you did, until that time, we’re considering this a very serious camunda limitation.

1 Like

The only workaround (that reliably works) I came up with until now, is the following:

  • Persist incoming messages
  • Regularly query all received messages, checking if they’re complete
    • Either by using Custom Batch or a technical support process in BPMN
  • Aggregate related messages in a collection and use that to start a process that processes these messages
  • Delete persisted messages

So basically, moving from real-time processing to batch processing. Sadly this introduces unnecessary delays, but as far as I can see, there are no real alternatives.

Did you come up with another workaround? I’m actually surprised that use cases like those described in this thread are not more common, I would’ve thought that there are at least best practices for them.