How to implement the saga pattern with Kafka and Camunda BPM

Hi
I'm a new user of Camunda BPM. I want to use it for microservice orchestration, but I have questions regarding the implementation and I cannot find an example for it.
Here is an overview of my architecture:

My workflow is as follows:
Beginning of saga
1- The orchestrator sends a JSON message to microservice A via Kafka, which processes it and returns response A
2- The orchestrator sends a JSON message to microservice B via Kafka, which processes it and returns response B
3- The orchestrator sends some information from response A + response B to microservice C via Kafka, which processes it and sends back a response
End of saga.
At the moment I am able to write to Kafka in the Adapter class, but I don't know where to listen for the responses.
How can I get the responses from the first two tasks (response A + response B) and send them to the last task (microservice C)?
What is the best way to pass objects through the running process?
I hope my problem is clear. Thanks for your answers.

Have you taken a look at this blog post that @BerndRuecker wrote about this?

Thanks Niall for the link

Based on the example in the link, I used the Modeler to model the process described in my first post. With this process, will microservice “C” only be executed once both services have finished their work?

Thanks for your response

Hi Adel.

The process looks almost perfect. You are just missing some parallel gateways, so that service C really waits for A and B to finish:

Did you find an answer to how to listen to the Kafka message? Examples can be found here: https://github.com/berndruecker/flowing-retail/blob/master/kafka/java/order-camunda/src/main/java/io/flowing/retail/order/messages/MessageListener.java
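
To illustrate the join that the parallel gateways provide, here is a minimal plain-Java sketch. It is not Camunda code: the class `SagaJoinSketch`, the method `onReply` and the saga ID used as a key are hypothetical names chosen for illustration. It only models the idea that responses A and B are stored per saga and that the request to service C is built once both are present. In a real setup the Kafka listener would typically hand each response to the engine through message correlation (for example via `runtimeService.createMessageCorrelation(...)`, storing the response as a process variable), and the engine would do the waiting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory model of a saga that waits for two replies before calling service C. */
public class SagaJoinSketch {

    /** Replies collected so far, keyed by saga (correlation) ID, then by service name. */
    private final Map<String, Map<String, String>> repliesBySaga = new HashMap<>();

    /**
     * Records a reply from service "A" or "B".
     * Returns the combined payload for service C once both replies are present,
     * or an empty Optional while the saga is still waiting for the other one.
     */
    public Optional<String> onReply(String sagaId, String service, String payload) {
        Map<String, String> replies =
                repliesBySaga.computeIfAbsent(sagaId, id -> new HashMap<>());
        replies.put(service, payload);

        if (replies.containsKey("A") && replies.containsKey("B")) {
            repliesBySaga.remove(sagaId); // join reached, the stored state is no longer needed
            return Optional.of(replies.get("A") + "+" + replies.get("B"));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        SagaJoinSketch saga = new SagaJoinSketch();
        // The first reply alone does not trigger service C.
        System.out.println(saga.onReply("saga-1", "A", "responseA"));
        // The second reply completes the join and yields the payload for C.
        System.out.println(saga.onReply("saga-1", "B", "responseB"));
    }
}
```

The order in which A and B answer does not matter here, which is the same guarantee a joining parallel gateway gives in the process model.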

Best
Bernd


Hello Bernd

Thank you for your response,

When I try to run the code of the example you suggested, I get this error: ENGINE-02033 Delegate Expression ‘#{retrievePaymentAdapter}’ did neither resolve to an implementation of ‘interface org.camunda.bpm.engine.impl.pvm.delegate.ActivityBehavior’ nor ‘interface org.camunda.bpm.engine.delegate.JavaDelegate’.

Do you have any idea about this?

Thanks

Best
ADEL

Not sure how exactly you run it, but this expression should resolve to this Spring bean: https://github.com/berndruecker/flowing-retail/blob/master/kafka/java/order-camunda/src/main/java/io/flowing/retail/order/flow/RetrievePaymentAdapter.java#L13. Do you run the full Java Spring Boot app?
You somehow need to wire your service tasks to your own glue code. The above-mentioned example uses Spring for this; the various possibilities are summarized here: https://camunda.com/best-practices/invoking-services-from-the-process/

Hi Bernd

I only run the order-camunda Spring Boot app, so I’ve added a controller where I deploy the order process:

camunda.getRepositoryService().createDeployment()
    .addClasspathResource("order.bpmn")
    .deploy();

camunda.getRuntimeService().startProcessInstanceByKey("order", someVariables);

Is that the right way to start it?

Thanks for your help

@Adel16 retrievePaymentAdapter is a Java delegate.

Try setting the implementation type to Delegate Expression and configuring it as below:

${retrievePaymentAdapter}

Hi @aravindhrs

I took the code from github, so it is the same.

Do you mean that I only have to replace “#” with “$” before {retrievePaymentAdapter}?

Thanks for your response

$ or # does not make a difference, so it should actually work. Do you start the example straight from GitHub without any changes? It works on my machine and on Travis, so that puzzles me. Do you start it via Maven, or some other way?

Now it works. It was necessary to send a message of type “OrderPlacedEvent” to Kafka first; otherwise I got the error that I mentioned. I started it as a Spring Boot app.
Now I want to know whether I should add a listener for each step of the process (listeners for “RetrievePaymentCommand”, “FetchGoodsCommand” and “ShipGoodsCommand”),
or whether the process is supposed to run to the end with the current code.

If I should add listeners, how can I make these new listener methods only continue the existing process, given that the orderPlacedReceived method kicks off a new flow instance?

I hope that is clear.

Thanks for your response

OK, that is weird, as the error message should have been different. But good to know it works now!

The listeners are all started, but you need to have the other microservices running that carry out the actual work around payment and such. These services consume the command and produce an event saying that the work is done, which in turn makes the order fulfillment go ahead.

You can see what services you need in the docker compose file: https://github.com/berndruecker/flowing-retail/blob/master/runner/docker-compose/docker-compose-kafka-java-order-camunda.yml
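
As a rough illustration of the difference between starting a new flow instance and continuing an existing one, here is a minimal plain-Java sketch. It is not the code from the example repository and does not use the Camunda API. The class `EventRoutingSketch`, the method `onEvent`, the correlation ID and the three reply-event names are assumptions made for illustration; only “OrderPlacedEvent” is taken from this thread. The idea is that one event type starts an instance, while every other event is matched against an instance that is already waiting for it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of routing incoming events to new or already running instances. */
public class EventRoutingSketch {

    // Assumed reply-event names, in the order a running instance waits for them.
    private static final List<String> REPLY_EVENTS =
            Arrays.asList("PaymentReceivedEvent", "GoodsFetchedEvent", "GoodsShippedEvent");

    // For each running instance (keyed by correlation ID): index of the awaited reply event.
    private final Map<String, Integer> waitingStep = new HashMap<>();

    /** Returns "started", "advanced", "completed" or "ignored" for the given event. */
    public String onEvent(String type, String correlationId) {
        if ("OrderPlacedEvent".equals(type)) {
            waitingStep.put(correlationId, 0); // only this event creates a new instance
            return "started";
        }
        Integer step = waitingStep.get(correlationId);
        if (step == null || !REPLY_EVENTS.get(step).equals(type)) {
            return "ignored"; // no instance is waiting for this event
        }
        if (step + 1 == REPLY_EVENTS.size()) {
            waitingStep.remove(correlationId); // last reply received, instance ends
            return "completed";
        }
        waitingStep.put(correlationId, step + 1); // move on to the next awaited reply
        return "advanced";
    }

    public static void main(String[] args) {
        EventRoutingSketch router = new EventRoutingSketch();
        System.out.println(router.onEvent("OrderPlacedEvent", "order-1"));
        System.out.println(router.onEvent("PaymentReceivedEvent", "order-1"));
    }
}
```

With a process engine, this bookkeeping is what message correlation does for you: the listener only passes the event name and a correlation key to the engine, and the waiting instance continues.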

Best
Bernd


Thank you Bernd, everything works well now :)
