Parallélisations des Jobs avec Advanced Queueing d’Oracle

La DB Oracle nous permet de gérer l’ordonnancement en utilisant des Jobs paramétrables et assez complets comparables au CRON d’Unix. L’objectif de cet article n’est pas d’expliquer les jobs d’Oracle, car il y a pas mal d’articles détaillant ce concept, mais c’est plutôt de poser l’idée d’exécuter le même job programmé, au même moment en multi-instance.
Le principe des jobs programmés d’Oracle (Scheduler Job) est que le même Job ne peut s’exécuter une seconde fois s’il n’a pas encore terminé, je ne dis pas que c’est une mauvaise chose, car cela permet d’avoir un ordre pré-établie.
L’exécution du même Job en plusieurs fois au même moment est une réalité vécue chez un client, elle se schématise dans un cas où : via une base de données il y a une insertion de plusieurs lignes et déclenche un trigger basé sur chaque ligne qui permet d’exécuter un programme Shell. Ce client était affronté à l’erreur ORA-27478

Pour réaliser Un Schedule Job s’exécutant en simultané plusieurs fois, Oracle nous propose un package très intéressant nommé AQ (Advanced Queueing), cet ensemble d’outil de gestion de file d’attente pour les développeurs comprend un package et un script de création des tables necessaires pour sa gestion.
Les deux aspects d’exécution en série d’un Job (Schedule) et l’exécution en simultané du même job plusieurs fois (Schedule + AQ) vont être expliqués ci-dessous par des exemples, je rappelle que ceci a été testé sur une base Oracle 11g Release 11.2.0.1.0 :
1er cas d’exécution en série d’un Job avec déclenchement de l’erreur dans le cas d’exécution en simultané du même programme:

Connected as TESTUSER
SQL> -- execution de scheduler program-job
 SQL> CREATE TABLE LOG_TABLE
 2 (
 3 ID NUMBER,
 4 CALLER VARCHAR2(32),
 5 CALL_TIMESTAMP TIMESTAMP DEFAULT SYSTIMESTAMP
 6 );
 Table created
 SQL> CREATE OR REPLACE PROCEDURE InsertLogTable(pin_Id IN LOG_TABLE.ID%TYPE) AS
 2 BEGIN
 3 INSERT INTO LOG_TABLE (ID,CALLER) VALUES(pin_Id, 'SERIAL_EXECUTION_PRG');
 4 COMMIT;
 5 DBMS_LOCK.sleep(3);
 6 END;
 7 /
Procedure created
 SQL> BEGIN
 2 DBMS_SCHEDULER.CREATE_PROGRAM (program_name => 'SERIAL_EXECUTION_PRG',
 3 program_type =>'STORED_PROCEDURE',
 4 program_action =>'InsertLogTable',
 5 number_of_arguments => 1,
 6 enabled => FALSE,
 7 comments => 'Ce programme simule l execution de package dbms_scheduler.');
 8 END;
 9 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.DEFINE_PROGRAM_ARGUMENT(program_name => 'SERIAL_EXECUTION_PRG',
 3 argument_position => 1,
 4 argument_name => 'pin_Id',
 5 argument_type => 'NUMBER',
 6 default_value => NULL);
 7 END;
 8 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.CREATE_JOB(job_name => 'SERIAL_EXECUTION_JOB',
 3 program_name=> 'SERIAL_EXECUTION_PRG',
 4 enabled => FALSE,
 5 auto_drop => FALSE,
 6 comments => 'Le job qui simule l execution de dbms_scheduler en inserrant dans log table.');
 7 END;
 8 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.enable(NAME =>'SERIAL_EXECUTION_PRG');
 3 END;
 4 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.enable(NAME =>'SERIAL_EXECUTION_JOB');
 3 END;
 4 /
 PL/SQL procedure successfully completed
 SQL> pause;
 SQL> ----TEST:run 1
 SQL> BEGIN
 2 DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
 3 argument_position => 1,
 4 argument_value => 1);
 5 DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
 6 use_current_session=> FALSE);
 7 END;
 8 /
PL/SQL procedure successfully completed

Lors de l’execution des deux en simultané l’erreur se déclenche :

 SQL> --run 2
 SQL> BEGIN
 2 DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
 3 argument_position => 1,
 4 argument_value => 2);
 5 DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
 6 use_current_session =>FALSE);
 7 END;
 8 /
BEGIN
 DBMS_SCHEDULER.set_job_argument_value(job_name => 'SERIAL_EXECUTION_JOB',
 argument_position => 1,
 argument_value => 2);
 DBMS_SCHEDULER.run_job(job_name => 'SERIAL_EXECUTION_JOB',
 use_current_session=> FALSE);
 END;
Error at line 6
 ORA-27478: job "MENNAN.SERIAL_EXECUTION_JOB" is running
 ORA-06512: at "SYS.DBMS_ISCHED", line 185
 ORA-06512: at "SYS.DBMS_SCHEDULER", line 486
 ORA-06512: at line 6
SQL> select * from log_table where id is not null;
ID CALLER CALL_TIMESTAMP
 ---------- ---------------------------------------------------------------------------------
 1SERIAL_EXECUTION_PRG 26/03/2013 20:26:30,689920
SQL>
 ---rollback
 DROP TABLE LOG_TABLE;
 DROP PROCEDURE InsertLogTable;
 BEGIN
 DBMS_SCHEDULER.drop_job(job_name => 'SERIAL_EXECUTION_JOB');
 END;
 /
BEGIN
 DBMS_SCHEDULER.drop_program(program_name => 'SERIAL_EXECUTION_PRG');
 END;
 /

Deuxième cas d’exécution en simultané du même programme avec Advanced Queueing

