Unable to read/write process variables in WebFlux's WebClient reactive chain

Hi,
we noticed a strange effect that occurs when trying to access process variables inside a Mono chain (from Project Reactor). Our code usually lives inside a JavaDelegate that is referenced in the model by a delegate expression.

Typical Code:

  @Override
  public void execute(DelegateExecution execution) {
    retrieveJoke().doOnNext(storeInProcessVariables(execution)).block();
  }

  private Mono<String> retrieveJoke() {
    //    if (true) return Mono.just("test");
    return builder.baseUrl("http://api.icndb.com/jokes/random")
        .build()
        .get()
        .retrieve()
        .bodyToMono(Joke.class)
        .map(Joke::getValue)
        .map(JokeValue::getJoke);
  }

  private Consumer<String> storeInProcessVariables(DelegateExecution execution) {
    return joke -> execution.setVariable("jokeText", joke);
  }

However, this throws an exception saying that serializers cannot be accessed outside of a command context:

ENGINE-03041 Cannot work with serializers outside of command context.

However, this is not a problem with Project Reactor’s Mono chain itself (the commented-out line in the retrieveJoke method works as expected). I suspect that Camunda cannot handle the separate thread where the HTTP request (and the doOnNext side-effect method) gets executed.

If I add a .log() statement to the chain, my log output (with the threads) looks like this:

2019-11-14 23:18:07.256  INFO 2565 --- [nio-8080-exec-2] reactor.Mono.MapFuseable.1               : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2019-11-14 23:18:07.257  INFO 2565 --- [nio-8080-exec-2] reactor.Mono.MapFuseable.1               : | request(unbounded)
2019-11-14 23:18:07.695  INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1               : | onNext(Chuck Norris doesn't throw up if he drinks too much. Chuck Norris throws down!)
2019-11-14 23:18:07.698  INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1               : | cancel()
2019-11-14 23:18:07.699  INFO 2565 --- [or-http-epoll-1] reactor.Mono.MapFuseable.1               : | onComplete()
2019-11-14 23:18:07.700 ERROR 2565 --- [nio-8080-exec-2] org.camunda.bpm.engine.context           : ENGINE-16006 BPMN Stack Trace:
	ReadServiceTask (activity-execute, ProcessInstance[a25b9dd3-072c-11ea-a395-60f677b2052f])

The only valid workaround that I found is to call .block() on the Mono chain to retrieve the result of the HTTP request immediately and only then access the process variables. This is a little bit nasty because it violates idiomatic reactive programming and is hard to test without some rather problematic integration tests.

Is there a better way of doing that? I tried to specify a Scheduler with the .publishOn(Scheduler) method of the Mono chain, but I can’t figure out how to get an appropriate thread from Camunda’s JobExecutor.

I uploaded the complete demo project to my github repo:

Hi @enolive,

A DelegateExecution object is only useful within the thread and callback that you receive it in. It is not useful in other threads, because certain internal logic is bound to a thread (e.g. transaction management, entity caching, etc.). It is not useful outside of a callback, because it is mutable (e.g. when process execution continues, the DelegateExecution’s properties will change).
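To make the thread-binding point concrete, here is a rough plain-JDK analogy. It is not Camunda's actual implementation: `CONTEXT`, `setVariable` and `run` are hypothetical names, a `ThreadLocal` map stands in for the thread-bound engine state, and a single-thread executor stands in for the HTTP worker thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextDemo {
    // Hypothetical stand-in for thread-bound engine state (not Camunda's real API).
    static final ThreadLocal<Map<String, Object>> CONTEXT = new ThreadLocal<>();

    // Fails unless the current thread has a context bound to it.
    static void setVariable(String name, Object value) {
        Map<String, Object> context = CONTEXT.get();
        if (context == null) {
            throw new IllegalStateException(
                "no context on thread " + Thread.currentThread().getName());
        }
        context.put(name, value);
    }

    static String run() {
        ExecutorService httpThread = Executors.newSingleThreadExecutor();
        Map<String, Object> variables = new HashMap<>();
        CONTEXT.set(variables); // the context exists on the calling thread only
        try {
            boolean callbackWriteFailed = false;
            try {
                // Like doOnNext: the write runs on the worker thread, which has no context.
                httpThread.submit(() -> setVariable("jokeText", "from callback")).get();
            } catch (ExecutionException e) {
                callbackWriteFailed = e.getCause() instanceof IllegalStateException;
            }
            // Like block(): fetch on the worker, wait, then write on the calling thread.
            String joke = httpThread.submit(() -> "from caller").get();
            setVariable("jokeText", joke);
            return callbackWriteFailed + ":" + variables.get("jokeText");
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            httpThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "true:from caller"
    }
}
```

The write attempted on the worker thread is rejected, while the same write succeeds once the result has been handed back to the thread that owns the context.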

I’m not experienced with reactive programming in general or WebFlux in particular, but one idea is to set the variables via RuntimeService API, which will then use its own transaction, e.g.:

private Consumer<String> storeInProcessVariables(DelegateExecution execution) {
  RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
  String id = execution.getId();

  return joke -> runtimeService.setVariable(id, "jokeText", joke);
}

Note: Since the callback comes from a different thread, there are various race conditions to consider:

  • The process instance may have finished already.
  • The transaction that has created the execution (and triggered the delegate) may not yet have committed.
  • And maybe more :)

So this needs to be designed carefully.
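The second race condition in the list above (the creating transaction has not committed yet) can be sketched with a toy in-memory store. This is only an illustration under simplifying assumptions: `Store`, `Transaction` and the id `execution-1` are hypothetical, and a real database offers far more than this.

```java
import java.util.HashMap;
import java.util.Map;

final class UncommittedVisibilityDemo {
    // Hypothetical toy store: other readers only ever see committed rows.
    static final class Store {
        private final Map<String, String> committed = new HashMap<>();

        synchronized boolean exists(String id) {
            return committed.containsKey(id);
        }

        Transaction begin() {
            return new Transaction(this);
        }
    }

    // Writes are buffered here and published to the store on commit.
    static final class Transaction {
        private final Store store;
        private final Map<String, String> pending = new HashMap<>();

        Transaction(Store store) {
            this.store = store;
        }

        void insert(String id, String value) {
            pending.put(id, value);
        }

        void commit() {
            synchronized (store) {
                store.committed.putAll(pending);
            }
            pending.clear();
        }
    }

    static String demo() {
        Store store = new Store();
        Transaction creating = store.begin();
        creating.insert("execution-1", "active");

        // What a second transaction (e.g. one started from a callback) would see.
        boolean visibleBeforeCommit = store.exists("execution-1");
        creating.commit();
        boolean visibleAfterCommit = store.exists("execution-1");
        return visibleBeforeCommit + "/" + visibleAfterCommit;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "false/true"
    }
}
```

A lookup from a second transaction finds nothing until the first one commits, which is one way an execution can appear not to exist while its delegate is still running.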

Cheers,
Thorben

Hey Thorben,
thanks for your reply and for helping me understand the inner workings of Camunda a little bit better!
Unfortunately, your suggested solution does not work.

When I try to get the runtimeService inside the doOnNext method, I get an error:

ENGINE-16004 Exception while closing command context: null

So I thought, why not autowire the runtimeService at construction time and just call it in the doOnNext method?

    return joke -> runtimeService.setVariable(id, "jokeText", joke);

However, this will produce an error that the execution does not exist:

execution 4c6c4baf-07fc-11ea-9ce8-60f677b2052f doesn’t exist: execution is null

PS: the block() statement will ensure that the execution won’t end until the reactive call chain is finished with processing. So I believe the execution (and its underlying process instance) should still exist.

It’s probably the other case then: the execution isn’t persisted yet. You can understand the situation by putting a breakpoint into the delegate and then checking in the database what is there.

Cheers,
Thorben

Thanks for the hint!

I assume that I have to make a service task run async in order to persist anything, right?
By doing that, the process will start working, but

  1. only when using runtimeService.setVariable(…) instead of execution.setVariable(…)
  2. with some odd behavior in Camunda’s transaction handling:

org.camunda.bpm.engine.OptimisticLockingException: ENGINE-03005 Execution of ‘UPDATE ExecutionEntity[7fb863cf-0b24-11ea-8f73-60f677b2052f]’ failed. Entity was updated by another transaction concurrently.

For now, this is beyond my understanding of Project Reactor’s vs. Camunda’s threading model. I assume that both the Camunda execution and the HTTP thread that executes onNext are trying to write at the same time.
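If that assumption holds, the conflict can be pictured with a minimal plain-JDK sketch of optimistic locking. It assumes a simple revision counter per row; `Row`, `update` and `demo` are hypothetical names for illustration, not Camunda's persistence code.

```java
final class OptimisticLockingDemo {
    // Hypothetical row with a revision counter that every successful update bumps.
    static final class Row {
        int revision = 1;
        String jokeText;
    }

    // An update only succeeds if the row still has the revision the writer read earlier.
    static synchronized boolean update(Row row, int expectedRevision, String jokeText) {
        if (row.revision != expectedRevision) {
            return false; // someone else updated the row in the meantime
        }
        row.jokeText = jokeText;
        row.revision++;
        return true;
    }

    static String demo() {
        Row row = new Row();

        // Both writers read the row while it is still at revision 1.
        int seenByEngine = row.revision;
        int seenByCallback = row.revision;

        // The callback writes first and bumps the revision to 2.
        boolean callbackWrite = update(row, seenByCallback, "joke");
        // The engine then flushes with a stale revision and is rejected.
        boolean engineWrite = update(row, seenByEngine, "stale");

        return callbackWrite + "/" + engineWrite;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true/false"
    }
}
```

Whichever writer flushes second carries a stale revision and is rejected, which matches the shape of the "updated by another transaction concurrently" message.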

  @Override
  public void execute(DelegateExecution execution) {
    final var jokeText = retrieveJoke().block();
    execution.setVariable("jokeText", jokeText);
  }

This blocking call always works (regardless of sync/async or which setVariable method is used) and might be better than fiddling around with schedulers (which should be possible with a specialized ExecutorService, if I understand Project Reactor’s documentation on the publishOn method correctly: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#publishOn-reactor.core.scheduler.Scheduler-).

Btw, I’ve updated my project to use WebFlux where possible (REST API et al.) and blocking calls inside the delegate.

For now, I am quite happy with the result…