Python Integration with Camunda

I’m just starting out with using Camunda and I’m a Python developer. Any resources for integrating Camunda with external Python scripts?

Thanks!

Hi @disha_umarwani,

In general there are two approaches to use Python code in a process:

  1. External task workers that are implemented in Python and that run independently of the Camunda process. I’m not aware of a Python example, but this article describes the general principle very well and with a proper REST client you should get started in no time.
  2. Script tasks embedded into the process model that are run via Jython from within the Camunda process. You can edit those in Camunda Modeler; just set python as the scripting language. Also make sure the Jython scripting engine is on Camunda’s classpath.

I generally recommend approach 1, as it should be easier to use and gives you full flexibility on how you write and build your Python code. Approach 2 could be suitable for rather simple scripts and logic.

Cheers,
Thorben

Hey Disha,
I recently worked on an external Python worker. Here are my code snippets. They might not be perfect, but they work and might help you.

import requests
import json
import time

#Define API Endpoint
url = 'http://localhost:8080/engine-rest/external-task/fetchAndLock'

# Define Json
task= {"workerId":"XXX",
  "maxTasks":2,
  "usePriority":"true",
  "topics":
      [{"topicName": "coolStuff",
      "lockDuration": 10000,
      "variables": ["orderId"]
      }]
}

# Connect to the API (Fetch and Lock)
try:
    res = requests.post(url, json=task)
    print(res.status_code)

#Get Body Text
    
    body = res.text    
    print(body)    
            
except: 
    print('Engine is down')


while body == '[]':
    res = requests.post(url, json=task)
    time.sleep(5)
    
    if body !='[]':
        break
        
#here goes your business logic!

# Connect to the API (Complete)
response = json.loads(body)
x = response[0]['id']


#convert into a string
x = str(x)

complete_url = 'http://localhost:8080/engine-rest/external-task/'+x+'/complete'



response= {
  "workerId": "XXX",
  "variables":
      {"aVariable": {"value": "aStringValue"},
      "anotherVariable": {"value": 42},
      "aThirdVariable": {"value": "true"}}
}

complete = requests.post(complete_url, json=response)

Hi Nele, thank you for your code. It works fine, but I have one issue. What I would expect is the following:

  1. the program starts polling for something to happen (it does)
  2. it comes into action the moment I start a process in Camunda that has a task for this worker's topic.

But it’s the other way around: the worker only picks up the Camunda process when I start the worker after I start the process in Camunda. That is the opposite of what I would expect.

Do you know what I could be doing wrong?

Here’s my code (actually your code):

import requests
import json
import time

# Define API Endpoint
url = 'http://localhost:8080/engine-rest/external-task/fetchAndLock'

# Define Json
task = {"workerId": "XXX",
        "maxTasks": 10,
        "usePriority": "true",
        "topics":
            [{"topicName": "loadData",
              "lockDuration": 10000,
              "variables": ["orderId"]
              }]
        }
body = []
# Connect to the API (Fetch and Lock)
try:
    res = requests.post(url, json=task)
    print('res.status_code', res.status_code)
    # Get Body Text
    body = res.text
    print('res.text', body)
except:
    print('Engine is down')

while body == '[]':
    print('wait')
    time.sleep(5)
    res = requests.post(url, json=task)
    print('result', res)
    if body != '[]':
        break

# here goes your business logic!
print("Connection succeeded")

# Connect to the API (Complete)
response = json.loads(body)
x = response[0]['id']

print('id', x)

# convert into a string
x = str(x)

complete_url = 'http://localhost:8080/engine-rest/external-task/' + x + '/complete'

response = {
    "workerId": "XXX",
    "variables":
        {"aVariable": {"value": "aStringValue"},
         "anotherVariable": {"value": 42},
         "aThirdVariable": {"value": "true"}}
}

print('response', response)

complete = requests.post(complete_url, json=response)

print('complete', complete)

Hi @Harm
There was a mistake in my snippet. I am sorry for that.

The variable body was not assigned again inside the while loop, so it keeps the value from the first REST call and the loop never sees a newly fetched task.

You need to change the while loop to the following:

# Poll again until the engine returns at least one task
while body == '[]':
    time.sleep(5)
    res = requests.post(url, json=task)
    body = res.text  # re-read the body on every iteration

That should do the trick.

Anyhow the worker will terminate as soon as it has fetched and locked and completed one Task.
So you might need to change it if you want the worker to start again after getting one task :slight_smile:
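
If you want the worker to keep running, one way is to wrap fetch, business logic and complete in a loop. The following is only a minimal sketch under a few assumptions: it uses urllib from the standard library instead of requests, and the names post_json, poll_once and run_worker are made up for this example. The two endpoints and the payload fields are the same ones used in the snippet above.

```python
import json
import time
import urllib.request


def post_json(url, payload):
    """POST a JSON payload and return the decoded JSON body (None if empty)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as reply:
        text = reply.read().decode("utf-8")
    return json.loads(text) if text else None


def poll_once(base_url, worker_id, topic, handle, post=post_json):
    """Fetch and lock tasks for one topic, handle each one, then complete it."""
    tasks = post(base_url + "/external-task/fetchAndLock", {
        "workerId": worker_id,
        "maxTasks": 2,
        "topics": [{"topicName": topic, "lockDuration": 10000}],
    }) or []
    for task in tasks:
        variables = handle(task)  # your business logic; returns result variables
        post(base_url + "/external-task/" + task["id"] + "/complete",
             {"workerId": worker_id, "variables": variables})
    return len(tasks)


def run_worker(base_url, worker_id, topic, handle, idle_seconds=5, post=post_json):
    """Poll forever; sleep only when the engine had no task for us."""
    while True:
        if poll_once(base_url, worker_id, topic, handle, post) == 0:
            time.sleep(idle_seconds)
```

You would start it with something like run_worker('http://localhost:8080/engine-rest', 'XXX', 'coolStuff', my_handler), where my_handler is your own function that takes the task dict and returns the variables to send back. Error handling (engine down, failed tasks) is left out on purpose.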

Cheers Nele

Hi Nele,

Thanks, I got it working already.

Regards,
Harm

Hi Disha,

This is an example of a script that executes a Python script and passes its return value back so it can be evaluated in the process. This way you can execute Python from a Camunda process step.

def pmdCommand = '/usr/local/bin/python3.7 /users/user/Documents/FTKDemo/DEXRS/DEXDNBNew/software/loadSourceDataWorkflow.py -t ' + Regulation + ' -f ' + Frequency + ' -d ' + ReportingDate.format('yyyy-MM-dd')

// Buffers that collect the script's standard output and standard error
def sout = new StringBuffer()
def serr = new StringBuffer()

// Run the command, capture its output, and wait for it to finish
def process = pmdCommand.execute()
process.waitForProcessOutput(sout, serr)

// Expose the exit code as a process variable
execution.setVariable("loadReturnValue", process.exitValue())
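
For completeness, the Python side of such a call could look roughly like the sketch below. The flags -t, -f and -d and the yyyy-MM-dd date format come from the command above; everything else (the function names, the placeholder load step, the choice of 0 for success and 1 for failure) is an assumption for illustration and not the actual loadSourceDataWorkflow.py.

```python
import argparse
import datetime
import sys


def parse_args(argv):
    """Parse the three flags passed by the Camunda script task."""
    parser = argparse.ArgumentParser(description="Load source data (sketch)")
    parser.add_argument("-t", dest="regulation", required=True)
    parser.add_argument("-f", dest="frequency", required=True)
    # Expects yyyy-MM-dd, e.g. 2020-03-31
    parser.add_argument("-d", dest="reporting_date", required=True,
                        type=datetime.date.fromisoformat)
    return parser.parse_args(argv)


def load_source_data(regulation, frequency, reporting_date):
    """Hypothetical placeholder for the real loading logic."""
    print("loading", regulation, frequency, reporting_date.isoformat())


def main(argv):
    """Return 0 on success and 1 on failure; the caller reads this exit code."""
    args = parse_args(argv)
    try:
        load_source_data(args.regulation, args.frequency, args.reporting_date)
    except Exception as error:
        print("load failed:", error, file=sys.stderr)
        return 1
    return 0

# In the real script, the last line would be: sys.exit(main(sys.argv[1:]))
```

The value returned by main ends up in process.exitValue() and therefore in loadReturnValue, so a gateway in the process can branch on it.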

Thanks for sharing some Python examples. It's exactly what I am looking for right now!

My pleasure!

Just an update, as I recently got a private message regarding the Python snippet.

I put the snippets together in a Python module which can be downloaded and then imported into your Python code, so you can use the main functions to communicate with the Camunda engine.

I like the fact that this topic is in the category Modeler.

Hello,
You can use this

Regards,
Yogesh

Hi,

I used @yogeshrnaik's great external task client, too.

If you need to integrate not only the external task pattern but also, for instance, deployments, process definitions and so on, then the generic Camunda client might be something for you. It is auto-generated from Camunda's OpenAPI specification, so it looks a bit bulky, but it is reliable.

pip install generic-camunda-client

Documentation (also generated) is here. Code snippets from the documentation are very reliable.

Greetings,
Markus

Hi Yogesh! I decided to use your lib, but I ran into the fact that it cannot listen to several topics in parallel, as the Java client can. Can you give any advice other than threading?

@vitalik
Each ExternalTaskWorker in my library listens forever (in a while True loop). So, if you want multiple workers listening in parallel, you need to use threading.
Is there any particular reason why you don’t want to use threads?

ExternalTaskWorker.subscribe() accepts one topic or a list of topic_names. So, you can use a single worker if you want to listen to multiple topics. But this worker’s subscribe method uses while True loop. So you need to run it in a separate thread. If you run it in main thread, it will be a blocking call.

An alternative to ExternalTaskWorker.subscribe() is the ExternalTaskWorker.fetch_and_execute() method, which you can use when you want to fetch and execute one particular external task. But this is a one-time call and will raise NoExternalTaskFound immediately if there are no matching external tasks. You then have to call fetch_and_execute() in some loop yourself (which subscribe() is doing already).

Another alternative to threads is to change my library to use the async/await mechanism. But for that you would have to rewrite all the network calls using async/await. I had done that in early versions of my library, but I ran into some issues and hence removed it from the library.
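
To illustrate the threading approach described above, here is a rough sketch that uses only the standard library. Note that blocking_subscribe is a hypothetical stand-in for a blocking subscribe-style loop and is not the library's code; the stop_event parameter and start_workers are also made up for this example. With the real library, the thread target would be the worker's subscribe method instead.

```python
import threading


def blocking_subscribe(topic, handler, stop_event, poll_interval=0.01):
    """Stand-in for a blocking subscribe loop (hypothetical, for illustration)."""
    while not stop_event.is_set():
        handler(topic)                  # a real worker would fetch and execute tasks here
        stop_event.wait(poll_interval)  # pause between polls, wake early on stop


def start_workers(topic_handlers, stop_event):
    """Start one daemon thread per topic so no loop blocks the main thread."""
    threads = []
    for topic, handler in topic_handlers.items():
        thread = threading.Thread(
            target=blocking_subscribe,
            args=(topic, handler, stop_event),
            daemon=True,
        )
        thread.start()
        threads.append(thread)
    return threads
```

The main thread stays free after start_workers returns. Calling stop_event.set() and then join() on each thread shuts the loops down cleanly.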

Hello Python Lovers,

I found another Python Camunda module, which looks pretty amazing. I haven’t tried it out yet, but I'd still like to share it:
https://pycamunda.readthedocs.io/en/latest/index.html

Curious to see what you think :slight_smile:

Hi Yogesh, thank you for sharing your wonderful libraries!

" ExternalTaskWorker.subscribe() accepts one topic or a list of topic_names. So, you can use a single worker if you want to listen to multiple topics. But this worker’s subscribe method uses while True loop. So you need to run it in a separate thread. If you run it in main thread, it will be a blocking call."

I'm slightly confused about the above :frowning: For multiple topics, how do I use the external task worker to map the topics to distinct actions?
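
One general pattern for this is a dispatch table keyed by topic name, sketched below against the plain REST payload rather than the library. The only thing taken from Camunda here is that each task returned by fetchAndLock carries a topicName field. The handler functions, the sendMail topic and the returned variables are made up, and how the library's task object exposes the topic name is something to check in its documentation.

```python
def load_data(task):
    """Hypothetical action for the 'loadData' topic."""
    return {"loaded": {"value": True}}


def send_mail(task):
    """Hypothetical action for the 'sendMail' topic."""
    return {"mailSent": {"value": True}}


# Map each topic name to the function that should handle it
HANDLERS = {
    "loadData": load_data,
    "sendMail": send_mail,
}


def dispatch(task):
    """Look up the handler for the task's topic and run it."""
    topic = task["topicName"]
    try:
        handler = HANDLERS[topic]
    except KeyError:
        raise ValueError("no handler registered for topic %r" % topic)
    return handler(task)
```

A single worker that subscribes to both topics could then pass dispatch as its one callback, and each task would be routed to its own action.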