BPEL et les files JMS, première partie : Oracle Advanced Queuing

Afin de sécuriser l’échange des messages au sein de votre SI en garantissant la bonne livraison des messages et en découplant vos applications les unes des autres, l’introduction des files JMS semble être la solution à envisager !
En effet, cette API Java va permettre aux applications d’échanger des messages de manière asynchrone et de garantir la bonne livraison grâce à des notions de persistance : tant que le message ne peut être délivré, il est stocké (en mémoire ou base de données ou fichier).
Au travers de cette petite série d’articles autour de BPEL et JMS, je me propose de vous faire découvrir les possibilités offertes par BPEL en termes d’interaction avec des files JMS, plus particulièrement l’implémentation Oracle des files JMS : OJMS (Oracle Java Messaging Service) basée sur Oracle AQ (Advanced Queuing).
Dans cette première partie, nous allons nous concentrer sur la partie JMS et son implémentation par Oracle. Je me propose de commencer par quelques rappels sur les concepts JMS et de me focaliser sur la mise en place de files JMS avec Oracle AQ.

Java Messaging Service

Comme on a pu le voir plus haut, il s’agit d’une API Java permettant de découpler les applications en gérant les échanges de messages entre elles.

Messages JMS

Les messages JMS sont composés d’un en-tête (message technique) et d’un contenu (message fonctionnel).
Le contenu fonctionnel peut-être de différents types :

  • MapMessage
  • TextMessage
  • BytesMessage
  • Etc.

Pour ma part, j’utilise le type TextMessages sous un format XML pour manipuler des messages sous un format standard compréhensible par le plus grand nombre d’applications.

Point à point

Dans ce mode de consommation, une application produit un message destiné à un unique consommateur, on travaille alors avec le concept de Queue.

Publish / Subscribe

Dans ce mode de consommation, une application va produire un message qui devra être diffusé auprès de plusieurs applications, on travaille avec le concept de Topic. On gère alors la notion d’abonnement puisque les messages seront diffusés aux applications abonnées au topic.

Les applications consommatrices, pour des raisons quelconques, peuvent être indisponibles lors de la diffusion d’un message. JMS, au travers de la notion de consommateur durable, garantit que ce message lui sera bien délivré une fois qu’elle sera à nouveau disponible.
Par ailleurs, un consommateur a la possibilité de sélectionner les messages qu’ils souhaitent consommer dans le topic grâce à la notion de « Selector » offertes par JMS. Il est en effet possible pour un consommateur de filtrer les messages en fonction de la valeur des paramètres de l’en-tête du message.

Oracle Java Messaging Service

Cette implémentation des files JMS par Oracle est en fait l’interface JMS d’Oracle AQ. Cette implémentation reprend l’ensemble des fonctionnalités offertes par JMS et propose quelques possibilités supplémentaires comme la transformation des messages d’un format à un autre (extension JMS offerte par AQ).

Advanced Queuing

AQ est une solution de stockage de messages basée sur la base de données Oracle, profitant ainsi de tous les avantages de la base Oracle en termes de performances, sécurité, haute performance, etc. L’accès aux files de messages peut se faire comme on l’a vu via l’interface OJMS mais également par :

  • PL / SQL
  • Visual Basic
  • Java
  • Etc.

L’administration et la mise en place d’Oracle AQ peut être faite au travers de l’interface graphique « Oracle Enterprise Manager » ou avec SqlDeveloper en utilisant l’interface PL/SQL offerte par Oracle AQ.
Je me propose ci-dessous de vous donner les notions de base permettant de mettre en place Oracle AQ pour l’échange de messages entre processus BPEL et pour l’administration des queues JMS.

Mise en place d’AQ

Ci-dessous, les différentes commandes utiles pour la mise en place d’Oracle AQ.

Création d’un administrateur AQ

L’administrateur sera le user utilisé pour la création des queues JMS, leur démarrage, leur arrêt, la purge des messages, etc.

CONNECT SYS AS SYSDBA
CREATE USER AQADM IDENTIFIED BY AQADM DEFAULT TABLESPACE myDefaultTablespace TEMPORARY TABLESPACE myTempTablespace;
GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE to AQADM;
GRANT EXECUTE ON SYS.DBMS_AQ TO AQADM;
GRANT EXECUTE ON SYS.DBMS_AQADM TO AQADM;

Création d’un utilisateur AQ

Un autre user est créé pour la production et la consommation des messages, c’est cet utilisateur qui sera utilisé par BPEL :