SQL> --parallel execution: scheduler program-job + advanced queue
 SQL> CREATE TABLE LOG_TABLE
 2 (
 3 ID NUMBER,
 4 CALLER VARCHAR2(32),
 5 CALL_TIMESTAMP TIMESTAMP DEFAULT SYSTIMESTAMP
 6 );
Table created
 SQL> CREATE OR REPLACE TYPE LOG_TYPE IS OBJECT
 2 (
 3 ID NUMBER
 4 );
 5 /
Type created
 SQL> BEGIN
 2 DBMS_AQADM.create_queue_table(queue_table => 'AQ_LOG_QUEUE_TABLE',
 3 queue_payload_type => 'LOG_TYPE',
 4 multiple_consumers => TRUE);
 5 END;
 6 /
 PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_AQADM.create_queue(queue_name => 'AQ_LOG_QUEUE',
 3 queue_table =>'AQ_LOG_QUEUE_TABLE');
 4 END;
 5 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_AQADM.start_queue(queue_name =>'AQ_LOG_QUEUE',
 3 enqueue => TRUE,
 4 dequeue => TRUE);
 5 END;
 6 /
PL/SQL procedure successfully completed
 SQL> --select * from all_queue_tables where queue_table = 'AQ_TEST_QUEUE_TABLE';
 SQL> --select * from all_queues where NAME ='AQ_TEST_QUEUE';
 SQL> CREATE OR REPLACE PROCEDUREInsertLogTable2(pit_LogType IN LOG_TYPE) AS
 2 BEGIN
 3 INSERT INTO LOG_TABLE (ID,CALLER) VALUES(pit_LogType.ID, 'PARALLEL_EXECUTION_PRG');
 4 COMMIT;
 5 DBMS_LOCK.sleep(3);
 6 END;
 7 /
Procedure created
 SQL> BEGIN
 2 DBMS_SCHEDULER.create_program(program_name => 'PARALLEL_EXECUTION_PRG',
 3 program_type =>'STORED_PROCEDURE',
 4 program_action =>'InsertLogTable2',
 5 number_of_arguments => 1);
 6 DBMS_SCHEDULER.define_metadata_argument(program_name => 'PARALLEL_EXECUTION_PRG',
 7 argument_position => 1,
 8 metadata_attribute=> 'EVENT_MESSAGE');
 9 DBMS_SCHEDULER.enable('PARALLEL_EXECUTION_PRG');
 10 END;
 11 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.create_job(job_name => 'PARALLEL_EXECUTION_JOB',
 3 program_name => 'PARALLEL_EXECUTION_PRG',
 4 event_condition=> 'TAB.USER_DATA.ID IS NOT NULL',
 5 queue_spec => 'AQ_LOG_QUEUE',
 6 auto_drop => FALSE);
 7 DBMS_SCHEDULER.enable('PARALLEL_EXECUTION_JOB');
 8 END;
 9 /
PL/SQL procedure successfully completed
 SQL> BEGIN
 2 DBMS_SCHEDULER.set_attribute(NAME => 'PARALLEL_EXECUTION_JOB',
 3 ATTRIBUTE=> 'parallel_instances',
 4 VALUE => TRUE);
 5 END;
 6 /
PL/SQL procedure successfully completed
 SQL> --select * from all_scheduler_job_log where job_name = 'PARALLEL_EXECUTION_JOB' order by 1 desc;
 SQL> --select * from all_scheduler_job_run_details where job_name = 'PARALLEL_EXECUTION_JOB' order by 1 desc;
 SQL> -- TEST : run 1
 SQL> DECLARE
 2 vt_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
 3 vt_MessagePropertiesDBMS_AQ.MESSAGE_PROPERTIES_T;
 4 vt_RequestObject LOG_TYPE;
 5 vr_MessageId RAW(16);
 6 BEGIN
 7 vt_RequestObject := LOG_TYPE(123);
 8
 9 DBMS_AQ.enqueue(queue_name => 'AQ_LOG_QUEUE',
 10 enqueue_options => vt_EnqueueOptions,
 11 message_properties =>vt_MessageProperties,
 12 payload => vt_RequestObject,
 13 msgid => vr_MessageId);
 14 dbms_output.put_line('vr_MessageId : ' ||vr_MessageId);
 15 COMMIT;
 16 END;
 17 /
PL/SQL procedure successfully completed
 SQL> -- run 2
 SQL> DECLARE
 2 vt_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
 3 vt_MessagePropertiesDBMS_AQ.MESSAGE_PROPERTIES_T;
 4 vt_RequestObject LOG_TYPE;
 5 vr_MessageId RAW(16);
 6 BEGIN
 7 vt_RequestObject := LOG_TYPE(124);
 8
 9 DBMS_AQ.enqueue(queue_name => 'AQ_LOG_QUEUE',
 10 enqueue_options => vt_EnqueueOptions,
 11 message_properties =>vt_MessageProperties,
 12 payload => vt_RequestObject,
 13 msgid => vr_MessageId);
 14 dbms_output.put_line('vr_MessageId : ' ||vr_MessageId);
 15 COMMIT;
 16 END;
 17 /
PL/SQL procedure successfully completed
 SQL> select * from LOG_TABLE;
IDCALLER CALL_TIMESTAMP
 ---------- ---------------------------------------------------------------------------------
 123PARALLEL_EXECUTION_PRG 26/09/2011 20:42:34,078989
 124PARALLEL_EXECUTION_PRG 26/09/2011 20:42:34,080255
SQL>

On voit bien que les deux lignes ont été insérées sans aucune erreur cette fois ci.
Je reviendrai sur ce package dans un prochain article, vous pouvez avoir plus d’informations sur les packages utilisés:
http://docs.oracle.com/cd/B12037_01/appdev.101/b10802/d_aq.htm
http://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_aqadm.htm