BPEL – Associate and reorder certain messages in a continuous data stream

Your need

You have a continuous stream of messages arriving in large volumes in your exchange flows, and you want certain messages (those matching given criteria) to be held together so that they can be sent to a target application as a group and/or in a specific order.
Your basic architecture is as follows:

Basic architecture

Example

Imagine you have a steady stream of thousands of banking transactions passing through your SOA platform.
Among these transactions, you have:

  • Debits
  • Credits
  • Regulations (credit + debit), where:
    • Debit = -Credit
    • Credit is immediately followed by a debit
    • Credit contains the ID of the debit, and vice versa.
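
To make these rules concrete, here is a minimal sketch of how a regulation pair could be modelled and checked. The `Transaction` class, its field names, and `is_regulation_pair` are hypothetical and only serve the illustration; the check covers the amount rule and the cross-references, not the arrival order.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Transaction:
    """Hypothetical message model, used only for this illustration."""
    tx_id: str
    kind: str                             # "credit" or "debit"
    amount: Decimal                       # signed: credits > 0, debits < 0
    counterpart_id: Optional[str] = None  # filled in only for regulation legs


def is_regulation_pair(credit: Transaction, debit: Transaction) -> bool:
    """Check the amount rule and the cross-references for one pair."""
    return (
        credit.kind == "credit"
        and debit.kind == "debit"
        and debit.amount == -credit.amount          # Debit = -Credit
        and credit.counterpart_id == debit.tx_id    # credit points at the debit
        and debit.counterpart_id == credit.tx_id    # and the debit points back
    )
```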

Functional problems

For each regulation you wish to:

  • Send the credit and the debit close together
  • Send them in the following order:
    1. credit
    2. debit
Sequence diagram

Technical solution

  1. Filter the messages flagged « Regulation »
  2. Extract these messages from the main stream and place them in a retention lock (a holding area)
  3. Wait until both messages of the pair have arrived in the retention lock
  4. Push them back into the main stream, flagged as « associated », so that they are not sent to the retention lock again

Your architecture must evolve as follows:

Evolved architecture

BPEL implementation

You have to add three new elements:

  1. Two JMS queues configured for the retention lock:
    • One for processing credits
    • One for processing debits
  2. A composite (here called « Associator »), added in order to:
    • Consume both queues
    • Re-orchestrate the injection of the two messages into the main queue in the required order.

For its part, the Provider injects each message into one of the two queues, depending on whether it is a credit or a debit.
The key is to set the following JMS property on both messages of a pair, with the same value: jca.jms.JMSCorrelationID:

Retention queues feeding
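
The feeding step can be sketched as follows. This is a plain in-memory simulation, not BPEL or real JMS code: the queue names, the message layout (a dict with `properties` and `body`), and the convention used to build the correlation value are all assumptions. The only requirement taken from the approach above is that both legs of a pair carry the same jca.jms.JMSCorrelationID.

```python
from collections import deque

# Hypothetical queue names; the real ones depend on your platform setup.
CREDIT_RETENTION = "RetentionCredits"
DEBIT_RETENTION = "RetentionDebits"


def correlation_id(tx_id: str, counterpart_id: str) -> str:
    """Assumed convention: sort the two IDs so both legs get the same key."""
    return "|".join(sorted((tx_id, counterpart_id)))


def feed_retention(msg: dict, queues: dict) -> str:
    """Stamp the correlation ID on a regulation leg and enqueue it."""
    body = msg["body"]
    msg["properties"]["jca.jms.JMSCorrelationID"] = correlation_id(
        body["id"], body["counterpart_id"]
    )
    # Credits and debits go to separate retention queues.
    target = CREDIT_RETENTION if body["kind"] == "credit" else DEBIT_RETENTION
    queues[target].append(msg)
    return target
```

Because each message already contains the ID of its counterpart, both legs can compute the shared key independently, whichever one arrives first.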

The Associator then waits for the JMS messages that carry the same jca.jms.JMSCorrelationID.
It receives them by consuming the two JMS queues, as described in this process:
Associator process flow
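
The pairing logic of this process can be approximated with the in-memory sketch below. It is a simulation under stated assumptions, not the BPEL process itself: the `Associator` class and its `on_message` method are hypothetical, and a dictionary keyed by correlation ID stands in for the separate process instances that each wait for their own pair.

```python
from collections import deque


class Associator:
    """In-memory stand-in for the Associator composite (not real BPEL/JMS)."""

    def __init__(self, main_queue: deque):
        self.main_queue = main_queue
        self.pending = {}  # correlation ID -> {"credit": msg, "debit": msg}

    def on_message(self, kind: str, msg: dict) -> None:
        """Called for each message consumed from a retention queue."""
        corr_id = msg["properties"]["jca.jms.JMSCorrelationID"]
        pair = self.pending.setdefault(corr_id, {})
        pair[kind] = msg
        if "credit" in pair and "debit" in pair:
            del self.pending[corr_id]
            # Re-inject in the required order: credit first, then debit.
            for leg in (pair["credit"], pair["debit"]):
                leg["properties"]["AssociatedRegulation"] = True
                self.main_queue.append(leg)
```

Whichever leg arrives first simply waits; the pair is released, credit first, only once its counterpart shows up, and pairs with other correlation IDs are unaffected in the meantime.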

You can have as many « Associator » instances as needed waiting in parallel for other pairs of messages. None of them is blocked by this method, because each consumer instance waits only for its own CorrelationID.
In summary, this method does not create a bottleneck in your overall SOA architecture.
Finally, do not forget to add a filter in the main stream (in the Provider) so that messages already associated are not sent to the retention lock again.
Otherwise they will loop indefinitely.
Filter messages with 'AssociatedRegulation' flag
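
As a rough illustration of this filter, the predicate below decides whether a message should be diverted to the retention lock. Representing the « Regulation » and 'AssociatedRegulation' flags as boolean entries in a properties dictionary is an assumption made for this sketch, and `needs_retention` is a hypothetical name.

```python
def needs_retention(properties: dict) -> bool:
    """True only for regulation legs that have not been associated yet."""
    is_regulation = bool(properties.get("Regulation", False))
    already_associated = bool(properties.get("AssociatedRegulation", False))
    return is_regulation and not already_associated
```

A message re-injected by the Associator carries both flags, so the predicate returns False and the message continues along the main stream instead of looping.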