CONNECT SYS AS SYSDBA
CREATE USER AQ_USER IDENTIFIED BY AQ_USER DEFAULT TABLESPACE myDefaultTablespace TEMPORARY TABLESPACE myTempTablespace;
GRANT CONNECT, RESOURCE, AQ_USER_ROLE to AQ_USER;
GRANT EXECUTE ON SYS.DBMS_AQ TO AQ_USER;
GRANT EXECUTE ON SYS.DBMS_AQIN TO AQ_USER;
GRANT EXECUTE ON SYS.DBMS_AQJMS TO AQ_USER;
BEGIN
  dbms_aqadm.grant_system_privilege(privilege=>'DEQUEUE_ANY',grantee=> AQ_USER, admin_option=>FALSE);
  dbms_aqadm.grant_system_privilege(privilege=>'ENQUEUE_ANY',grantee=> AQ_USER, admin_option=>FALSE);
END;

Création d’une table

Une queue JMS stocke ses messages dans une table qui lui est rattachée, ci-dessous les commandes à exécuter pour créer une table :

PROCEDURE DBMS_AQADM.CREATE_QUEUE_TABLE
   (queue_table IN VARCHAR2,
    queue_payload_type IN VARCHAR2,
    storage_clause IN VARCHAR2 DEFAULT NULL,
    sort_list IN VARCHAR2 DEFAULT NULL,
    multiple_consumers IN BOOLEAN DEFAULT FALSE,
    message_grouping IN BINARY_INTEGER DEFAULT NONE,
    comment IN VARCHAR2 DEFAULT NULL,
    auto_commit IN BOOLEAN DEFAULT TRUE);

Les principales options pour commencer sont :

  • queue_table : le nom de la table
  • queue_payload_type : type des messages stockés dans la table
  • multiple_consumers : TRUE ou FALSE s’il s’agit d’un topic ou d’une queue
  • etc.

A la création d’une table, certains objets sont implicitement créés, notamment : aq$_queue_table_name_e. Cette queue est la queue d’exception dans laquelle les messages en erreurs seront stockés.
Exemple :

begin
  dbms_aqadm.create_queue_table(
     queue_table => ' MyQueue_table',
     queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
     multiple_consumers => FALSE);
end;

Création d’une queue JMS

A une table, il est possible de rattacher plusieurs queues JMS. Ci-dessous, les commandes à exécuter pour créer une queue :

PROCEDURE DBMS_AQADM.CREATE_QUEUE
   (queue_name IN VARCHAR2,
    queue_table IN VARCHAR2,
    queue_type IN BINARY_INTEGER default DBMS_AQADM.NORMAL_QUEUE,
    max_retries IN NUMBER default 0,
    retry_delay IN NUMBER default 0,
    retention_time IN NUMBER default 0,
    dependency_tracking IN BOOLEAN default FALSE,
    comment IN VARCHAR2 default NULL,
    auto_commit IN BOOLEAN default TRUE);

Les principales options pour commencer sont :

  • queue_name : nom de la queue
  • queue_table : nom de la table dans laquelle les messages sont stockés
  • queue_type : type de queue (queue normale ou queue d’erreur)
  • max_retries : nombre de tentatives de consommation avant que le message soit déplacé dans la queue d’exception
  • retry_delay : intervalle entre deux essais (en secondes)
  • etc.

Exemple :

begin
  dbms_aqadm.create_queue(
    queue_name => ' MyQueue ',
    queue_table => ' MyQueue _table',
    queue_type => dbms_aqadm.NORMAL_QUEUE,
    max_retries => 5,
    retry_delay => 5);
end;

Modification d’une queue JMS

Une fois la queue créée, on veut pouvoir être capable de modifier certains paramètres pour certaines raisons :

PROCEDURE DBMS_AQADM.ALTER_QUEUE
   (queue_name IN VARCHAR2,
    max_retries IN NUMBER default NULL,
    retry_delay IN NUMBER default NULL,
    retention_time IN NUMBER default NULL,
    auto_commit IN BOOLEAN default TRUE);

Seulement certains paramètres peuvent être modifiés : max_retries, retry_delay, etc.
Exemple :

begin
  dbms_aqadm.alter_queue(
    queue_name=>'MyQueue’,
    retry_delay=> 30,
    max_retries => 1);
end;

Ajout /Suppression un consommateur à une queue

Pour les files JMS de type TOPIC, les consommateurs doivent préalablement s’abonner au topic afin de pouvoir consommer les messages :

Ajout :
PROCEDURE DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name IN VARCHAR2,
    subscriber IN SYS.AQ$_AGENT);
