Message Buffer Plugin

Hey

I have been working on a message buffer plugin. While “tightening the screws”, I wanted to check with the community whether the functionality covers what people need:

The plugin provides a message buffer for correlating messages. The most common use case is a race condition where the message sender sends the message before Camunda is ready to receive it. If the message correlation returns no results, or not the expected number of results, the buffer tries again, with the following features:

  1. Configurable exponential backoff, with an optional max duration after which the message's state is changed so that someone can intervene
  2. Configurable, optional expected number of matches per correlation (“are you expecting 3 correlations with this message? If yes, and only 2 correlations occurred, it will keep attempting until the third match is made”)
  3. Optional long poll to wait for a response: similar to how you can correlate a message with results, but if your message sender requires a confirmation, it can wait for one (non-blocking, reactive)
  4. Configurable max number of events in the buffer
  5. Pause correlation attempts
  6. Manual retry of correlation
  7. Full history of messages received: what correlation query was used, when each correlation attempt occurred, and which process instances / executions the message was correlated with (with a configurable max size)
  8. Metadata store for message sending details (what system sent the message, etc.)
  9. Delegate code support, so scripts and delegates can push messages into the buffer for correlation
  10. Permissions support: set simple or complex permissions for who can send messages, to which instances/executions, with which variables, etc.
  11. Query API to see what messages are not correlated, which have completed, etc.
  12. Optional callback URL to notify the message sender that the message was correlated.
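To make items 1 and 2 concrete, here is a minimal sketch of a retry decision that combines exponential backoff, an optional max retry duration, and an expected match count. Everything in it is hypothetical (`RetryPolicy`, `Outcome` and `NEEDS_INTERVENTION` are not names from the plugin). It also assumes that matches are counted cumulatively across attempts, so 2 matches followed by 1 more satisfies an expectation of 3.

```java
import java.time.Duration;

/**
 * Hypothetical retry policy for one buffered message (names are illustrative):
 * exponential backoff capped at maxDelay, an optional cap on total retry time,
 * and an optional expected number of correlations counted across all attempts.
 */
final class RetryPolicy {
    enum Outcome { DONE, RETRY, NEEDS_INTERVENTION }

    private final Duration initialDelay;
    private final double multiplier;
    private final Duration maxDelay;
    private final Duration maxTotalDuration; // null = no cap on total retry time
    private final int expectedMatches;       // 0 = any single match is enough

    RetryPolicy(Duration initialDelay, double multiplier, Duration maxDelay,
                Duration maxTotalDuration, int expectedMatches) {
        this.initialDelay = initialDelay;
        this.multiplier = multiplier;
        this.maxDelay = maxDelay;
        this.maxTotalDuration = maxTotalDuration;
        this.expectedMatches = expectedMatches;
    }

    /** Delay before retry number {@code attempt} (0-based): initialDelay * multiplier^attempt, capped at maxDelay. */
    Duration delayFor(int attempt) {
        double millis = initialDelay.toMillis() * Math.pow(multiplier, attempt);
        return Duration.ofMillis((long) Math.min(millis, (double) maxDelay.toMillis()));
    }

    /**
     * Decides what happens after an attempt.
     * @param totalMatches executions correlated so far, summed over all attempts
     * @param elapsed      time since the first correlation attempt
     * @param attempts     retries performed so far (the exponent for the next delay)
     */
    Outcome evaluate(int totalMatches, Duration elapsed, int attempts) {
        boolean satisfied = expectedMatches > 0 ? totalMatches >= expectedMatches : totalMatches > 0;
        if (satisfied) {
            return Outcome.DONE;
        }
        // If the next retry would land outside the allowed window, park the message for a human.
        if (maxTotalDuration != null
                && elapsed.plus(delayFor(attempts)).compareTo(maxTotalDuration) > 0) {
            return Outcome.NEEDS_INTERVENTION;
        }
        return Outcome.RETRY;
    }
}
```

With `initialDelay = 1s`, `multiplier = 2` and `maxDelay = 30s`, retries wait 1s, 2s, 4s, 8s, 16s and then 30s each time. Once the next retry would cross `maxTotalDuration`, the message is parked for intervention rather than dropped.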

Any features that people have come across that they tend to need in these use cases?


This is a really great idea, @StephenOTT. It certainly helps solve a very common problem, and I’m going to be using it myself for sure.
Looking forward to seeing the final results.
I’ll try to give this project a signal boost at the release webinar.

One of the debated features is whether the message sender needs to receive a synchronous response with result data, or just a “message accepted” response.
Would love to hear community use cases for this.
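For the sync vs. “accepted” question, one way to support both without blocking threads is to give the sender an id immediately and let senders that want a confirmation await a future. The sketch below is minimal and rests on two assumptions: a hypothetical `CorrelationWaiters` registry keyed by message id, and Java 9+ for `CompletableFuture.orTimeout`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical registry for senders that want a confirmation instead of a bare
 * "message accepted". Waiting is non-blocking: the caller gets a future that the
 * buffer completes on correlation, or that fails with a TimeoutException.
 */
final class CorrelationWaiters {
    private final Map<String, CompletableFuture<String>> waiters = new ConcurrentHashMap<>();

    /** Register before correlation attempts start, so an early success is not missed. */
    CompletableFuture<String> await(String messageId, long timeout, TimeUnit unit) {
        CompletableFuture<String> future = new CompletableFuture<>();
        waiters.put(messageId, future);
        // Drop the entry however the future ends (result, timeout or cancellation).
        future.whenComplete((result, error) -> waiters.remove(messageId, future));
        return future.orTimeout(timeout, unit);
    }

    /** Called by the buffer once the message is correlated; result could be e.g. a process instance id. */
    boolean complete(String messageId, String result) {
        CompletableFuture<String> future = waiters.remove(messageId);
        return future != null && future.complete(result);
    }
}
```

An endpoint could return the message id straight away for the “accepted” style. For the sync style, it could return the future (or a reactive type wrapping it) carrying the result data.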

Hi Stephen,
I believe Zeebe uses a message buffer pattern. Perhaps consider mirroring its behaviour to facilitate migration to Zeebe.
Also consider how time to live (TTL) should be configured - client or buffer…

just a few eclectic thoughts…

regards

Rob

Thanks, Rob.
Ya, TTL is configurable through both the server config and the client submission.

You can also configure the backoff settings, initial setup, etc.

It can be set up to send to Zeebe as well, through a single endpoint.

There are also different TTL considerations: there is the TTL of the overall message existence, but there is also a TTL equivalent, controlled by the retries, that can trump it. I experimented with a pure queue/buffer pattern, but in practice it was much better to have the messages persisted as essentially “jobs to be completed”.
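Here is a minimal sketch of how the two TTLs could interact. It rests on a few hypothetical assumptions: a client TTL that falls back to a server default and is capped by a server max, and a retry deadline measured from the first correlation attempt. `MessageExpiry` is an illustrative name, not the plugin's.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical expiry rule combining the two TTLs:
 *  1. the message TTL: client-supplied if present, else a server default, never above a server max;
 *  2. the retry deadline: first attempt + max retry duration (the TTL equivalent controlled by retries).
 * The message expires at whichever deadline comes first.
 */
final class MessageExpiry {
    private final Duration serverDefaultTtl;
    private final Duration serverMaxTtl;

    MessageExpiry(Duration serverDefaultTtl, Duration serverMaxTtl) {
        this.serverDefaultTtl = serverDefaultTtl;
        this.serverMaxTtl = serverMaxTtl;
    }

    /** The client TTL wins over the server default, but the server max always applies. */
    Duration effectiveTtl(Duration clientTtl) {
        Duration requested = clientTtl != null ? clientTtl : serverDefaultTtl;
        return requested.compareTo(serverMaxTtl) > 0 ? serverMaxTtl : requested;
    }

    /** maxRetryDuration may be null when the retry policy has no cap. */
    Instant expiresAt(Instant receivedAt, Duration clientTtl, Instant firstAttemptAt, Duration maxRetryDuration) {
        Instant ttlDeadline = receivedAt.plus(effectiveTtl(clientTtl));
        if (maxRetryDuration == null) {
            return ttlDeadline;
        }
        Instant retryDeadline = firstAttemptAt.plus(maxRetryDuration);
        return ttlDeadline.isBefore(retryDeadline) ? ttlDeadline : retryDeadline;
    }
}
```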

I am wondering if it could handle the rate-limiting use case I describe in “Trying to implement Rate limiting for a task”, which I never really found an adequate solution for.

Great idea, Dale. Would the message go into the buffer to be delivered later, or would it just be prevented from going into the buffer (e.g. denied at the REST API level)?

It would go into the buffer, and all I then need is the ability to control how fast items are removed from the buffer.

Ah okay. Ya, that would work. Instead of immediate job creation, the message could be added as “queued” or “paused”, and a fixed-rate executor would process those specific messages. I will add it to the list.
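Here is a minimal sketch of that idea. It assumes a hypothetical `RateLimitedRelease` that holds the “queued” messages, driven at a fixed rate by a standard `ScheduledExecutorService`. The `correlate` callback stands in for whatever hands a message over to the normal correlation and retry flow.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical rate limiter for messages parked as "queued": instead of creating a
 * correlation job immediately, each tick of a fixed-rate executor releases at most
 * permitsPerTick messages, in arrival order.
 */
final class RateLimitedRelease<M> {
    private final Queue<M> queued = new ConcurrentLinkedQueue<>();
    private final int permitsPerTick;
    private final Consumer<M> correlate; // hands a message over to the normal correlation flow

    RateLimitedRelease(int permitsPerTick, Consumer<M> correlate) {
        this.permitsPerTick = permitsPerTick;
        this.correlate = correlate;
    }

    void enqueue(M message) {
        queued.add(message);
    }

    /** One tick: release up to permitsPerTick messages; returns how many were released. */
    int tick() {
        int released = 0;
        M next;
        while (released < permitsPerTick && (next = queued.poll()) != null) {
            correlate.accept(next);
            released++;
        }
        return released;
    }

    /** E.g. start(scheduler, 1, TimeUnit.SECONDS) caps throughput at permitsPerTick messages per second. */
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
        // If tick() throws, scheduleAtFixedRate suppresses all later runs,
        // so real code should catch failures per message inside correlate.
        return scheduler.scheduleAtFixedRate(this::tick, 0, period, unit);
    }
}
```

Throughput is `permitsPerTick / period`. A token bucket would allow short bursts; this fixed-rate release keeps the output flat, which matches “control how fast items are removed from the buffer”.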

Hi @StephenOTT ,

I think this is a very useful extension. We have implemented it several times for customers, mostly just to get a more stable solution than the Camunda original, which throws concurrent modification exceptions.

Are you building a completely new API?
I have an additional feature in mind that I would like to propose.

Anti Corruption Layer for message correlation
The “local” message correlation implies that you trust the caller, since it is located inside the same Java VM. This is no longer true if the caller is outside your VM and calls message correlation via REST. For this purpose, I want to have an Anti Corruption Layer between the incoming message and the change of variables inside the correlated process execution. (It is even worse, since the authorization cannot really look at the message payload: you can either allow the entire message correlation via REST or deny it.)

The idea is not to implement it now, but to have a place to plug it in… Essentially, this would mean an interface for the ACL consisting of condition(message, execution) and application(message, execution)… The defaults would return true for the condition and just write the message's values into the execution…
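Here is a minimal sketch of the proposed plug-in point, with the defaults described above: the condition returns true and the application copies the values. `MessageAcl`, `OrderPaymentAcl` and the map-based model of messages and executions are hypothetical stand-ins, not Camunda or plugin APIs.

```java
import java.math.BigDecimal;
import java.util.Map;

/**
 * Hypothetical anti-corruption layer (ACL) hook between an incoming message and the
 * correlated execution. Both sides are plain variable maps here; a real plugin would
 * use its own message type and the engine's execution API instead.
 */
interface MessageAcl {
    /** May this message touch this execution? Default: yes. */
    default boolean condition(Map<String, Object> message, Map<String, Object> execution) {
        return true;
    }

    /** Apply the message to the execution. Default: copy every message value unchanged. */
    default void application(Map<String, Object> message, Map<String, Object> execution) {
        execution.putAll(message);
    }

    /** Runs condition, then application; a rejected message changes nothing. */
    static boolean apply(MessageAcl acl, Map<String, Object> message, Map<String, Object> execution) {
        if (!acl.condition(message, execution)) {
            return false;
        }
        acl.application(message, execution);
        return true;
    }
}

/** Example ACL: validates the target and transforms the external format into the internal one. */
final class OrderPaymentAcl implements MessageAcl {
    @Override
    public boolean condition(Map<String, Object> message, Map<String, Object> execution) {
        // Only accept payments for the order this execution is waiting on.
        Object orderId = message.get("orderId");
        return orderId != null && orderId.equals(execution.get("orderId"));
    }

    @Override
    public void application(Map<String, Object> message, Map<String, Object> execution) {
        // External contract: "12.50" as a string; internal representation: 1250 cents as a long.
        long cents = new BigDecimal((String) message.get("amount")).movePointRight(2).longValueExact();
        execution.put("amountCents", cents);
    }
}
```

`OrderPaymentAcl` also shows the transformation side of an ACL: the internal variable (`amountCents`) can change shape without changing the message contract.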

What do you think?

Kind regards,

Simon

@zambrovski as part of the controls I was planning to add the ability to restrict what variables can be modified. Would that work?

If you need more capabilities, I could add the ability to supply a predicate or function containing the permission logic that validates whether the provided variables are allowed.
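Here is a minimal sketch of that option. It assumes the check receives the sender id (e.g. from the metadata store in item 8) and the variables the message wants to set. `VariablePermissions` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

/**
 * Hypothetical permission checks on the variables a message tries to set.
 * A simple allowlist covers the common case; a custom predicate can be
 * combined with it for anything more complex.
 */
final class VariablePermissions {
    private VariablePermissions() {}

    /** Allow the message only if every variable name is in the allowlist. */
    static BiPredicate<String, Map<String, Object>> allowlist(Set<String> allowed) {
        return (sender, vars) -> allowed.containsAll(vars.keySet());
    }

    /** Allowlist plus a custom rule, e.g. per-sender or per-value checks. */
    static BiPredicate<String, Map<String, Object>> allowlistAnd(
            Set<String> allowed, BiPredicate<String, Map<String, Object>> custom) {
        return allowlist(allowed).and(custom);
    }
}
```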


I think permissions are good, but having condition and application functions would allow configuring, on the client side, how the values are processed.

The ACL is not only authorization, but a transformation from request to local storage too.

For example one could change the internal representation of the variables without changing the message semantics and perform the transformation in the ACL…

Can I have a look at some code already?

By the way, the need for an ACL results from the fact that you have to commit the TX as soon as possible after the correlation (a message receiving event / task should have the async after flag set). So the logic usually goes into an execution listener that verifies the modification… sometimes it is already too late. So the pattern we apply is to correlate by setting local transient variables, and to pump them over to global variables if all checks are OK, doing this in a post listener of the catching event… It works, but it clutters the business part of the application with conventions / special technical solutions.

So I believe it should be a part of the platform instead…

@zambrovski, is your transformation being provided by the BPMN designer? If so, would you just use Input/Output mappings rather than build an additional transformation layer?

The code should be public later today. I will post again when it is online.


Given this condition, you could add logic handlers on the message to create modifications before it is passed into the engine.
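Here is a minimal sketch of such a handler chain. It assumes handlers operate on the message variables, as a map, before the buffer passes them to the engine. `PreCorrelationPipeline` and `then` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
 * Hypothetical chain of message handlers that run before a buffered message is
 * handed to the engine. Each handler receives the variables produced by the
 * previous one and returns the map to pass on.
 */
final class PreCorrelationPipeline {
    private final List<UnaryOperator<Map<String, Object>>> handlers = new ArrayList<>();

    PreCorrelationPipeline then(UnaryOperator<Map<String, Object>> handler) {
        handlers.add(handler);
        return this;
    }

    Map<String, Object> apply(Map<String, Object> incoming) {
        Map<String, Object> current = new HashMap<>(incoming); // never mutate the caller's map
        for (UnaryOperator<Map<String, Object>> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }
}
```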

Internally it is a Hazelcast distributed execution/job/task, so you can re-route into different executions to create other modifications as needed (if it's runtime-based), or route the supplied variables into another executor for further modifications.

Okay take a look at: https://github.com/StephenOTT/forms-manager

See: https://github.com/StephenOTT/forms-manager/tree/master/src/main/kotlin/formsmanager/camunda/messagebuffer

and https://github.com/StephenOTT/forms-manager/blob/master/src/main/kotlin/formsmanager/camunda/management/controller/CorrelateMessageController.kt


Could this be a solution: you disable the REST API (or at least its message correlation part) and create a dedicated service (REST, SOAP, …) for delivering messages to your application? Then you could implement whatever checks you like before forwarding the message to the engine.

How’s the progress on this subject?
Just curious, since we built a small piece matching it inside camunda-bpm-data.
See the ACL feature in https://www.holunda.io/camunda-bpm-data/wiki/user-guide/features/

So any update on this @StephenOTT?

Hey. Okay, sorry for the delay. I was stuck on other projects.

Okay, so this is working now; I am just wrapping up some configs.

Is there a preference for what should happen when a message does not correlate after the max retries?

I would also be interested to hear whether there are use cases for persisting the buffer as jobs, or whether it can just live in memory.