Signal to service task on parallel path seems to signal all service tasks

I have a workflow with parallel concurrent paths to service tasks. When I signal one of those tasks it seems like all tasks are signaled. When I call

executions = processEngineRule.getRuntimeService().createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).list();

There is only one execution for the process instance itself and none for the service task. In addition the instance is still running and there is no way to signal the other service task and complete the workflow.

To illustrate the issue here is a workflow that has parallel service tasks:

<a class="attachment" href="/uploads/default/original/2X/0/05d22aaa211d29f260d7f2a5c053d8a733b4bfe2.xml">signalservicetaskconcurrentpaths.bpmn20.xml</a> (4.8 KB)

Here is the service task implementation:

package com.resilient.workflow.camunda;

import org.camunda.bpm.engine.impl.bpmn.behavior.TaskActivityBehavior;
import org.camunda.bpm.engine.impl.pvm.delegate.ActivityExecution;

public class ServiceTaskImpl extends TaskActivityBehavior {

@Override
public void execute(final ActivityExecution execution) {
}

@Override
public void signal(ActivityExecution execution, String signalName, Object data) throws Exception {

    leave(execution);
}
} 

And the test:

package com.resilient.workflow.camunda;

import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.ProcessEngineRule;
import org.junit.Rule;
import org.junit.Test;

import java.util.List;
import java.util.Objects;

public class SignalServiceTaskConcurrentPathsTest {

    @Rule
    public ProcessEngineRule processEngineRule = new ProcessEngineRule("camunda.cfg.xml");


    @Deployment(resources = {"signalservicetaskconcurrentpaths.bpmn20.xml"})
    @Test
    public void testSignalServiceTaskOnConcurrentPaths() throws InterruptedException {

        //Given a workflow with parallel service tasks
        ProcessInstance processInstance = processEngineRule.getRuntimeService().startProcessInstanceByKey("twoservicetasks");
        assertThat(processInstance.isEnded(), is(false));

        //when the workflow is run
        List<Execution> executions = processEngineRule.getRuntimeService().createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).list();
        //then there are three executions: one for each service task and one for the process instance
        assertThat(executions, hasSize(3));

        //when one of the service tasks is signalled
        ExecutionEntity task_01qgj19 = executions.stream().map(ExecutionEntity.class::cast)
            .filter(entity -> Objects.equals(entity.getActivityId(), "Task_01qgj19"))
            .findFirst().get();
        processEngineRule.getRuntimeService().signal(task_01qgj19.getId());

        //then there are two executions: one for the remaining service task and one for the process instance
        executions = processEngineRule.getRuntimeService().createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).list();
        assertThat(executions, hasSize(2));
    }
}

It is very likely that

leave()

is not the correct method to use in the service task implementation, so let me know if something else should be used there to progress the service task.

Any help would be appreciated.

Thanks,
Ben

Hi @bluri,
could you post you BPMN XML once again? I can’t see it as attachment.

UPD. Foun it. It is here: https://forum.camunda.io/uploads/default/original/2X/0/05d22aaa211d29f260d7f2a5c053d8a733b4bfe2.xml

Hi @bluri,

when you signal the first task, related execution goes straight to the “end” node. After this, one execution still left, related with second service task, and you can signal it. This works for me with your model:

@Test
@Deployment(resources = {“signalservicetaskconcurrentpaths.bpmn”})
public void threeServiceTasksAndAGateway() {

//Given a workflow with parallel service tasks
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("twoservicetasks");
assertFalse(processInstance.isEnded());

Execution executionEntity = runtimeService.createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).activityId("Task_01qgj19").singleResult();

runtimeService.signal(executionEntity.getId());

executionEntity = runtimeService.createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).activityId("Task_0i098fv").singleResult();
assertNotNull(executionEntity);
runtimeService.signal(executionEntity.getId());

}

If you want the execution to “wait” for the other task to be signaled, consider using parallel gateways:

Thank you for the reply. This makes perfect sense.

I wanted to be able to capture, via an event listener, when the process instance has ended, which in this case is immediately after the first signal. I registered an event listener for the ExecutionListener.EVENTNAME_END. However I don’t get the callback as expected after the signal.

Am I registering for the event correctly?

Thanks,
Ben

Here is a test that illustrates the problem:

    package com.resilient.workflow.camunda;

import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import com.google.common.collect.Lists;

import org.camunda.bpm.engine.ProcessEngineConfiguration;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.ExecutionListener;
import org.camunda.bpm.engine.impl.bpmn.parser.AbstractBpmnParseListener;
import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl;
import org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity;
import org.camunda.bpm.engine.impl.persistence.entity.ProcessDefinitionEntity;
import org.camunda.bpm.engine.impl.util.xml.Element;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.test.Deployment;
import org.camunda.bpm.engine.test.ProcessEngineRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;

import java.util.List;
import java.util.Objects;

public class SignalServiceTaskConcurrentPathsTest {

    public class CustomEngineRule extends ProcessEngineRule {

        public CustomEngineRule() {
            super(false /* don't assert if database is not cleaned up, just clean it up */);
        }

        @Override
        protected void initializeProcessEngine() {
            ProcessEngineConfigurationImpl processEngineConfiguration =
                ResilientStandaloneProcessEngineConfiguration.create()
                    .setJdbcUrl("jdbc:h2:mem:camunda;DB_CLOSE_DELAY=1000")
                    .setJdbcDriver("org.h2.Driver")
                    .setJdbcUsername("sa")
                    .setJdbcPassword("");
                    processEngineConfiguration.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
            processEngineConfiguration.setCustomPostBPMNParseListeners(Lists.newArrayList(new CustomListener()));
            super.processEngine = processEngineConfiguration.buildProcessEngine();
        }


        @Override
        public void finished(Description description) {
            super.finished(description);
        }
    }

    @Rule
    public ProcessEngineRule processEngineRule = new CustomEngineRule();

    public class CustomListener extends AbstractBpmnParseListener {

        @Override
        public void parseProcess(Element processElement, ProcessDefinitionEntity processDefinition) {
            processDefinition.addListener(ExecutionListener.EVENTNAME_END, new CustomEndListener());
        }
    }

    private boolean instancedEnded = false;

    public class CustomEndListener implements ExecutionListener {

        @Override
        public void notify(DelegateExecution execution) {
            instancedEnded = true;
        }
    }

    @Deployment(resources = {"signalservicetaskconcurrentpaths.bpmn20.xml"})
    @Test
    public void testSignalServiceTaskOnConcurrentPaths() throws InterruptedException {

        //Given a workflow with parallel service tasks
        ProcessInstance processInstance = processEngineRule.getRuntimeService().startProcessInstanceByKey("twoservicetasks");
        assertThat(processInstance.isEnded(), is(false));

        //when the workflow is run
        List<Execution> executions = processEngineRule.getRuntimeService().createExecutionQuery().processInstanceId(processInstance.getProcessInstanceId()).list();
        //then there are three executions: one for each service task and one for the process instance
        assertThat(executions, hasSize(3));

        //when one of the service tasks is signalled
        ExecutionEntity task_01qgj19 = executions.stream().map(ExecutionEntity.class::cast)
            .filter(entity -> Objects.equals(entity.getActivityId(), "Task_01qgj19"))
            .findFirst().get();
        processEngineRule.getRuntimeService().signal(task_01qgj19.getId());

        //then make sure we were notified that the instance ended.
        assertThat(instancedEnded, is(true));
    }
}

Never mind. I realize that it works if I signal the second activity.