Hi @Niall
If you look closely, I have two BPMN processes. The first one (bpmn1) has just a single task, a service task backed by a Java class, and it does not have an end event. Inside this service class I create a thread that connects to the MQTT broker and listens to it. The thread keeps running, which means the service task of bpmn1 never ends. From this service class I am able to correlate with the message event and create process variables for bpmn2. However, when I try to create a process variable for the first process, it throws an error: execution id is null. The service class implementation is as below:
public void execute(DelegateExecution execution) throws Exception {
try {
RuntimeService runtimeService=execution.getProcessEngine().getRuntimeService();
MqttSubscriberServiceTask mqtt_camunda_service_task = new MqttSubscriberServiceTask();
// The delegate is attached to a service task, so the model element is a service task
org.camunda.bpm.model.bpmn.impl.instance.ServiceTaskImpl serviceTask = (org.camunda.bpm.model.bpmn.impl.instance.ServiceTaskImpl) execution.getBpmnModelElementInstance();
ExtensionElements extensionElements = serviceTask.getExtensionElements();
Collection<CamundaProperty> properties = extensionElements.getElementsQuery()
.filterByType(CamundaProperties.class)
.singleResult()
.getCamundaProperties();
String dmnName = null;
String mqttVariableName = null;
String mqttBrokerEndPoint = null;
for (CamundaProperty property : properties) {
String name = property.getCamundaName();
String value = property.getCamundaValue();
switch (name) {
case DMNNAME:
dmnName = value;
break;
case MQTTVARIABLENAME:
mqttVariableName = value;
break;
case MQTTBROKERENDPOINT:
mqttBrokerEndPoint = value;
break;
}
}
// MQTT Logic
// setting the process variables with the received Message
String mqttMessage = mqtt_camunda_service_task.subscribeWithBrokers(mqttBrokerEndPoint, execution, mqttVariableName, runtimeService);
// Compare the return codes by value; == would compare object references
if ("-1".equals(mqttMessage)) {
throw new RuntimeException(
"Error: MQTT Broker Connection failed - Invalid Broker IP Address :: Contact Administrator.");
} else if ("-2".equals(mqttMessage)) {
throw new RuntimeException("Error: Exception occurred in Thread Processing :: Contact Administrator.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public String subscribeWithBrokers(String mqttBrokerEndPoint, DelegateExecution execution, String mqttVariableName, RuntimeService runtimeService) {
try {
// 1.Subscribe to the broker upon activation in a new Thread
// logger.info("Inside MqttSubscriberServiceTask :: Subscribing to the broker
// upon activation in a new Thread");
MqttSubscriber subscriber = new MqttSubscriber(syncObj, receivedMsg, mqttVariableName, mqttBrokerEndPoint, execution, runtimeService);
subscriber.subscribe();
} catch (Exception e) {
logger.error("Exception occurred in subscribeWithBrokers:: " + e.getMessage());
return "-2";
}
return receivedMsg.toString();
}
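A side note on the "-1" / "-2" return codes: subscribeWithBrokers returns receivedMsg.toString(). If receivedMsg is a StringBuffer or StringBuilder (an assumption, its declaration is not shown here), that call produces a new String object, which matches a literal only by value (equals), not by reference. A minimal standalone sketch, where the class name SentinelCompare and the labels are hypothetical:

```java
class SentinelCompare {
    // Maps the return codes used above to a label (labels are made up for illustration).
    static String classify(String code) {
        if ("-1".equals(code)) {
            return "broker-connection-failed";
        }
        if ("-2".equals(code)) {
            return "thread-processing-failed";
        }
        return "ok";
    }

    public static void main(String[] args) {
        // toString() on a builder creates a new String object
        String fromBuffer = new StringBuilder("-1").toString();
        System.out.println(fromBuffer == "-1");      // false: different objects
        System.out.println("-1".equals(fromBuffer)); // true: same characters
        System.out.println(classify(fromBuffer));    // broker-connection-failed
    }
}
```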
The execute and subscribeWithBrokers methods above are from my JavaDelegate class, which internally creates a thread and calls the subscribe method of another class. Inside that MqttSubscriber class, I correlate with the message start event, and I am also trying to create a process variable for bpmn1:
this.runtimeService.getVariable(this.delegateExecution.getId(), "LOL");
runtimeService.createMessageCorrelation("satyaRam")
.setVariable("MQTT", message.toString())
.setVariable("rootId", delegateExecution.getId())
.setVariable("tailingData", collection)
.setVariable("deltaRegion", deltaRegion)
.setVariable("setCondition", setCondition)
.setVariable("timeStamp", now)
.setVariable("previousValue", previousValue)
.correlate();
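To make the setup easier to picture, here is a stripped-down sketch of the threading pattern described above, without any Camunda or MQTT dependency. Everything in it is a stand-in: a BlockingQueue plays the broker, the BiConsumer callback plays the code that would correlate the message and set variables, and the names ListenerSketch, startListener, demo and STOP are hypothetical. The execution id is passed in as a plain String, which is an assumption of this sketch and not what my MqttSubscriber currently does (it receives the DelegateExecution itself).

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class ListenerSketch {
    // Hypothetical marker message that tells the listener to stop.
    static final String STOP = "__stop__";

    // Starts a background listener and returns its executor so the caller can wait on it.
    static ExecutorService startListener(String rootId,
                                         BlockingQueue<String> broker,
                                         BiConsumer<String, String> onMessage) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            try {
                while (true) {
                    String msg = broker.take(); // blocks until a message arrives
                    if (STOP.equals(msg)) {
                        break;
                    }
                    // In the real subscriber this is where the message
                    // correlation and variable handling would happen.
                    onMessage.accept(rootId, msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown(); // accept no new tasks; the running listener continues
        return pool;
    }

    // Pushes the given messages through a listener and returns what the handler saw.
    static List<String> demo(String rootId, String... messages) {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        ExecutorService pool =
                startListener(rootId, broker, (id, msg) -> seen.add(id + ":" + msg));
        try {
            for (String m : messages) {
                broker.put(m);
            }
            broker.put(STOP);
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for listener", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo("exec-1", "21.5", "22.0")); // [exec-1:21.5, exec-1:22.0]
    }
}
```

In this sketch the method that starts the listener returns immediately and the thread keeps consuming messages on its own, which mirrors the situation in my delegate where the listener outlives the call that created it.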