Fonctions de table et Parallèlisme

Il y a quelques semaines, yooo13 qui se reconnaîtra ;-), rebondissant sur un problème de performance d’un des batchs sur lesquels je travaillais, pointait le fait que les fonctions de table ou fonctions « pipelined » peuvent être parallélisées et que j’y trouverais une solution à mon problème…

L’idée simple, et par ailleurs expliquée dans cet article, consiste à prendre le jeu de données d’origine et de le segmenter en jeux plus petits, à la « mapreduce »… Au final et dans mon cas, le résultat est excellent et permet, après 1/2 journée de travail, de diviser les temps de réponse par un facteur d’échelle, passant d’une heure à quelques minutes, par une manipulation purement technique.

Evidemment, ce n’est pas sans de quelques sacrifices. En particulier, l’introduction de code PL/SQL supplémentaire rend la solution, déjà mal écrite/décrite, encore plus difficile à comprendre et ce, malgré tous mes efforts. Mais le sujet de cet article est ailleurs, il s’agit d’illustrer, une des difficultés que j’ai rencontrée, travaillant sur un tout petit jeux de données de 1000 lignes pour équilibrer les « partitions » de mes exécutions en parallèle

Exemple de fonctions de table

Voici 3 fonctions de tables qui implémentent le parallélisme :

  • x_func_by_any partitionne le curseur de données en entrée comme il veut et, en l’occurrence selon le rowid
  • c_func_by_range partitionne le curseur de données en entrée par plage de id
  • c_func_by_range partitionne le curseur de données en entrée par hash de id

Pour analyser cette fonctionnalité, les fonctions de tables renvoient les valeurs minimum/maximum pour Id ainsi que le nombre de lignes traitée pour chaque « partition » en entrée. Le code ainsi développé est disponible ci-dessous :

create or replace package x_pkg is
type x_result is record (id_min number, id_max number, cnt number);
type x_resultset is table of x_result;
type x_input is record (id number, prop1 number);
type x_inputset is ref cursor return x_input ;
function x_func_by_range(p x_inputset) return x_resultset
parallel_enable(partition p by range(id)) pipelined;
function x_func_by_hash(p x_inputset) return x_resultset
parallel_enable(partition p by hash(id)) pipelined;
function x_func_by_any(p x_inputset) return x_resultset
parallel_enable(partition p by any) pipelined;
end x_pkg;
/

create or replace package body x_pkg is
function x_func(p x_inputset) return x_result
is
c number:=0;
i x_input;
x x_result;
begin
x.id_min:=0;
x.id_max:=0;
loop
fetch p into i;
exit when p%notfound;
c:=c+1;
if (i.id<x.id_min or x.id_min=0) then
x.id_min:=i.id;
end if;
if (i.id>x.id_max or x.id_max=0) then
x.id_max:=i.id;
end if;
end loop;
x.cnt:=c;
return x;
end;

function x_func_by_range(p x_inputset) return x_resultset
parallel_enable(partition p by range(id)) pipelined
is
begin
pipe row(x_func(p));
end;

function x_func_by_hash(p x_inputset) return x_resultset
parallel_enable(partition p by hash(id)) pipelined
is
begin
pipe row(x_func(p));
end;

function x_func_by_any(p x_inputset) return x_resultset
parallel_enable(partition p by any) pipelined
is
begin
pipe row(x_func(p));
end;

end x_pkg ;
/

Curseur et Parallélisme

Pour que la fonction de table s’exécute en parallèle, il faut que le curseur passé en paramètre s’exécute en parallèle. Le degré de parallélisme est appliqué au curseur. Pour arriver à l’effet escompté, il faut donc créer un jeu de données à partir duquel on puisse déclencher une exécution en parallèle ; ce n’est par exemple pas possible à partir de la requête ci-dessous :

select /*+ parallel(8) */ * from table(
x_pkg.x_func_by_any(cursor(select 50*rownum id, 1
from dual
connect by level <=1000)));

ID_MIN ID_MAX CNT
------ ------ -----
1 1000 1000

Pour pouvoir utiliser le parallélisme, vous devrez, par exemple, avoir une table sur laquelle faire un accès FULL :

create table x_select 
as select 50*rownum id
from dual
connect by level <=1000;

exec dbms_stats.gather_table_stats(-
ownname => user, -
tabname => 'X_SELECT', -
method_opt => 'for all columns size 254');

Exemple de mise en oeuvre

Sans plus de commentaire, vous trouverez les résultats sur une base de données Oracle 11.2 de l’exécution des fonctions de tables avec un degré de parallélisme de 8 :

col ID_MIN format 99999
col ID_MAX format 99999
col CNT format 9999

select /*+ parallel(8) */ * from table(
x_pkg.x_func_by_any(cursor(select id,1
from X_SELECT)))
order by 1;

ID_MIN ID_MAX CNT
------ ------ -----
0 0 0
0 0 0
0 0 0
0 0 0
0 0 0
0 0 0
50 32850 657
32900 50000 343


select /*+ parallel(8) */ * from table(
x_pkg.x_func_by_range(cursor(select id,1
from X_SELECT)))
order by 1;

ID_MIN ID_MAX CNT
------ ------ -----
50 1150 23
1200 2250 22
2300 3350 22
3400 4450 22
4500 33900 589
33950 35000 22
35050 36100 22
36150 50000 278


select /*+ parallel(8) */ * from table(
x_pkg.x_func_by_hash(cursor(select id,1
from X_SELECT)))
order by 1;

ID_MIN ID_MAX CNT
------ ------ -----
50 49950 120
100 49300 144
150 49850 130
250 49600 110
300 49800 110
400 49900 132
600 49700 133
650 50000 121

Voilà, vous aurez compris qu’équilibrer les jeux de données d’origine n’est pas forcément si intuitif surtout si ce jeu d’origine est tout petit. En revanche, il s’agit indéniablement d’une solution très simple pour tirer avantage du parallélisme sur des algorithmes qui prennent beaucoup de temps CPU et que le moteur SQL ne sait pas paralléliser…