Utiliser des files d'attente JMS sur AQ en PL/SQL

Pour des raisons évidentes, JMS a pris une part prépondérante dans de nombreuses architectures Java et Java EE. Par ailleurs, Oracle Advanced Queuing (AQ ou Streams AQ) est un support possible pour les messages accédés en JMS. AQ apporte des alternatives intéressantes en terme de gestion des transactions, de haute disponibilité, de supervision, de possibilité de montée en charge ou, dans certains contexte où Oracle est déjà acheté, de coût. Mais ce type de considération est stérile… Pourquoi utiliser AQ plutôt que Apache ActiveMQ, les fournisseurs JMS des serveurs d’applications Java EE comme Glassfish ou Weblogic ou les dinosaures comme (Websphere)MQ(Series) ?

De fait, Streams AQ constitue un pont idéal et transactionnel entre, d’un côté, le langage de référence des bases de données Oracle (le PL/SQL) et, via Java et JMS, l’immense majorité des plateformes modernes d’intégration : les Architectures Orientées Service (SOA), les bus de Services d’Entreprise (ESB) ou les Architectures Orientées Evènements (EDA). En mixant PL/SQL et JMS, on peut donc facilement intégrer les fonctionnalités avancées de la base Oracle à ces plateformes ou, tout simplement, aux serveurs d’applications Java EE.

Cet article illustre, avec 2 programmes simples, comment utiliser AQ/JMS pour échanger des messages entre PL/SQL, d’un côté, et, de l’autre, Java.

Notes :
La littérature à propos de AQ et JMS est assez rare ou, au moins, très mal écrite. C’est en particulier le cas de la documentation me semble-t-il. Si vous cherchez de l’information, installez le CD d’exemples de la base de données (6 sur 7 de la version 11.2.0.2 !). Regardez ensuite dans le répertoire $ORACLE_HOME/rdbms/demo. Un autre article très intéressant et bien écrit est celui-ci qui décrit l’intégration de AQ/JMS dans Glassfish via JCA

Introduction

JMS offre plusieurs typologies de messages et d’échanges :

  • Les « Queues » JMS permettent de faire du point à point. Un message est consommé une (dans le cas de files persistentes) et une seule fois.
  • Les « Topics » JMS permettent de faire du Publish/Subscribe avec plusieurs consommateurs pour un même message.

En outre, vous pouvez simplement, et à condition qu’il soit sérialisable, mettre un objet Java dans une file d’attente JMS. Si vous connaissez AQ, vous avez compris qu’il n’y a pas une correspondance évidente entre les concepts du MoM de la base de données et l’interface JMS. Je vous laisse étudier tout ces aspects.

Pour les besoins de l’exemple qui suit, j’utilise un message de type javax.jms.MapMessage dont la structure est directement accessible en PL/SQL et facile à manipuler dans les 2 langages. La démonstration met en oeuvre une approche point à point non intégré à Java EE.

Créer une files d’attentes JMS sur AQ

Pour commencer, il vous faudra créer utilisateur et files d’attente. Le script ci-dessous

  • crée un utilisateur demo 
  • lui donne les droit pour créer et gérer les files d’attentes JMS
  • crée une file d’attente JMS MY_QUEUE qui stocke des messages de type MapMessage
  • démarre la dite file d’attente

create user demo identified by demo;
grant dba, aq_administrator_role, aq_user_role to demo;
grant execute on dbms_aqadm to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqin to demo;
grant execute on dbms_jms_plsql to demo;
connect demo/demo

exec dbms_aqadm.create_queue_table(-
queue_table => 'demo.my_queue_table', -
queue_payload_type => 'SYS.AQ$_JMS_MAP_MESSAGE')

exec dbms_aqadm.create_queue(-
queue_name => 'demo.my_queue', -
queue_table => 'demo.my_queue_table')

exec dbms_aqadm.start_queue(-
queue_name => 'demo.my_queue')

Il existe une API Java pour effectuer la même opération ou, si vous préférez, vous pouvez également utiliser SQL*Developer comme ci-dessous :

