Kafka + input/output params

Hi!
Here is my problem: I need to listen to data from a Kafka topic and pass the results to the next task. Could you tell me how to do this? My idea was to write a Kafka listener with Spring Boot that sends a message to a message event, and to pass the data on to the next task from there. However, I’m having trouble passing parameters to the next step of the process. I also need to check the incoming data, and for that I need process variables, which I can only get from a JavaDelegate. At the moment the data arrives from Kafka, I don’t have access to the processId.

Hi @Klara

What is the code you’re using to send the message?
Usually you shouldn’t need a processId to send the message, so perhaps there’s a way to change the correlation params to make it easier to find the process you’re looking for.

(Also, can you upload the process you’ve built?)

diagram_test3.bpmn (12.3 KB)
Yes, I was thinking of receiving a Kafka message and then sending a message to the message event. I need to compare the data that comes from Kafka with the data stored in the process variables and, if the condition is met, start executing the second task.

Alrighty, so is there some kind of unique variable that exists in the process that you get from the Kafka event? If so you can do something like this:

processEngineServices.getRuntimeService().createMessageCorrelation("testExecutionState")
        .processInstanceVariableEquals("someKey", "KeyValue")
        .correlate();

Great, I’ll try. Tell me, please, can I somehow pass the data that arrives in the body of the message to the message event, as input params for the next task?

Luckily that’s fairly easy, you just need to add a setVariables call to the correlation:

        // Variables to set on the process instance when the message is correlated
        Map<String, Object> vars = new HashMap<>();
        vars.put("payloadData", "DataValue");

        processEngineServices.getRuntimeService().createMessageCorrelation("testExecutionState")
                .processInstanceVariableEquals("someKey", "KeyValue")
                .setVariables(vars)
                .correlate();
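
For the Kafka side, here is a rough sketch of how the listener could hand a record over to that correlation. It is written against a plain interface so it runs without the engine: `MessageCorrelator` and `KafkaToProcessBridge` are hypothetical names, and the record is assumed to arrive already deserialized into a map containing `someKey` and `payloadData`. In a real application the `MessageCorrelator` implementation would simply wrap the `createMessageCorrelation(...)` chain shown above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the engine call. A real implementation would wrap
// runtimeService.createMessageCorrelation(messageName)
//     .processInstanceVariableEquals(matchKey, matchValue)
//     .setVariables(variables)
//     .correlate();
interface MessageCorrelator {
    void correlate(String messageName, String matchKey, Object matchValue,
                   Map<String, Object> variables);
}

// Hypothetical bridge, meant to be called from the Kafka listener method.
class KafkaToProcessBridge {
    private final MessageCorrelator correlator;

    KafkaToProcessBridge(MessageCorrelator correlator) {
        this.correlator = correlator;
    }

    // Assumption: the record value has already been deserialized into a map.
    void onRecord(Map<String, Object> record) {
        Object key = record.get("someKey");
        if (key == null) {
            // Without the unique value there is nothing to match a process instance on.
            throw new IllegalArgumentException("record has no 'someKey' field");
        }
        // Data from the message body that the next task should see.
        Map<String, Object> vars = new HashMap<>();
        vars.put("payloadData", record.get("payloadData"));

        correlator.correlate("testExecutionState", "someKey", key, vars);
    }
}
```

The point of the split is that the listener never needs a processId: it only needs a value from the Kafka record that also exists as a process variable.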

Yes, but I meant something a little different :slight_smile: As far as I understand, the variables passed in the message will be local variables of the current task, not process variables. How can I access them in the next task, once the current task with the message has finished and the next service task starts executing?

Not in this case, actually: the variables will be global to the process instance, so the next task should be able to access them without any problems.
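
Since the variables end up on the process instance, the comparison you described can live in the next service task. Below is a minimal sketch of just the check, written against a plain map so it runs without the engine. Inside a JavaDelegate the map would typically come from `execution.getVariables()`. The class name `PayloadCheck` and the variable name `expectedData` are assumptions for illustration; `payloadData` is the variable set during correlation.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: compares the value delivered with the message against
// a value the process stored earlier.
class PayloadCheck {
    static boolean matchesExpected(Map<String, Object> processVariables) {
        // Assumption: an earlier task stored the expected value under this name.
        Object expected = processVariables.get("expectedData");
        // Set by setVariables(...) when the message was correlated.
        Object received = processVariables.get("payloadData");
        // A missing expected value counts as "no match".
        return expected != null && Objects.equals(expected, received);
    }
}
```

The result could then be stored as another process variable and used on an exclusive gateway to decide whether the second task runs.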

Awesome, thank you very much. I’ll follow your advice, and I hope there won’t be any difficulties :)

Feel free to come back to let me know how it goes, successful or not :slight_smile:

@Niall Hi again! I’ve run into another problem. I changed the diagram a little, and the process now has these steps in sequence:
1) running an API test from a task
2) listening for a Kafka response that the process was completed correctly
3) running a second API test from a task
4) listening for a Kafka response that the process is done
5) end of process
Both message events listen to the same topic, but the process immediately reacts to the last message event and skips the steps before it. How can I make the message events listen in sequence?
diagram_test4.bpmn (12.0 KB)