Suppression :
PROCEDURE DBMS_AQADM.REMOVE_SUBSCRIBER(
    queue_name IN VARCHAR2,
    subscriber IN SYS.AQ$_AGENT);

Les options suivantes doivent être précisées :

  • queue_name : nom de la queue
  • subscriber : objet représentant un consommateur

Exemple :

DECLARE subsc_t
    sys.aq$_agent;
    subsc_addr  VARCHAR2(1024) := 'QueueName';
BEGIN
    subsc_t := sys.aq$_agent('ConsumerName', subsc_addr, 0);
    dbms_aqadm.add_subscriber(subsc_addr , subsc_t);
END;

Démarrage / Arrêt d’une queue

Les procédures de démarrage et d’arrêt des queues permettent d’activer / désactiver les opérations de publication et / ou consommation des messages :

Démarrage :
PROCEDURE DBMS_AQADM.START_QUEUE
   (queue_name IN VARCHAR2,
    enqueue IN BOOLEAN DEFAULT TRUE,
    dequeue  IN BOOLEAN DEFAULT TRUE);
Arrêt :
PROCEDURE DBMS_AQADM.STOP_QUEUE
   (queue_name IN VARCHAR2,
    enqueue IN BOOLEAN DEFAULT TRUE,
    dequeue IN BOOLEAN DEFAULT TRUE,
    wait IN BOOLEAN DEFAULT TRUE);

Les options à spécifier sont les suivantes :

  • queue_name : nom de la queue
  • enqueue : action d’empiler les messages
  • dequeue : action de dépiler les messages
  • wait : lors de l’arrêt de la queue, permet d’attendre que les transactions en cours soient terminées avant de stopper la queue

Purge des messages d’une queue

La purge des messages dans une table se fait de la façon suivante :

DBMS_AQADM.PURGE_QUEUE_TABLE(
    queue_table        IN   VARCHAR2,
    purge_condition    IN   VARCHAR2,
    purge_options      IN   aq$_purge_options_t);
TYPE AQ$_PURGE_OPTIONS_T is RECORD (
    block           BOOLEAN       DEFAULT FALSE
    delivery_mode   PLS_INTEGER   DEFAULT PERSISTENT);

Les options sont les suivantes :

  • queue_table : nom de la table à purger
  • purge_condition : condition sur les messages à supprimer
  • purge_options : objet décrivant les options de la purge
  • block : Si la valeur est TRUE, alors les consommateurs et producteurs sont bloqués lors de la purge, le succès de la purge est ainsi garanti
  • delivery_mode : permet de spécifier les messages purgés (PERSISTENT, BUFFERED, etc.)

Exemple :

BEGIN
   DBMS_AQADM.PURGE_QUEUE_TABLE
   (queue_table     => 'MyQueueTable',
    purge_condition => 'qtview.queue = “MyQueue” ',
    block => FALSE);
END;

Autres commandes utiles

Ci-dessous, quelques autres procédures utiles :

  • dbms_aqadm.drop_queue : suppression d’une queue
  • dbms_aqadm.drop_queue_table : suppression d’une table
  • etc.

Conclusion

Cet article vous aura permis d’acquérir les notions de base autour d’Oralce AQ et les quelques commandes à connaître pour monter votre propre environnement de tests Oracle AQ. Dans l’article suivant, je détaillerai la configuration du serveur d’applications Weblogic pour faire interagir BPEL avec AQ via JMS.

3 réflexions sur “BPEL et les files JMS, première partie : Oracle Advanced Queuing”

  1. Bonjour Franck,
    Merci pour ton commentaire.
    Dans cette procédure, « grantee » peut désigner soit l’utilisateur, soit le rôle auquel le privilège est accordé.
    Les deux versions sont donc valides 😉

  2. salut je suis en train de faire ton tutorial sur Oracle 11g
    tout d’abord merci pour l’avoir fait 🙂
    dans le paragraphe « Création d’un utilisateur AQ, n »e faudrait-il pas écrire :
    BEGIN
    dbms_aqadm.grant_system_privilege(privilege=>’DEQUEUE_ANY’,grantee=> ‘AQ_USER_ROLE’, admin_option=>FALSE);
    dbms_aqadm.grant_system_privilege(privilege=>’ENQUEUE_ANY’,grantee=> ‘AQ_USER_ROLE’, admin_option=>FALSE);
    END;
    /
    (j’ai remplace AQ_USER par AQ_USER_ROLE)
    Cdt
    Franck

  3. Ping : Oracle Service Bus et Oracle Advanced Queuing, première partie « EASYTEAM LE BLOG

Les commentaires sont fermés.