Insérer un message MapMessage en PL/SQL

Insérer un message dans une file JMS depuis PL/SQL est identique à l’insertion d’un message dans une file AQ classique avec une PayLoad de votre choix. La seule chose qui change vraiment est le type du message et sa construction. Dans le cas d’un MapMessage, les méthodes set_XXX() permettent d’alimenter le message avec son contenu :

declare
id pls_integer;
agent sys.aq$_agent := sys.aq$_agent('', null, 0);
message sys.aq$_jms_map_message;
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msgid raw(16);
BEGIN

-- Consturct a empty map message object
message := sys.aq$_jms_map_message.construct;

-- Shows how to set the JMS header
message.set_replyto(agent);
message.set_userid('demo');

-- Shows how to set JMS user properties
message.set_string_property('myProp', 'Urgent');

id := message.clear_body(-1);

message.set_long(id, 'Prop1', 1);
message.set_string(id, 'Prop2', 'Greg!');
message.flush(id);
message.clean(id);

dbms_aq.enqueue(queue_name => 'demo.my_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => msgid);

END;
/

commit;

Si vous voulez insérer un message de type javax.jms.ObjectMessage depuis la base de données, vous devrez nécessairement créer un objet Java et donc utiliser JServer. Dans ce cas, il est possible de charger l’implémentation JMS de AQ dans la base de données (aqapi.jar) avec loadjava.

Recevoir le message en Java

Pour recevoir le message, il suffit d’écouter sur la file d’attente JMS avec l’implémentation AQ de JMS et d’utiliser l’interface JMS pour recevoir et traiter le message; voici un exemple JMSDemoReceive.java :

import java.sql.*;
import oracle.jdbc.*;
import oracle.AQ.*;
import javax.jms.*;
import oracle.jms.*;
import javax.transaction.Synchronization;

public class JMSDemoReceive
{
public static void main(String[] args) {
try {
dequeue("demo", "my_queue");
} catch (Exception e) {
e.printStackTrace(System.out);
}
}

public static void dequeue(String owner, String qname)
throws JMSException, SQLException, AQException
{
java.sql.Connection conn =
DriverManager.getConnection("jdbc:oracle:thin:@red:1521/BLACK",
"demo",
"demo");
QueueSession q_sess = null;
QueueConnection qc_conn = null;
Queue queue = null;
QueueReceiver receiver = null;
MapMessage msg = null;

/*
* Connect to the database and get the Queue Object
*/
qc_conn = AQjmsQueueConnectionFactory
.createQueueConnection(conn);
q_sess = qc_conn.createQueueSession(true,
Session.CLIENT_ACKNOWLE DGE);
qc_conn.start();
queue = ((AQjmsSession)q_sess)
.getQueue(owner, qname);
/*
* Use the JMS API to Receive the message
* And display its content. Do that for one-only MapMessage
*/
receiver = q_sess.createReceiver(queue);
msg = (MapMessage) receiver.receive(1000);
if (msg != null) {
System.out.println(
"Property 1: " + (new Long(msg.getLong("Prop1"))).toString());
System.out.println(
"Property 2: " + msg.getString("Prop2"));
}
q_sess.commit();
receiver.close();
}
}

Pour compiler et utiliser le programme, il suffit de positionner le CLASSPATH, d’utiliser javac et java :

export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar
export CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/jmscommon.jar
export CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/aqapi.jar
export CLASSPATH=$CLASSPATH:$ORACLE_HOME/rdbms/jlib/xdb.jar
export CLASSPATH=$CLASSPATH:$ORACLE_HOME/jlib/jta.jar
export CLASSPATH=$CLASSPATH:.

javac JMSDemoReceive.java
java JMSDemoReceive

Property 1: 1
Property 2: Greg!

Conclusion

Evidemment, il ne s’agit que d’un exemple rapide et vous pourrez facilement le transformer en un principe bien utile dans vos programmes. Si vous trouvez des articles intéressants et non référencé ici, n’hésitez pas à les partager en laissant un commentaire.