Programmation "Event-Based"

L’article précédent présente les limites de l’utilisation de DBMS_SCHEDULER avec des évènements :

  • Les messages ne peuvent pas être traités de manière séquentielle
  • Vous pouvez perdre les messages en cas d’un crash d’instance et ceci, bien que les files d’attentes soient persistentes 

Alors comment gérer proprement des évènements dans les bases de données Oracle ? Deux options s’offrent à vous : (1) développer un daemon ou (2) vous appuyer sur un message handler Streams. La seconde méthode est bien plus simple et c’est l’option que ce nouvel article illustre à travers un exemple.

Le script

Le script ci-dessous construit l’exemple voulu ; il :

  • crée une file d’attente de type ANYDATA
  • crée un type utilisateur qui contient les informations relatives aux évènements et qui sera encapsulé dans le type ANYDATA
  • crée une table pour journaliser les informations
  • crée un package pour envoyer et recevoir les messages
  • crée et démarre un process d’apply qui gérera les évènements

Le package, comme pour l’exemple de l’article précédent, attend 5 secondes entre le moment ou le message est déclenché et le moment où celui-ci est journalisé dans la table. Vous remarquerez également qu’il n’y a pas de commit dans le « message handler ». En effet celui-ci est géré directement par l’apply. Cela assure qu’aucun message ne sera perdu.

begin
dbms_streams_adm.set_up_queue(
queue_table => 'scott.myq_table',
queue_name => 'scott.myq');
END;
/

create or replace type scott.myq_payload
AS object (
id number
);
/

create table SCOTT.myq_log (
id number,
mytimestamp timestamp);

-- Create the package specification
create or replace package scott.myq_pkg is
procedure get_event (event_msg sys.anydata);
procedure send_event (id number);
end;
/

create or replace package body SCOTT.myq_pkg is
procedure get_event (event_msg sys.anydata)
as
v_payload myq_payload;
type_name varchar2(20);
num NUMBER;
begin
type_name := event_msg.GETTYPENAME;
IF type_name = 'SCOTT.MYQ_PAYLOAD' THEN
num := event_msg.getobject(v_payload);
dbms_lock.sleep(5);
insert into myq_log(id, mytimestamp)
values (v_payload.id, systimestamp);
end if;
end;

procedure send_event (id number)
as
v_anydata sys.anydata;
v_payload myq_payload;
l_enqueue_options DBMS_AQ.enqueue_options_t;
l_message_properties DBMS_AQ.message_properties_t;
l_message_handle RAW(16);
begin
l_message_properties.SENDER_ID := SYS.AQ$_AGENT(
name => 'LOCAL_AGENT',
address => NULL,
protocol => NULL);

v_payload:=myq_payload (id);
v_anydata:=anydata.convertobject(v_payload);

DBMS_AQ.enqueue(queue_name => 'SCOTT.myq',
enqueue_options => l_enqueue_options,
message_properties => l_message_properties,
payload => v_anydata,
msgid => l_message_handle);

commit;
end;

end;
/

begin
dbms_apply_adm.create_apply(
queue_name=>'scott.myq',
apply_name=>'myq_apply',
message_handler=>'scott.myq_pkg.get_event');
end;
/


begin
dbms_apply_adm.start_apply(
apply_name=>'myq_apply');
end;
/

Le test

Nous allons refaire le test de l’article précédent. Cette fois, vous constaterez qu’aucun message n’est perdu et que, tant que le parallelisme est à 1, tous les messages sont traités de manière successive :

begin
for i in 1..20 loop
scott.myq_pkg.send_event(i);
dbms_lock.sleep(0.5);
end loop;
end;
/
shutdown abort;

startup;

exec dbms_lock.sleep(100);

select count(id)
from scott.myq_log;

COUNT(ID)
---------
20

Pour terminer

Il n’y a pas grand chose à ajouter sinon que vous supprimerez encore les créés avec cet exemple…

drop package body scott.myq_pkg;
drop package scott.myq_pkg;

drop table scott.myq_log purge;

drop type scott.myq_payload;

begin
dbms_apply_adm.stop_apply(
apply_name=>'myq_apply');

dbms_apply_adm.drop_apply(
apply_name=>'myq_apply');
end;
/

begin
dbms_aqadm.stop_queue (
queue_name => 'SCOTT.myq');

dbms_aqadm.drop_queue (
queue_name => 'SCOTT.myq');

dbms_aqadm.drop_queue_table(
queue_table => 'SCOTT.myq_table');
END;
/