Oracle Event-Based Scheduler n'est pas Event-Based

Ca parait sévère dit comme ça… Le fait est que, si vous envisagez de gérer des évènements de manière asynchrone dans une base Oracle, il y a de grande chance que le scheduler ne soit pas l’outils pour ça. A moins que :

  • vous n’ayez pas peur de perdre des évènements
  • vous n’ayez pas trop d’évènements
  • vous n’avez pas besoin de gèrer les évènements en série

Je ne vais pas faire de mystère, le problème est décrit avec le paramètre parallel_instances la documentation ! Pour le contourner, vous préfèrerez utiliser un message handler.

Cet article illustre, à travers un exemple simple comment mettre en oeuvre le scheduler avec des évènements et les limites de ce type d’approche

Event-based Scheduler

Dans un premier temps, créez un exemple de job déclenché par un évènement dans le schéma SCOTT; pour cela :

  • donnez les droits de créer et d’utiliser job, files d’attentes et dbms_lock
  • créez une file d’attente myq dans le schéma
  • créez une table et un package déclenchés par le scheduler
  • créez un programme qui prenne en comme paramètre le message envoyé dans la file d’attente et lancez-le via un job

Voici le script qui réalise l’ensemble de ces opérations :

grant aq_administrator_role to SCOTT;
grant create job to SCOTT;
grant execute on dbms_aq to SCOTT;
grant execute on dbms_lock to SCOTT;

begin
-- Create a queue table to hold the event queue.
dbms_aqadm.create_queue_table(
queue_table => 'SCOTT.myqtable',
queue_payload_type => 'SCOTT.myq_payload',
multiple_consumers => TRUE);

-- Create the ETT event queue.
dbms_aqadm.create_queue (
queue_name => 'SCOTT.myq',
queue_table => 'SCOTT.myqtable');

-- Start the event queue.
dbms_aqadm.start_queue (queue_name => 'SCOTT.myq');
END;
/

create table SCOTT.myq_log (
id number,
mytimestamp timestamp);

-- Create the package specification
create or replace package SCOTT.myq_pkg is
procedure get_event (event_msg IN myq_payload);
procedure send_event (id number);
end;
/

create or replace package body SCOTT.myq_pkg is
procedure get_event (event_msg IN myq_payload)
as
begin
dbms_lock.sleep(5);
insert into myq_log(id, mytimestamp)
values (event_msg.id, systimestamp);
commit;
end;

procedure send_event (id number)
as
v_payload myq_payload;
l_enqueue_options DBMS_AQ.enqueue_options_t;
l_message_properties DBMS_AQ.message_properties_t;
l_message_handle RAW(16);
begin
v_payload:=myq_payload (id);

DBMS_AQ.enqueue(queue_name => 'SCOTT.myq',
enqueue_options => l_enqueue_options,
message_properties => l_message_properties,
payload => v_payload,
msgid => l_message_handle);

commit;
end;

end;
/

begin
dbms_scheduler.create_program (
program_name => 'SCOTT.myq_prog',
program_action => 'myq_pkg.get_event',
program_type => 'STORED_PROCEDURE',
number_of_arguments => 1,
enabled => FALSE) ;

dbms_scheduler.define_metadata_argument (
program_name => 'SCOTT.myq_prog',
argument_position => 1 ,
metadata_attribute => 'EVENT_MESSAGE') ;

dbms_scheduler.enable ('SCOTT.myq_prog');

dbms_scheduler.create_job (
job_name => 'SCOTT.myq_job' ,
program_name => 'SCOTT.myq_prog',
queue_spec => 'SCOTT.myq',
enabled => true,
start_date => SYSTIMESTAMP,
auto_drop => false) ;

end ;
/

col owner format a5
set lines 80
set tab off
col job_name format a7
col program_name format a10
col queue format a10
col state format a12
col run_count 99999
select owner, job_name, program_name,
event_queue_owner||'.'||event_queue_name queue,
state, run_count
from dba_scheduler_jobs
where owner='SCOTT';

OWNER JOB_NAM PROGRAM_NA QUEUE STATE RUN_COUNT
----- ------- ---------- ---------- ------------ ----------
SCOTT MYQ_JOB MYQ_PROG SCOTT.MYQ SCHEDULED 0

Des évènements non capturés

Pour commencer, notez que tous les évènements ne sont pas capturés. le programme déclenché par le scheduler fait une pause de 5 secondes pour illustrer ce point. Lancez le script ci-dessous qui génère 20 messages et regardez combien de messages ont été générés :

truncate table scott.myq_log;

begin
for i in 1..20 loop
scott.myq_pkg.send_event(i);
end loop;
dbms_lock.sleep(1);
end;
/

set tab off
col id format 9999
select id
from scott.myq_log;

ID
-----
1

Comme vous pouvez le voir, un seul message est généré comme indiqué dans la documentation.

Et pour capturer tous les évènements ?

l’attribut paralle_instances permet de contourner ce problème comme vous pouvez le voir ci-dessous :

begin
dbms_scheduler.set_attribute('SCOTT.myq_job',
'parallel_instances',
true);
end;
/

truncate table scott.myq_log;

begin
for i in 1..20 loop
scott.myq_pkg.send_event(i);
end loop;
end;
/

shutdown abort

select count(id) id
from scott.myq_log;

ID
----
20

Seulement voilà, les messages sont alors déclenchés en parallèle comme on peut s’en rendre compte en regardant le temps entre les messages :

select max(mytimestamp)-min(mytimestamp) 
from scott.myq_log;

MAX(MYTIMESTAMP)-MIN(MYTIMESTAMP)
--------------------------------
+000000000 00:00:01.101514

Il n’y a pas de paramètre pour assurer que les messages sont traités les uns à la suite des autres. Il n’y a pas non plus de seuil maximum de messages traités en simultané. Le programme ici ne fait rien mais si vous prenez une vague de 1000 messages qui « allument » la machine, vous n’y pourrez pas grand chose…

Vous perdrez quand même des messages

Et maintenant, le pire… Vous pourrez malgré tout perdre des messages. La raison est que le dequeue et le job ne sont pas gérés dans la même transaction. Voici comment reproduire ce cas :

truncate table scott.myq_log;

begin
for i in 1..20 loop
dbms_lock.sleep(0.5);
scott.myq_pkg.send_event(i);
end loop;
end;
/
shutdown abort

startup

select count(id) id
from scott.myq_log

ID
----
10

Supprimer l’exemple

Comme indiqué dans l’introduction, pour traiter ce cas proprement, vous aurez sans doute intérêt à utiliser Streams et un message handler. Pour terminer, supprimez tous les composants créés pour cet exemple :

begin
dbms_scheduler.drop_job (
job_name => 'SCOTT.myq_job');

dbms_scheduler.drop_program (
program_name => 'SCOTT.myq_prog') ;

end ;
/

drop package body SCOTT.myq_pkg;
drop package SCOTT.myq_pkg;

begin
dbms_aqadm.stop_queue (queue_name => 'SCOTT.myq');

dbms_aqadm.drop_queue (
queue_name => 'SCOTT.myq');

dbms_aqadm.drop_queue_table(
queue_table => 'SCOTT.myqtable');
END;
/