Notifiez les Caches Niveau 2 de vos serveurs d'applications via Streams

Dans certaines circonstances, cacher des données sur un serveur d’applications est plus intéressant que de les interroger quand un utilisateur en a besoin. Les frameworks TopLink/EclipseLink ou Hibernate implémentent des caches dit de « niveau 2 » depuis plusieurs années déjà… Dans le cas d’Hibernate, il existe même plusieurs implémentations de caches disponibles sous la forme de plug-ins. Si vous avez besoin de monter en charge ou d’assurer la disponibilité, il est également possible d’utiliser des caches distribués comme Coherence, avec TopLink ou avec Hibernate et de monter ainsi plusieurs instances de caches niveau 2 en clusters.

Ces solutions se généralisent. JPA 2.0 formalise en effet cette gestion de cache à travers la spécification d’un Shared Cache et la définition d’une API standard de gestion associée. Les mêmes concepts sont développés dans les plateformes .Net avec les différents caches de niveau 2 de N-Hibernate ou le développement de ce type d’approches sur Entity Framework. Seulement voilà, qui dit cache, dit nécessité d’invalider le cache !

En effet, si vous modifiez des données par le biais d’ordres SQL ou via une infrastructure d’intégration qui s’appuie sur un adaptateur SQL, il vous faut retirer les objects correspondants des caches pour anticiper les erreurs qui résulteraient de l’incohérence des données dans vos caches applicatifs et vos bases de données. Dans ce qui suit, vous trouverez une mise en oeuvre simple qui s’appuie sur Streams ; L’intérêt de cette solution réside dans le fait qu’elle ne nécessite pas de licences supplémentaires si vous êtes en version Enterprise Edition et n’impacte pas réellement votre base de données. Evidemment il existe d’autres approches comme XStreams, Goldengate ou Streams Downstream Capture qui ont leur propres avantages, mais c’est une autre histoire…

Introduction

Lorsque vous travaillez avec des caches, il est nécessaire de pouvoir différencier la source des changements de vos données. Cela permet d’identifier ce qui vient de votre application et est déjà dans le cache de ce qui vient de l’extérieur et donc nécessite de notifier le cache.

Oracle ne répond pas strictement la question comment faire cette différence et surtout comment le différencier à travers Streams. Il est toutefois assez simple de s’appuyer sur les services en conservant une table de correspondance entre les sid,serial# et services par exemple ou en ajoutant une métadonnées dans les logs à l’aide de la commande dbms_streams_adm.set_tag. Pour gérer ce mécanisme de manière transparente pour l’application les trigger after logon sont surement une bonne approche ; regarder cet article précédent pour créer un trigger after logon et implémenter cette correspondance. Evidemment il existe d’autres possibilités…

Schéma d’exemple

Le script ci-dessous crée un utilisateur et une table pour démontrer la fonctionnalité; nous allons tracer les changements sur cette table

create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource, dba to demo;

connect demo/demo

create table mytab
(id number,
attr1 number,
attr2 number,
val1 number,
val2 number);

insert into mytab
(select rownum, mod(rownum,11), mod(rownum,17), mod(rownum,5), mod(rownum,7)
from dual
connect by level <=10000);

commit;

Configurer Streams

Pour nos tests, nous allons configurer Streams. Pour le détail de cette opération, reportez-vous à mon ancien article. J’ai tout regroupé toutes les étapes de 1 à 5 en les modifiants pour faire correspondre le nom de la table. Evidemment, vous vous arrêterez avant de configurer la règle de transformations puisque nous allons créer un DML Handler avant de démarrer la capture. Voici le script :

connect / as sysdba

alter database add supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
IMPLICIT YES YES


var first_scn number;

set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/
First SCN Value = 915448

exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'demo.mytab');

CREATE TABLESPACE streams_tbs
DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf' size 25M
AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/

connect strmadmin/strmadmin

print first_scn

FIRST_SCN
---------
907820


BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'streams_capture',
rule_set_name => NULL,
source_database => 'BLACK',
use_database_link => false,
first_scn => :first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

CAPTURE_NAME QUEUE_NAME FIRST_SCN START_SCN RULE_SET_NA
--------------- ------------- ------------- ------------- -----------
STREAMS_CAPTURE STREAMS_QUEUE 915448 915448


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t1',
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/

set lines 120
col rule_owner format a10
col streams_name format a16
col streams_type format a9
col table_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select rule_owner,
STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;

RULE_OWNER STREAMS_NAME STREAMS_T TABLE_OWNE TABLE_NAME RULE_TYP RULE_NAME
---------- ---------------- --------- ---------- --------------- -------- ---------
STRMADMIN STREAMS_CAPTURE CAPTURE DEMO MYTAB DML MYTAB5


BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'demo.mytab',
streams_type => 'apply',
streams_name => 'streams_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/


col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE
------------- ------------- ----------- -------- ----------
STREAMS_APPLY STREAMS_QUEUE RULESET$_8 DISABLED CAPTURED

Créer et enregistrer un DML Handler

Le DML handler décide de quand et comment notifier le ou les serveurs d’applications. Pour le comment, les choix sont infinis de utl_http via une API REST par exemple, à JMS via une procédure stockée en Java en passant par Streams AQ. Pour les besoins de l’exemple, nous ne ferons que stocker les modifications capturées dans une table sans distinction de la source du changement :

create table notif(
id number,
tag raw(4),
message varchar2(1024));

create sequence notif_seq;

