Kafka Event Orchestration


#1

Hi all,

I’m pretty new to Camunda and I’m trying to implement something I think should be straight forward, but I’m struggling.

I want to use CamundaBPM as a simple event orchestrator - it should receive 2 events from Kafka, do something (like aggregate the payload of both messages), and then publish a new event to Kafka.

In my mind it would look something like the following:

In my Spring application I have 2 KafkaListeners, which trigger the start of a process when a message is received, with a parallel gateway at the other end:

    @KafkaListener(
            topics = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-01').getTopic()}",
            groupId = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-01').getGroupId()}"
    )
    public void processMessage(ConsumerRecord<String, String> consumerRecord) {
        super.processMessage(consumerRecord, String.class);
        runtimeService.startProcessInstanceByMessage("Message_Topic_1_Received");
    }

Based on my understanding, this won’t work because I am effectively starting 2 separate process instances and the gateway will never complete.

How should I trigger the messages under the same process instance so that the gateway will proceed to publish the event to Kafka?

Also how should I correctly correlate the Kafka messages together based on a trace/correlation ID?

If my sequence is entirely wrong, what would be the best way to implement this, and are there any samples out there that align with my use case?

Appreciate the guidance.
David


#2

if you use the correlation id as a business key when sending the message to camunda the following model would work.

It would start a new process in cases where an instance doesnt exist and continue an existing one in cases were the process exists with the same business key


#3

Thank you @Niall.

So in your suggested flow, if I understand correctly, we are creating 2 distinct process instances, depending on which message is received first through kafka.

If we take the top flow and assume that we have received ‘message one’ from a listener first:

image

The kafka listener will start the process instance by calling the following (and we can provide a trace ID as the business key as you suggested):

runtimeService.startProcessInstanceByMessage("Message_Topic_1_Received", tracer.currentSpan().context().traceIdString());

Following this, another kafka listener gets message 2 - it also calls something similar to start the process flow, because it doesn’t know if it should start a new process instance, or continue an existing process instance:

runtimeService.startProcessInstanceByMessage("Message_Topic_2_Received", tracer.currentSpan().context().traceIdString());

Programatically, how should I handle this to ensure it either continues with the existing flow, or starts a new process instance? How do I correlate the business keys together?

Thanks.


#4

@Niall - I’m continuing to experiment with this.

I made the following updates, which does work sporadically (or for the first time when I send in 2 messages on occasion), so something still isn’t quite right:

    @KafkaListener(
            topics = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-01').getTopic()}",
            groupId = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-01').getGroupId()}"
    )
    public void processMessage(ConsumerRecord<String, String> consumerRecord) {
        super.processMessage(consumerRecord, String.class);

        runtimeService.createMessageCorrelation("Message_Topic_1_Received")
                .processInstanceBusinessKey(tracer.currentSpan().context().traceIdString())
                .setVariable("tracer_id", tracer.currentSpan().context().traceIdString())
                .correlateWithResult();
    }
    @KafkaListener(
            topics = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-02').getTopic()}",
            groupId = "#{@kafkaApplicationProperties.getConsumerTopicById('camunda-event-02').getGroupId()}"
    )
    public void processMessage(ConsumerRecord<String, String> consumerRecord) {
        super.processMessage(consumerRecord, String.class);

        runtimeService.createMessageCorrelation("Message_Topic_2_Received")
                .processInstanceBusinessKey(tracer.currentSpan().context().traceIdString())
                .setVariable("tracer_id", tracer.currentSpan().context().traceIdString())
                .correlateWithResult();
    }

I can see the tracer is unique for each pair of messages submitted, but as I say it doesn’t always get to my delegate expression.

My bpmn:

orchestrate.bpmn (6.0 KB)

Any pointers on what I’m doing wrong?