Iterating active executions/activities right after the process engine starts

Hello, this is a continuation of an earlier thread.

I am using the postProcessEngineBuild method of a ProcessEnginePlugin to get the active executions and the activities they are currently waiting in. Unfortunately, I’m stuck because the process engine does not seem to be fully initialized at the postProcessEngineBuild stage.

My question: could you please advise how I can overcome the issue below?

Here’s what I’m trying to do:

    @Override
    public void postProcessEngineBuild(ProcessEngine processEngine) {

        List<Execution> executions = processEngine.getRuntimeService().createExecutionQuery().list();
        try {
            for (Execution execution : executions) {
                ExecutionEntity exec = (ExecutionEntity) execution;

                ActivityImpl act = exec.getActivity();

This fails with an NPE; the stack trace is as follows:

Caused by: java.lang.NullPointerException
at org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity.ensureProcessDefinitionInitialized(ExecutionEntity.java:759)
at org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity.getProcessDefinition(ExecutionEntity.java:741)
at org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity.ensureActivityInitialized(ExecutionEntity.java:823)
at org.camunda.bpm.engine.impl.persistence.entity.ExecutionEntity.getActivity(ExecutionEntity.java:811)
at com.sborex.crm.MessageSystem$BpmnParsePlugin.postProcessEngineBuild(MessageSystem.java:1228)
... 88 more

In ensureProcessDefinitionInitialized(ExecutionEntity.java:759) there is a statement:

  ProcessDefinitionEntity deployedProcessDefinition = Context.getProcessEngineConfiguration().getDeploymentCache()
      .findDeployedProcessDefinitionById(processDefinitionId);

and Context.getProcessEngineConfiguration() returns null =(((

What I am actually trying to do is re-run the event listeners of the activities that the executions are currently located on, as they start some services, like this:

            for (Execution execution : executions) {
                ExecutionEntity exec = (ExecutionEntity) execution;

                ActivityImpl act = exec.getActivity();
                
                if(act!=null){
                    Map<String, List<DelegateListener<? extends BaseDelegateExecution>>> listeners = act.getListeners();
                    if(listeners!=null){
                        for (Map.Entry<String, List<DelegateListener<? extends BaseDelegateExecution>>> kv : listeners.entrySet()) {
                            for (DelegateListener<? extends BaseDelegateExecution> listener : kv.getValue()) {
                                if (listener instanceof ServiceStartListener) {

                                    ((ServiceStartListener) listener).notify(exec);

                                }
                            }
                        }
                    }
                }
            }

You are using internal API heavily here and in the end it does not work properly. What is the use case you are trying to solve? Maybe there’s a different way to solve it than this very error-prone way.

During a process execution, the start event listeners of some elements (mostly BPMN message catching events) may start non-Camunda services, for example listening on a socket. The process then stops in a wait state. When the non-Camunda message is received, the Camunda message is sent and the process goes on. This works (the socket is closed at the appropriate time, etc.; I am not asking for help on these matters).

But if the engine is stopped and then restarted, I need to restart those non-Camunda services. So my start event listeners implement my custom marker interface ServiceStartListener, which lets me distinguish the listeners whose notify method is safe and necessary to re-run at startup.

I iterate over the current executions, hoping this is a legitimate action. The same goes for retrieving the current wait state activity (mostly the catching message event). I would also hope that getting the listeners of an activity is not that dirty a hack: I added those listeners myself in my listener plugin, so why wouldn’t I be able to get them?

Now I check whether this is actually my own custom listener, not a system one, and call its notify method.

Sorry for such a long read, hope it makes sense.

Hi, any ideas? I’ll be happy not to use the internal API to get the current activity listeners, or achieve my goals described above in any other legitimate way.

I would try the following:

  1. Encode the type of service that you start into the process model, e.g. as an extension element camunda:property.
  2. Decouple the code that starts the service from the execution listeners. Invoke it from the execution listeners with the property value read from the model.
  3. When you restart the server, query all process instances and call RuntimeService#getActivityInstance. For each (leaf) activity instance, check if the extension property is defined and re-run the start logic. No need to worry about invoking execution listeners then.

Cheers,
Thorben
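The three steps above could be sketched roughly as follows. This is a minimal, self-contained illustration, not engine code: `InstanceNode` is a hypothetical stand-in for the tree returned by RuntimeService#getActivityInstance, and the `propertiesByActivityId` map is a hypothetical stand-in for the extension properties, which in a real setup would presumably be read from the BPMN model (for example via RepositoryService#getBpmnModelInstance and the element with the matching activity id). The property name `serviceType` is also made up. Resolving expressions is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one node of an activity instance tree.
final class InstanceNode {
    final String activityId;
    final List<InstanceNode> children;

    InstanceNode(String activityId, InstanceNode... children) {
        this.activityId = activityId;
        this.children = Arrays.asList(children);
    }
}

final class RestartScan {
    private RestartScan() {}

    // Collects the leaves of the tree, i.e. the nodes without children.
    static List<InstanceNode> leaves(InstanceNode root) {
        List<InstanceNode> result = new ArrayList<>();
        collect(root, result);
        return result;
    }

    private static void collect(InstanceNode node, List<InstanceNode> out) {
        if (node.children.isEmpty()) {
            out.add(node);
            return;
        }
        for (InstanceNode child : node.children) {
            collect(child, out);
        }
    }

    // Returns activityId -> property value for every leaf whose activity
    // defines the given property (assumed to come from the process model).
    static Map<String, String> servicesToRestart(
            InstanceNode root,
            Map<String, Map<String, String>> propertiesByActivityId,
            String propertyName) {
        Map<String, String> result = new LinkedHashMap<>();
        for (InstanceNode leaf : leaves(root)) {
            Map<String, String> props = propertiesByActivityId
                    .getOrDefault(leaf.activityId, Collections.emptyMap());
            String value = props.get(propertyName);
            if (value != null) {
                result.put(leaf.activityId, value);
            }
        }
        return result;
    }
}
```

The caller would then hand each returned value to the decoupled start logic from step 2, instead of invoking the execution listeners again.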

Thanks, @thorben!

Unfortunately I can’t find out how to get the extension property out of an ActivityInstance object.

The other problem is that I already have code that decides where to put a listener based on the type of catching event; for the boundary types and for those before event gateways it is put on the parent object rather than on the catching element itself (as discussed here). So looking at all leaf activity instances, if I am guessing right, complicates and duplicates the logic I already have. I would do it if that were the last problem, but another one is…

I need to resolve expressions from the extension elements. Will I be able to do this? My notify method does that easily. Everything pushes me to just call it when I need to restart my services.

Secretly I’d wish to share some of it. This is a part of the listener plugin. We have to resolve several custom attributes, one is “uri”, and another is “recipient”:

       new ServiceStartListener() {
                @Override
                public void notify(ActivityExecution exec) throws Exception {
                    _LOG.debug("Intermediate Catch Message notifying START with uri {} in activity {}...", new Object[]{uriEl, exec.getId()});
                    final ELContext elcontext = _expressionManager.getElContext(exec);
                    ValueExpression ve = _expressionManager.createValueExpression(uriEl);
                    final String uri = (String) ve.getValue(elcontext);

                    final RecipientIdentifier recipient;
                    String el = element.attributeNS(REXNAMESPACE, "recipient");
                    if (el != null && el.trim().length() > 0) {
                        ve = _expressionManager.createValueExpression(el);
                        Object recObj = ve.getValue(elcontext);
           .................... etc etc
                        }
                    }

                    _startConsumerService(uri, messageName, exec.getId(), recipient);
                    _LOG.debug("Intermediate Catch Message notified START with uri {} in activity {}", new Object[]{uriEl, exec.getId()});
                }
            });

It seems hard for me to rip this out of a listener :cry:

Have I successfully defended my need to get listeners of current activities?

Otherwise I would like to ask for more details on how to get the custom properties of process activities together with their expression contexts.

Like I’ve said, I need to evaluate some expressions before starting my services. It seems the expression context and the execution context (which throws my NPE) rely on thread locals. It seems impossible (or at least hard) for me to reproduce my service restarting logic outside of the normal process flow, especially if I decouple it from the listener.

@thorben, can there be some other way? For example, replaying the last step of the process as if it were moving normally, so that the listeners run by themselves in the correct context?..
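To illustrate the thread-local point: the sketch below shows, in plain Java, how a context that is bound per thread is only visible while some wrapper is running. `ContextSketch` and `runInContext` are hypothetical names, not Camunda classes. The assumption here is that the engine's Context lookup behaves similarly and is only populated while the engine executes a command, which would explain a null result when the entity is touched directly from plugin code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical stand-in for an engine-style thread-local context.
final class ContextSketch {
    private static final ThreadLocal<Deque<String>> CONFIG_STACK =
            ThreadLocal.withInitial(ArrayDeque::new);

    private ContextSketch() {}

    // Returns the configuration bound to the current thread, or null if none is bound.
    static String current() {
        return CONFIG_STACK.get().peek();
    }

    // Binds the configuration while the work runs, then unbinds it again.
    static <T> T runInContext(String config, Supplier<T> work) {
        CONFIG_STACK.get().push(config);
        try {
            return work.get();
        } finally {
            CONFIG_STACK.get().pop();
        }
    }
}

final class ContextSketchDemo {
    public static void main(String[] args) {
        // Outside the wrapper nothing is bound.
        System.out.println(ContextSketch.current()); // prints "null"
        // Inside the wrapper the lookup succeeds.
        System.out.println(
                ContextSketch.runInContext("engine-config", ContextSketch::current)); // prints "engine-config"
    }
}
```

If that assumption holds, running the restart logic inside an engine command (internal API, for example through the command executor of ProcessEngineConfigurationImpl) is what would make the context available, at the cost of depending on internals.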

For each (leaf) activity instance, check if the extension property is defined

Alriiiiite, how do I get anything like that from an ActivityInstance object?

Resolved it myself with serialization, thank you.