create or replace procedure dml_apply_handler(in_any anydata)
is
command varchar2(10);
lcr sys.lcr$_row_record;
rc pls_integer;
id number;
msg varchar2(1000);
begin
-- Access the LCR and get the LCR command Type
rc := in_any.getobject(lcr);
command := lcr.get_command_type();
msg:=command;

-- In the case of an insert, log the info in c_history
if command = 'INSERT' then
-- Get the DDL text
if lcr.get_value('new','ID').getTypeName = 'SYS.NUMBER' then
rc:=lcr.get_value('new','ID').getNumber(id);
else
id:=0;
end if;
msg:=msg||' - ID: '||to_char(id);
elsif command = 'UPDATE' then
-- Get the DDL text
if lcr.get_value('new','ID').getTypeName = 'SYS.NUMBER' then
rc:=lcr.get_value('new','ID').getNumber(id);
else
id:=0;
end if;
msg:=msg||' - NID/OID: '||to_char(id);
if lcr.get_value('old','ID').getTypeName = 'SYS.NUMBER' then
rc:=lcr.get_value('old','ID').getNumber(id);
else
id:=0;
end if;
msg:=msg||'/'||to_char(id);
elsif command = 'DELETE' then
if lcr.get_value('old','ID').getTypeName = 'SYS.NUMBER' then
rc:=lcr.get_value('old','ID').getNumber(id);
else
id:=0;
end if;
msg:=msg||' -ID: '||to_char(id);
end if;
insert into notif values
(notif_seq.nextval,
lcr.get_tag,
msg);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'demo.mytab',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => 'dml_apply_handler',
apply_database_link => null,
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'demo.mytab',
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => false,
user_procedure => 'dml_apply_handler',
apply_database_link => null,
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'demo.mytab',
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => false,
user_procedure => 'dml_apply_handler',
apply_database_link => null,
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

col name format a15
col operation_name format a6
col user_procedure format a32
col handler_type format a18

select OBJECT_OWNER||'.'||OBJECT_NAME name,
OPERATION_NAME,
USER_PROCEDURE,
handler_type
from DBA_APPLY_DML_HANDLERS;

NAME OPERAT USER_PROCEDURE HANDLER_TYPE
--------------- ------ -------------------------------- ------------------
DEMO.MYTAB INSERT "STRMADMIN"."DML_APPLY_HANDLER" PROCEDURE HANDLER
DEMO.MYTAB UPDATE "STRMADMIN"."DML_APPLY_HANDLER" PROCEDURE HANDLER
DEMO.MYTAB DELETE "STRMADMIN"."DML_APPLY_HANDLER" PROCEDURE HANDLER

SCN Apply et démarrage de Streams

Pour démarrer, il reste à préciser à l’apply à partir de quel SCN il doit notifier l’application; vous pourrez alors démarrer les processus de capture et d’apply :

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'demo.mytab',
source_database_name => 'BLACK',
instantiation_scn => :first_scn);
end;
/

col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK DEMO.MYTAB 915448


exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('streams_apply');

Tests et Performances

Il ne vous reste plus qu’à tester les différents changements :

insert into demo.mytab 
values (10001,1,1,1,1);

commit;

exec dbms_streams_adm.set_tag(HEXTORAW('11'));

delete from demo.mytab where id=10001;

commit;

col id format 99999
col tag format a5
col message format a50
select * from notif;

ID TAG MESSAGE
---- ----- ----------------------
1 INSERT - ID: 10001
2 11 DELETE - ID: 10001

Un petit test montre que l’impact sur les performances de l’application est bien moindre que dans le cas d’un trigger et du même ordre de grandeur que Continuous Query Notification (CQN) :

set timing on
begin
for i in 10002..20000 loop
insert into demo.mytab
values (i,1,1,1,1);
commit;
end loop;
end;
/

Elapsed: 00:00:02.80

create table demo.mytab2
(id number,
attr1 number,
attr2 number,
val1 number,
val2 number);

set timing on
begin
for i in 10002..20000 loop
insert into demo.mytab2
values (i,1,1,1,1);
commit;
end loop;
end;
/

Elapsed: 00:00:00.94

Conclusion

Comme vous pouvez vous en rendre co
mpte, mettre en place une collaboration basée sur un principe de publish/subscribe entre Oracle et un cache niveau 2 d’un serveur d’applications n’est pas très compliqué. Streams est sûrement une solution élégante et avec un impact très limité sur les performances de la base de données. Il ne vous reste plus qu’à adapter ce qui précède à votre contexte…

Comme d’habitude, nettoyez votre environnement :

connect / as sysdba
exec dbms_capture_adm.stop_capture('streams_capture');
exec dbms_apply_adm.stop_apply('streams_apply');

exec dbms_capture_adm.drop_capture('streams_capture',true);
exec dbms_apply_adm.drop_apply('streams_apply',true);

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

begin
for i in (select source_schema name
from dba_apply_instantiated_schemas
where source_schema in ('DEMO'))
loop
dbms_apply_adm.set_schema_instantiation_scn(
source_schema_name => i.name,
source_database_name => 'BLACK',
instantiation_scn => null);
end loop;
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('DEMO'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => 'BLACK',
instantiation_scn => null);
end loop;
end;
/
alter database drop supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
NO NO NO


drop user demo cascade;

1 réflexion sur “Notifiez les Caches Niveau 2 de vos serveurs d'applications via Streams”

  1. Notez que ça marche très bien dans le cas de Streams également ; en cas d’erreur sur le handler, il n’y a pas d’erreur sur la transaction.

Les commentaires sont fermés.