Camunda with MQ


#1

Hi,
I have a scenario where I need to consume messages published on RabbitMQ and create a User Task instance based on each message. So I designed the BPMN like this.

Here is the code for the receiver:

public class DocSignalReceiver extends AbstractBpmnActivityBehavior { 

private static final String DOC_NAME = "documentName";
private static final String QUEUE_NAME = "TestMQ";

@Autowired
ProcessEngine engine;

public static final String EXECUTION_ID = "executionId";

public void execute(final ActivityExecution execution) throws Exception {

	/*
	 * The message will be placed here by an external service.
	 */

}

public void signal(ActivityExecution execution, String signalName, Object signalData) throws Exception {

	ConnectionFactory factory = new ConnectionFactory();
	factory.setHost("localhost");
	Connection connection = factory.newConnection();
	Channel channel = connection.createChannel();
	channel.queueDeclare(QUEUE_NAME, false, false, false, null);

	final Consumer consumer = new DefaultConsumer(channel) {
		  @Override
		  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		    String message = new String(body, "UTF-8");

		    System.out.println(" [x] Received '" + message + "'");
		    try {
		      createWorkItem(message);
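		    } finally {
		      System.out.println(" [x] Done");
		      // Acknowledge manually because basicConsume is called with autoAck = false.
		      channel.basicAck(envelope.getDeliveryTag(), false);
		    }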
		  }
		};
	channel.basicConsume(QUEUE_NAME, false, consumer);

	leave(execution);
}

When I run my test case, I get an exception:

public class TestAsynchronousServiceTask {

  @Rule
  public ProcessEngineRule processEngineRule = new ProcessEngineRule();
 
  @Test
  @Deployment(resources = { "testprocess.bpmn" })
  public void testServiceInvocationSuccessful() {

    final ProcessEngine processEngine = processEngineRule.getProcessEngine();
    final RuntimeService runtimeService = processEngineRule.getRuntimeService();
    final TaskService taskService = processEngineRule.getTaskService();
    // start the process instance
    ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testprocess", "");
    Map<String, Object> callbackPayload = Collections.<String, Object>singletonMap("DocumentName", "Test.pdf");
    Execution execution = processEngine.getRuntimeService()
           .createExecutionQuery().processInstanceId(processInstance.getId()).singleResult();
    processEngine.getRuntimeService().signal(execution.getId(),callbackPayload);
  }
}

The exception is thrown at the last line of the test case and says that the execution id does not exist.

I have two fundamental questions:

  1. Is my approach right, or do I need to use send and receive activities?
  2. At the last line, the execution id I get is the same as the process instance id. Is that right?

Please help me out in solving the issue.

Regards,
Subbu


#2

As to the state of your delegate -

  • I’d make sure your task starts correctly and is in the expected state before moving forward in the test. For example, just print (println) the instanceId right after the process starts. Also, get the state (e.g. “completed”).

Messages inbound on Rabbit MQ -

  • Why not start a process when the message arrives? For example, onMessage (or something similar - I prefer Apache Camel). This way you don’t have to worry about BPM directly interfacing with Rabbit to check/re-check message arrival.
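
A minimal sketch of the "start a process when the message arrives" idea, written against plain Java so it runs without an engine. `ProcessStarter`, `DocumentMessageListener`, and `RecordingStarter` are hypothetical names for illustration. In Camunda, `RuntimeService.startProcessInstanceByKey(String, Map<String, Object>)` would sit behind the interface (it returns a `ProcessInstance`; the sketch returns an id string to stay self-contained). The process key `testprocess` and the variable name `documentName` are taken from the code in post #1.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the single engine call this sketch needs.
interface ProcessStarter {
    String startProcessInstanceByKey(String processDefinitionKey, Map<String, Object> variables);
}

// Invoked by the messaging layer (RabbitMQ consumer, Camel route, ...) once per delivery.
final class DocumentMessageListener {
    private final ProcessStarter starter;

    DocumentMessageListener(ProcessStarter starter) {
        this.starter = starter;
    }

    // Decodes the message body and starts one process instance for it.
    String onMessage(byte[] body) {
        String documentName = new String(body, StandardCharsets.UTF_8);
        Map<String, Object> variables =
                Collections.<String, Object>singletonMap("documentName", documentName);
        return starter.startProcessInstanceByKey("testprocess", variables);
    }
}

// In-memory fake that records start requests, used only to exercise the listener.
final class RecordingStarter implements ProcessStarter {
    final List<Map<String, Object>> started = new ArrayList<>();

    @Override
    public String startProcessInstanceByKey(String key, Map<String, Object> variables) {
        started.add(variables);
        return key + "-" + started.size();
    }
}
```

With this split, the process model never talks to the broker directly: the listener owns the connection, and the engine only sees a start request carrying the message payload as a variable.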

#3

Hi @garysamuelson,

Thanks for the response. I chose the second option, so now I have a listener on the queue that starts the process when a message arrives. This is good for the current situation.
I have a basic question here. Apologies if this is a foolish question.

Please check the below flow.

storedocument will store the document in the DB. The next steps in the process are time-consuming, so I designed them using send and receive activities. Finally, the user reviews and completes the process.

As you said I can take out the first 2 steps in my process and can initialize the process from “NotifyActivity1”. But the problem is there is one more send activity which pauses the execution. Now the question is: how to resume the process?

All the examples (in JUnit test cases) use the signal method of the RuntimeService class to resume the process. But how can I do that from the web application?
Regards,
Subbu


#4

Camunda can run asynchronously, so you don’t necessarily need to wait for a response from a “send” task.

Why not refactor the process model, move the human task to a specialized “item classification” model, so that message orchestration and human activities aren’t bound into the same process instance?

The reason is that your design requires a process instance for each new inbound “start process” event, meaning that 10 “storedocument” requests will instantiate 10 new process instances. Each of these instances must then complete two request/reply message exchanges.

Unless the “send/notify” message exchanges are an intrinsic, value-adding part of the process model, move them to either a specialized flow or specialized message orchestration infrastructure (e.g. ESB, MOM).

However, if it’s critically important to represent execution progress within one single instance, the above model should work - though don’t forget that Camunda doesn’t implement a “persistent subscription” approach to msg-event interaction. For example, if a message comes back to “NotifyActivity1” prior to BPMN token arrival, you’ll get an error.
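
One way to guard against a reply arriving before the token is to park early replies outside the engine until the process is ready for them. The sketch below is plain Java and only illustrates the idea. `EarlyMessageBuffer` and its methods are hypothetical names, the correlation key could be a business key, and the buffer is in-memory only, so it would not survive a restart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Pairs replies with waiting process steps by correlation key, in either arrival order.
final class EarlyMessageBuffer {
    private final Map<String, String> pendingReplies = new HashMap<>();
    private final Map<String, Consumer<String>> waiting = new HashMap<>();

    // Called when a reply arrives from the broker.
    synchronized void onReply(String correlationKey, String payload) {
        Consumer<String> resume = waiting.remove(correlationKey);
        if (resume != null) {
            resume.accept(payload);                      // process is already waiting: deliver now
        } else {
            pendingReplies.put(correlationKey, payload); // too early: keep it for later
        }
    }

    // Called when the process reaches its receive step.
    synchronized void onWaiting(String correlationKey, Consumer<String> resume) {
        String payload = pendingReplies.remove(correlationKey);
        if (payload != null) {
            resume.accept(payload);                      // reply was buffered: deliver immediately
        } else {
            waiting.put(correlationKey, resume);         // nothing yet: remember how to resume
        }
    }

    // Number of replies still waiting for their process step.
    synchronized int pendingCount() {
        return pendingReplies.size();
    }
}
```

In a real setup the `resume` callback would be whatever call continues the waiting execution, and the buffer would need durable storage to match the engine's own persistence.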


#5

Hi @garysamuelson,
Thanks for the clarification. I will implement these suggestions and post an update.

Regards,
Subrahmanyam