16. Calcul distribué

Nous abordons maintenant le domaine des traitements analytiques à grande échelle qui, contrairement à des fonctions de recherche qui s’intéressent à un document précis ou à un petit sous-ensemble d’une collection, parcourent l’intégralité d’un large ensemble pour en extraire des informations et construire des modèles statistiques ou analytiques.

La préoccupation essentielle n’est pas ici la performance (de toute façon, les traitements durent longtemps) mais la garantie de scalabilité horizontale qui permet malgré tout d’obtenir des temps de réponse raisonnables, et surtout la garantie de terminaison en dépit des pannes pouvant affecter le système pendant le traitement.

Ce chapitre ne considère pas l’agorithmique analytique proprement dite, mais les opérateurs de manipulation de données qui fournissent l’information à ces algorithmes. En clair, il s’agit de voir comment récupérer des sources de données, comment les filtrer, les réorganiser, les combiner, les enrichir, le tout en respectant les deux contraintes fondamentales de la scalabilité: parallélisation et tolérance aux pannes.

La notion d’opérateur de second ordre

Les opérateurs décrits ici sont des opérateurs de second ordre. Contrairement aux opérateurs classiques qui s’appliquent directement à des données, un opérateur de second ordre prend des fonctions en paramètres et applique ces fonctions à des données au cours d’un traitement immuable (par exemple un parcours séquentiel).

Depuis 2004, le modèle phare d’exécution est MapReduce, déjà introduit dans le chapitre Interrogation de bases NoSQL dans un contexte centralisé, et distribué en Open Source dans le système Hadoop. MapReduce est le premier modèle à combiner distribution massive et reprise sur panne dans le contexte d’un cloud de serveurs à bas coûts. Ses limites sont cependant évidentes: faible expressivité (très peu d’opérateurs) et performances médiocres.

Très rapidement, des langages de plus haut niveau (Pig, Hive) ont été proposés, avec pour objectif notable l’expression d’opérateurs plus puissants (par exemple les jointures). Ces opérateurs restent exécutables dans un contexte MapReduce, un peu comme SQL est exécutable dans un système basé sur des parcours de fichier. Enfin, récemment, des systèmes proposant des alternatives plus riches à Hadoop ont commencé à émerger. La motivation essentielle est de fournir un support aux algorithmes fonctionnant par itération. C’est le cas d’un grand nombre de techniques en fouilles de données qui affinent progressivement un résultat jusqu’à obtenir une solution optimale. MapReduce est (était) très mal adapté à ce type d’exécution. Les systèmes comme Spark ou Flink constituent de ce point de vue un progrès majeur.

Ce chapitre suit globalement cette organisation historique, en commençant par MapReduce, suivi d’une présentation du langage Pig, et finalement une introduction aux systèmes itératifs.

S1: MapReduce

Reportez-vous au chapitre Interrogation de bases NoSQL pour une présentation du modèle MapReduce d’exécution. Rappelons que MapReduce n’est pas un langage d’interrogation de données, mais un modèle d’exécution de chaînes de traitement dans lesquelles des données (massives) sont progressivement transformées et enrichies.

Pour être concrets, nous allons prendre l’exemple (classique) d’un traitement s’appliquant à une collection de documents textuels et déterminant la fréquence des termes dans les documents (indicateur TF, cf. Recherche avec classement). Pour chaque terme présent dans la collection, on doit donc obtenir le nombre d’occurrences.

Le principe de localité des données

Dans une approche classique de traitement de données stockées dans une base, on utilise une architecture client-serveur dans laquelle l’application cliente reçoit les données du serveur. Cette méthode est difficilement applicable en présence d’un très gros volume de données, et ce d’autant moins que les collections sont stockées dans un système distribué. En effet:

  • le transfert par le réseau d’une large collection devient très pénalisant à grande échelle (disons, le TéraOctet);
  • et surtout, la distribution des données est une opportunité pour effectuer les calculs en parallèle sur chaque machine de stockage, opportunité perdue si l’application cliente fait tout le calcul.

Ces deux arguments se résument dans un principe dit de localité des données (data locality). Il peut s’énoncer ainsi: les meilleures performances sont obtenues quand chaque fragment de la collection est traité localement, minimisant les besoins d’échanges réseaux entre les machines.

Note

Reportez-vous au chapitre Le cloud, une nouvelle machine de calcul pour une analyse quantitative montrant l’intérêt de ce principe.

L’application du principe de localité des données mène à une architecture dans laquelle, contrairement au client-serveur, les données ne sont pas transférées au programme client, mais le programme distribué à toutes les machines stockant des données (figure Principe de localité des données, par transfert des programmes).

_images/data-locality.png

Fig. 16.1 Principe de localité des données, par transfert des programmes

En revanche, demander à un développeur d’écrire une application distribuée basée sur ce principe constitue un défi technique de grande ampleur. Il faut en effet concevoir simultanément les tâches suivantes:

  • implanter la logique de l’application, autrement dit le traitement particulier qui peut être plus ou moins complexe;
  • concevoir la parallélisation de cette application, sous la forme d’une exécution concurrente coordonnant plusieurs machines et assurant un accès à un partitionnement de la collection traitée;
  • et bien entendu, gérer la reprise sur panne dans un environnement qui, nous l’avons vu, est instable.

Un framework d’exécution distribuée comme MapReduce est justement dédié à la prise en charge des deux derniers aspects, spécifiques à la distribution dans un cloud, et ce de manière générique. Le framework définit un processus immuable d’accès et de traitement, et le programmeur implante la logique de l’application sous la forme de briques logicielles confiées au framework et appliquées par ce dernier dans le cadre du processus.

Avec MapReduce, le processus se déroule en deux phases, et les “briques logicielles” consistent en deux fonctions fournies par le développeur. La phase de Map traite chaque document individuellement et applique une fonction map() dont voici le pseudo-code pour notre application de calcul du TF.

function mapTF($id, $contenu)
{
  // $id: identifiant du document
  // $contenu: contenu textuel du document

  // On boucle sur tous les termes du contenu
  foreach  ($t in $contenu) {
    // Comptons le nb d'occurrences de $t dans $contenu
    $count = nbOcc ($t, $contenu);
    // On "émet" le terme et son nombre d'occurrences
    emit ($t, $count);
  }
}

La phase de Reduce reçoit des valeurs groupées sur la clé et applique une agrégation de ces valeurs. Voici le pseudo-code pour notre application TF.

function reduceTF($t, $compteurs)
{
  // $t: un terme
  // $compteurs: les nombres d'occurrences calculés localement
  $total = 0;

  // Boucles sur les compteurs et calcul du total
  foreach ($c in $compteurs) {
   $total = $total + $c;
  }

  // Et on produit le résultat
  return $total;
}

Dans ce cadre restreint, le framework prend en charge la distribution et la reprise sur panne.

Important

Ce processus en deux phases et très limité et ne permet pas d’exprimer des algorithmes complexes, ceux basés par exemple sur une itération menant progressivement au résultat. C’est l’objectif essentiel de modèles d’exécution plus puissants que nous présentons ultérieurement.

Exécution distribuée d’un traitement MapReduce

La figure Exécution distribuée d’un traitement MapReduce résume l’exécution d’un traitement (“job”) MapReduce avec un framework comme Hadoop. Le système d’exécution distribué fonctionne sur une architecture maître-esclave dans laquelle le maître (JobTracker dans Hadoop) se charge de recevoir la requête de l’application, la distribue sous forme de tâche à des nœuds (TaskTracker dans Hadoop) accédant aux fragments de la collection, et coordonne finalement le déroulement de l’exécution. Cette coordination inclut notamment la gestion des pannes.

_images/mr-execution.png

Fig. 16.2 Exécution distribuée d’un traitement MapReduce

L’application cliente se connecte au maître, transmet les fonctions de Map et de Reduce, et soumet la demande d’exécution. Le client est alors libéré, en attente de la confirmation par le maître que le traitement est terminé (cela peut prendre des jours ...). Le framework fournit des outils pour surveiller le progrès de l’exécution pendant son déroulement.

Le traitement s’applique à une source de données partitionnée. Cette source peut être un simple système de fichiers distribués, un système relationnel, un système NoSQL type MongoDB ou HBase, voire même un moteur de recherche comme Solr ou ElasticSearch.

Le Maître dispose de l’information sur le partitionnement des données (l’équivalent du contenu de la table de routage, présenté dans le chapitre sur le partitionnement) ou la récupère du serveur de données. Un nombre M de serveurs stockant tous les fragments concernés est alors impliqué dans le traitement. Idéalement, ces serveurs vont être chargés eux-mêmes du calcul pour respecter le principe de localité des données mentionné ci-dessus. Un système comme Hadoop fait de son mieux pour respecter ce principe.

La fonction de Map est transmise aux M serveurs et une tâche dite Mapper applique la fonction à un fragment. Si le serveur contient plusieurs fragments (ce qui est le cas normal) il faudra exécuter autant de tâches. Si le serveur est multi-cœurs, plusieurs fragments peuvent être traités en parallèle sur la même machine.

Exemple: le partitionnement des données pour l’application TF

Supposons par exemple que notre collection contienne 1 million de documents dont la taille moyenne est de 1000 octets. On découpe la collection en fragments de 64 MOs. Chaque fragment contient donc 64 000 documents. Il y a donc à peu près \(\lceil 1,000,000/64,000 \rceil \approx 16,000\) fragments. Si on dispose de 16 machines, chacune devra traiter (en moyenne) 1000 fragments et donc exécuter mille tâches de Mapper.

Chaque mapper travaille, dans la mesure du possible, localement: le fragment est lu sur le disque local, document par document, et l’application de la fonction de Map “émet” des paires (clé, valeur) dites “intermédiaires” qui sont stockées sur le disque local. Il n’y a donc aucun échange réseau pendant la phase de Map (dans le cas idéal où la localité des données peut être complètement respectée).

Exemple: la phase de Map pour l’application TF

Supposons que chaque document contienne en moyenne 100 termes distincts. Chaque fragment contient 64 000 documents. Un Mapper va donc produire 6 400 000 paires (t, c)t est un terme et c le nombre d’occurrences.

À l’issue de la phase de Map, le maître initialise la phase de Reduce en choisissant R machines disponibles. Il faut alors distribuer les paires intermédiaires à ces R machines. C’est une phase “cachée”, dite de shuffle, qui constitue potentiellement le goulot d’étranglement de l’ensemble du processus car elle implique la lecture sur les disques des Mappers de toutes les paires intermédiaires, et leur envoi par réseau aux machines des Reducers.

Important

Vous noterez peut-être qu’une solution beaucoup plus efficace serait de transférer immédiatement par le réseau les paires intermédiaires des Mappers vers les Reducers. Il y a une explication à ce choix en apparence sous-optimal: c’est la reprise sur panne (voir plus loin).

Pour chaque paire intermédiaire, un simple algorithme de hachage permet de distribuer les clés équitablement sur les R machines chargées du Reduce.

Au niveau d’un Reducer Ri, que se passe-t-il?

  • Tout d’abord il faut récupérer toutes les paires intermédiaires produites par les Mappers et affectées à Ri.
  • Il faut ensuite trier ces paires sur la clé pour regrouper les paires partageant la même clé. On obtient des paires (k, [v])k est une clé, et [v] la liste des valeurs reçues par le Reducer pour cette clé.
  • Enfin, chacune des paires (k, [v]) est soumise à la fonction de Reduce.

Exemple: la phase de Reduce pour l’application TF

Supposons R=10. Chaque Reducer recevra donc en moyenne 640 000 paires (t, c) de chaque Mapper. Ces paires sont triées sur le terme t. Pour chaque terme on a donc la liste des nombres d’occurences trouvés dans chaque document par les Mappers. Au pire, si un terme est présent dans chaque document, le tableau [v] contient un million d’entiers.

Il reste, avec la fonction de Reduce, à faire le total de ces nombres d’occurences pour chaque terme.

Exemple: comptons les loups et le moutons

Vous souvenez-vous de ces quelques documents?

  • A: Le loup est dans la bergerie.
  • B: Les moutons sont dans la bergerie.
  • C: Un loup a mangé un mouton, les autres loups sont restés dans la bergerie.
  • D: Il y a trois moutons dans le pré, et un mouton dans la gueule du loup.

Ils sont maintenant stockés dans un système partitionné sur 3 serveurs comme montré sur la figure Un exemple minuscule mais concret. Nous appliquons notre traitement TF pour compter le nombre total d’occurrences de chaque terme (on va s’intéresser aux termes principaux).

_images/mr-execution-ex.png

Fig. 16.3 Un exemple minuscule mais concret

Nous avons trois Mappers qui produisent les données intermédiaires présentées sur la figure. Comprenez-vous pourquoi le terme bergerie apparaît deux fois pour le premier Mapper par exemple?

La phase de Reduce, avec 2 Reducers, n’est illustrée que pour le terme loup donc on suppose qu’il est affecté au premier Reducer. Chaque Mapper transmet donc ses paires intermédiaires (loup, ...) à R1 qui se charge de regrouper et d’appliquer la fonction de Reduce.

Quand tous les Reducers ont terminé, le résultat est disponible sur leur disque local. Le client peut alors le récupérer.

La reprise sur panne

Comment assurer la gestion des pannes pour une exécution MapReduce? Dans la mesure où elle peut consister en centaines de tâches individuelles, il est inenvisageable de reprendre l’ensemble de l’exécution si l’une de ces tâches échoue, que ce soit en phase de Map ou en phase de Reduce. Le temps de tout recommencer, une nouvelle panne surviendrait, et le job ne finirait jamais.

Le modèle MapReduce a été conçu dès l’origine pour que la reprise sur panne puisse être gérée au niveau de chaque tâche individuelle, et que la coordination de l’ensemble soit également résiliente aux problèmes de machine ou de réseau.

Le Maître délègue les tâches aux machines et surveille la progression de l’exécution. Si une tâche semble interrompue, le Maître initie une action de reprise qui dépend de la phase.

Panne en phase de Reduce

Si la machine reste accessible et que la panne se résume à un échec du processus, ce dernier peut être relancé sur la même machine, et si possible sur les données locales déjà transférées par le shuffle. C’est le cas le plus favorable.

Sinon, une reprise plus radicale consiste à choisir une autre machine, et à relancer la tâche en réinitialisant le transfert des paires intermédiaires depuis les machines chargées du Map. C’est possible car ces paires ont été écrites sur les disques locaux et restent donc disponibles. C’est une caractéristique très importante de l’exécution MapReduce: l’écriture complète des fragments intermédiaires garantit la possibilité de reprise en cas de panne.

Une méthode beaucoup plus efficace mais beaucoup moins robuste consisterait à ce que chaque mapper transfère immédiatement les paires intermédiaires, sans écriture sur le disque local, vers la machine chargée du Reduce. Mais en cas de panne de ce dernier, ces paires intermédiaires risqueraient de disparaître et on ne saurait plus effectuer la reprise sur panne (sauf à ré-exécuter l’ensemble du processus).

Cette caractéristique explique également la lenteur d’une exécution MapReduce, due en grande partie à la nécessité d’effectuer des écritures et lectures répétées sur disque, à chaque phase.

Panne en phase Map

En cas de panne pendant l’exécution d’une tâche de Map, on peut soit reprendre la tâche sur la même machine si c’est le processus qui a échoué, soit transférer la tâche à une autre machine. On tire ici parti de la réplication toujours présente dans les systèmes distribués: quel que soit le fragment stocké sur une machine, il existe un réplica de ce fragment sur une autre, et à partir de ce réplica une tâche équivalente peut être lancée.

Le cas le plus pénalisant est la panne d’une machine pendant la phase de transfert vers les Reducers. Il faut alors reprendre toutes les tâches initialement allouées à la machine, en utilisant la réplication.

Et le maître?

Finalement, il reste à considérer le cas du Maître qui est un point individuel d’échec: en cas de panne, il faut tout recommencer.

L’argument des frameworks comme Hadoop est qu’il existe un Maître pour des dizaines de travailleurs, et qu’il est peu probable qu’une panne affecte directement le serveur hébergeant le nœud-Maître. Si cela arrive, on peut accepter de reprendre l’ensemble de l’exécution, ou prendre des mesures préventives en dupliquant toutes les données du Maître sur un nœud de secours.

Exercices

Exercice: MapReduce en distribué avec MongoDB

Vous devez avoir implanté un compteur de mots avec MongoDB dans la chapitre Interrogation de bases NoSQL. Vous devriez également avoir engendré une collection volumineuse et distribuée grâce au générateur de données ipsum (cf. chapitre Systèmes NoSQL: le partitionnement). Il ne reste plus qu’à faire l’essai: lancer, en vous connectant au routeur mongos, le calcul MapReduce dans MongoDB. Ce calculer devrait insérer le résultat dans une collection partitionnée présente sur les différents serveurs. À vous de jouer.

Exercice: MapReduce et Hadoop.

La mise en place d’une expérimentation avec Hadoop est assez lourde et en fait assez peu instructive. Si vous y tenez, reportez-vous aux travaux pratiques proposés dans le livre Web Data management, chapitre Hadoop. Il est sans doute plus utile maintenant d’apprendre à exprimer des opérations (dont le Map et le Reduce) avec des systèmes modernes plus faciles à mettre en place comme Spark ou Flink.

Exercice: un grep, en MapReduce

On veut scanner des millards de fichiers et afficher tous ceux qui contiennent une chaîne de caractères c. Donnez la solution en MapReduce, en utilisant le formalisme de votre choix (de préférence un pseudo-code un peu structuré quand même).

Correction

Pas bien compliqué. Map: on charge les fichier un par un (ou ligne par ligne s’ils sont vraiment très gros, on cherche c et si on la trouve on émet le nom du fichier et un indicateur quelconque. Reduce: on émet le nom du fichier, éventellement avec le nombre d’occurrences trouvées de c.

Exercice: un rollup, en MapReduce

Une grande surface enregistre tous ses tickets de caisse, indiquant les produits vendus, le prix et la date, ainsi que le client si ce dernier a une carte de fidélité.

Les produits sont classés selon une taxonomie comme illustré sur la figure Les produits et leur classement., avec des niveaux de précision. Pour chaque produit on sait à quelle catégorie précise de N1 il appartient (par exemple, chaussure); pour chaque catégorie on connaît son parent.

_images/taxonomie.png

Fig. 16.4 Les produits et leur classement.

Supposons que la collection Tickets contienne des documents de la forme (idTicket, idClient, idProduit, catégorie, date, prix). Comment obtenir en MapReduce le total des ventes à une date d, pour le niveau N2? On fait donc une agrégation de Tickets au niveau supérieur de la taxonomie.

S2: langages de traitement: Pig

MapReduce est un système orienté vers les développeurs qui doivent concevoir et implanter la composition de plusieurs jobs pour des algorithmes complexes qui ne peuvent s’exécuter en une seule phase. Cette caractéristique rend également les systèmes MapReduce difficilement accessibles à des non-programmeurs.

La définition de langages de plus haut niveau permettant de spécifier des opérations complexes sur les données est donc apparue comme une nécessité dès les premières versions de systèmes comme Hadoop. L’initiative est souvent venue de communautés familières des bases de données et désirant retrouver la simplicité et la “déclarativité” du langage SQL, transposées dans le domaine des chaînes de traitements pour données massives.

Cette section présente le langage Pig latin, une des premières tentatives du genre, une des plus simples, et surtout l’une des plus représentatives des opérateurs de manipulation de données qu’il est possible d’exécuter sous forme de jobs MapReduce en conservant la scalabilité et la gestion des pannes.

Pig latin (initialement développé par un laboratoire Yahoo!) est un projet Apache disponible à http://pig.apache.org. Récupérez la dernière version sous la forme d’une archive compressée et décompressez-la quelque part, dans un répertoire que nous appellerons pigdir.

Nous utiliserons directement l’interpréteur de scripts (nommé grunt) qui se lance avec:

<pigdir>/bin/pig -x local

L’option local indique que l’on teste les scripts en local, ce qui permet de les mettre au point sur de petits jeux de données avant de passer à une exécution distribuée à grande échelle dans un framework MapReduce.

Cet interpréteur affiche beaucoup de messages, ce qui devient rapidement désagréable. Pour s’en débarasser, créer un fichier nolog.conf avec la ligne:

log4j.rootLogger=fatal

Et lancez Pig en indiquant que la configuration des log est dans ce fichier:

<pigdir>/bin/pig -x local -4 nolog.conf

Une session illustrative

Pig applique des opérateurs à des flots de données semi-structurées. Le flot initial (en entrée) est constituée par lecture d’une source de données quelconque contenant des documents qu’il faut structurer selon le modèle de Pig, à peu de choses près comparable à ce que proposent XML ou JSON.

Dans un contexte réel, il faut implanter un chargeur de données depuis la source. Nous allons nous contenter de prendre un des formats par défaut, soit un fichier texte dont chaque ligne représente un document, et dont les champs sont séparés par des tabulations. Nos documents sont des entrées bibliographiques d’articles scientifiques que vous pouvez récupérer à http://b3d.bdpedia.fr/files/journal-small.txt. En voici un échantillon.

2005    VLDB J. Model-based approximate querying in sensor networks.
1997    VLDB J. Dictionary-Based Order-Preserving String Compression.
2003        SIGMOD Record   Time management for new faculty.
2001    VLDB J. E-Services - Guest editorial.
2003        SIGMOD Record   Exposing undergraduate students to system internals.
1998    VLDB J. Integrating Reliable Memory in Databases.
1996    VLDB J. Query Processing and Optimization in Oracle Rdb
1996    VLDB J. A Complete Temporal Relational Algebra.
1994        SIGMOD Record   Data Modelling in the Large.
2002        SIGMOD Record   Data Mining: Concepts and Techniques - Book Review.
...

Voici à titre d’exemple introductif un programme Pig complet qui calcule le nombre moyen de publications par an dans la revue SIGMOD Record.

-- Chargement des documents de journal-small.txt
articles = load 'journal-small.txt'
    as (year: chararray, journal:chararray, title: chararray) ;
sr_articles = filter articles BY journal=='SIGMOD Record';
year_groups = group sr_articles by year;
count_by_year = foreach year_groups generate group, COUNT(sr_articles.title);
dump count_by_year;

Quand on l’exécute sur notre fichier-exemple, on obtient le résultat suivant:

(1977,1)
(1981,7)
(1982,3)
(1983,1)
(1986,1)
...

Un programme Pig est essentiellement une séquence d’opérations, chacune prenant en entrée une collection de documents (les collections sont nommées bag dans Pig latin, et les documents sont nommés tuple) et produisant en sortie une autre collection. La séquence définit une chaîne de traitements transformant progressivement les documents.

_images/pig-workflow.png

Fig. 16.5 Un exemple de workflow (chaîne de traitements) avec Pig

Il est intéressant de décomposer, étape par étape, cette chaîne de traitement pour inspecter les collections intermédiaires produites par chaque opérateur.

Chargement. L’opérateur load crée une collection initiale articles par chargement du fichier. On indique le schéma de cette collection pour interpréter le contenu de chaque ligne. Les deux commandes suivantes permettent d’inspecter respectivement le schéma d’une collection et un échantillon de son contenu.

grunt> describe articles;
articles: {year: chararray,journal: chararray,title: chararray}

grunt> illustrate articles;
---------------------------------------------------------------------------
| articles | year: chararray | journal: chararray | title: chararray      |
---------------------------------------------------------------------------
|          | 2003            | SIGMOD Record      | Call for Book Reviews.|
---------------------------------------------------------------------------

Pour l’instant, nous sommes dans un contexte simple où une collection peut être vue comme une table relationnelle. Chaque ligne/document ne contient que des données élémentaires.

Filtrage. L’opération de filtrage avec filter opère comme une clause where en SQL. On peut exprimer avec Pig des combinaisons Booléennes de critères sur les attributs des documents. Dans notre exemple le critère porte sur le titre du journal.

Regroupement. On regroupe maintenant les tuples/documents par année avec la commande group by. À chaque année on associe donc l’ensemble des articles parus cette année-là, sous la forme d’un ensemble imbriqué. Examinons la représentation de Pig:

grunt> year_groups = GROUP sr_articles BY year;

grunt> describe year_groups;
  year_groups: {group: chararray,
    sr_articles: {year: chararray,journal: chararray,title:chararray}}

grunt> illustrate year_groups;
 group: 1990
 sr_articles:
  {
   (1990, SIGMOD Record, An SQL-Based Query Language For Networks of Relations.),
   (1990, SIGMOD Record, New Hope on Data Models and Types.)
  }

Le schéma de la collection year_group, obtenu avec describe, comprend donc un attribut nommé group correspondant à la valeur de la clé de regroupement (ici, l’année) et une collection imbriquée nommée d’après la collection-source du regroupement (ici, sr_articles) et contenant tous les documents partageant la même valeur pour la clé de regroupement.

L’extrait de la collection obtenu avec illustrate montre le cas de l’année 1990.

À la syntaxe près, nous sommes dans le domaine familier des documents semi-structurés. Si on compare avec JSON par exemple, les objets sont notés par des parenthèses et pas par des accolades, et les ensembles par des accolades et pas par des crochets. Une différence plus essentielle avec une approche semi-structurée de type JSON ou XML est que le schéma est distinct de la représentation des documents: à partir d’une collection dont le schéma est connu, l’interpréteur de Pig infère le schéma des collections calculées par les opérateurs. Il n’est donc pas nécessaire d’inclure le schéma avec le contenu de chaque document.

Le modèle de données de Pig comprend trois types de valeurs:

  • Les valeurs atomiques (chaînes de caractères, entiers, etc.).
  • Les collections (bags pour Pig) dont les valeurs peuvent être hétérogènes.
  • Les documents (tuples pour Pig), équivalent des objets en JSON: des ensembles de paires (clé, valeur).

On peut construire des structures arbitrairement complexes par imbrication de ces différents types. Comme dans tout modèle semi-structuré, il existe très peu de contraintes sur le contenu et la structure. Dans une même collection peuvent ainsi cohabiter des documents de structure très différente.

Application de fonctions. Un des besoins récurrents dans les chaînes de traitement est d’appliquer des fonctions pour annoter, restructurer ou enrichir le contenu des documents passant dans le flux. Ici, la collection finale avg_nb est obtenue en appliquant une fonction standard count(). Dans le cas général, on applique des fonctions applicatives intégrées au contexte d’exécution Pig: ces fonctions utilisateurs (User Defined Functions ou UDF) sont le moyen privilégié de combiner les opérateurs d’un langage comme Pig avec une application effectuant des traitements sur les documents. L’opérateur foreach/generate permet cette combinaison.

Les opérateurs

La table ci-dessous donne la liste des principaux opérateurs du langage Pig. Tous s’appliquent à une ou deux collections en entrée et produisent une collection en sortie.

Opérateur Description
foreach Applique une expression à chaque document de la collection
filter Filtre les documents de la collection
order Ordonne la collection
distinct Elimine lse doublons
cogroup Associe deux groupes partageant une clé
cross Produit cartésien de deux collections
join Jointure de deux collections
union Union de deux collections

Voici quelques exemples pour illustrer les aspects essentiels du langage, basés sur le fichier http://b3d.bdpedia.fr/files/webdam-books.txt. Chaque ligne contient l’année de parution d’un livre, le titre et un auteur.

1995        Foundations of Databases Abiteboul
1995        Foundations of Databases Hull
1995        Foundations of Databases Vianu
2012        Web Data Management Abiteboul
2012    Web Data Management Manolescu
2012        Web Data Management Rigaux
2012        Web Data Management Rousset
2012        Web Data Management Senellart

Le premier exemple ci-dessous montre une combinaison de group et de foreach permettant d’obtenir une collection avec un document par livre et un ensemble imbriqué contenant la liste des auteurs.

-- Chargement de la collection
books = load 'webdam-books.txt'
    as (year: int, title: chararray, author: chararray) ;
group_auth = group books by title;
authors = foreach group_auth generate group, books.author;
dump authors;

L’opérateur foreach applique une expression aux attributs de chaque document. Encore une fois, Pig est conçu pour que ces expressions puissent contenir des fonctions externes, ou UDF (User Defined Functions), ce qui permet d’appliquer n’importe quel type d’extraction ou d’annotation.

L’ensemble résultat est le suivant:

(Foundations of Databases,
   {(Abiteboul),(Hull),(Vianu)})
(Web Data Management,
   {(Abiteboul),(Manolescu),(Rigaux),(Rousset),(Senellart)})

L’opérateur flatten sert à “aplatir” un ensemble imbriqué.

-- On prend la collection group_auth et on l'aplatit
flattened = foreach group_auth generate group ,flatten(books.author);

On obtient:

(Foundations of Databases,Abiteboul)
(Foundations of Databases,Hull)
(Foundations of Databases,Vianu)
(Web Data Management,Abiteboul)
(Web Data Management,Manolescu)
(Web Data Management,Rigaux)
(Web Data Management,Rousset)
(Web Data Management,Senellart)

L’opérateur cogroup prend deux collections en entrée, crée pour chacune des groupes partageant une même valeur de clé, et associe les groupes des deux collections qui partagent la même clé. C’est un peu compliqué en apparence; regardons la figure L’opérateur cogroup de Pig.. Nous avons une collection A avec des documents d dont la clé de regroupement vaut a ou b, et une collection B avec des documents d’. Le cogroup commence par rassembler, séparément dans A et B, les documents partageant la même valeur de clé. Puis, dans une seconde phase, les groupes de documents provenant des deux collections sont assemblés, toujours sur la valeur partagée de la clé.

_images/pig-cogroup.png

Fig. 16.6 L’opérateur cogroup de Pig.

Prenons une seconde collection, contenant des éditeurs (fichier http://b3d.bdpedia.fr/files/webdam-publishers.txt):

Fundations of Databases     Addison-Wesley  USA
Fundations of Databases     Vuibert France
Web Data Management         Cambridge University Press      USA

On peut associer les auteurs et les éditeurs de chaque livre de la manière suivante.

--- Chargement de la collection
publishers = load 'webdam-publishers.txt'
  as (title: chararray, publisher: chararray) ;
cogrouped = cogroup flattened by group, publishers by title;

Le résultat est le suivant.

(Foundations of Databases,
  { (Foundations of Databases,Abiteboul),
    (Foundations of Databases,Hull),
    (Foundations of Databases,Vianu)
  },
  {(Foundations of Databases,Addison-Wesley),
   (Foundations of Databases,Vuibert)
  }
)

Il contient un document pour chaque livre avec trois attributs. Le premier est la valeur de la clé de regroupement (le titre du livre). Le second est l’ensemble des documents de la première collection correspondant à la clé, le troisième l’ensemble des documents de la seconde collection correspondant à la clé.

Il s’agit d’une forme de jointure qui regroupe, en un seul document, tous les documents des deux collections en entrée qui peuvent être appariés. On peut aussi exprimer la jointure ainsi:

-- Jointure entre la collection 'flattened' et  'publishers'
joined = join flattened by group, publishers by title;

On obtient cependant une structure différente de celle du cogroup, tout à fait semblable à celle d’une jointure avec SQL, dans laquelle les informations ont été “aplaties”.

(Foundations of Databases,Abiteboul,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Abiteboul,Fundations of Databases,Vuibert)
(Foundations of Databases,Hull,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Hull,Fundations of Databases,Vuibert)
(Foundations of Databases,Vianu,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Vianu,Fundations of Databases,Vuibert)

La comparaison entre cogroup et join montre la flexibilité apportée par un modèle semi-structuré et sa capacité à représenter des ensembles imbriqués. Une jointure relationnelle doit produire des tuples “plats”, sans imbrication, alors que le cogroup autorise la production d’un état intermédiaire où toutes les données liées sont associées dans un même document, ce qui peut être très utile dans un contexte analytique.

Voici un dernier exemple montrant comment associer à chaque livre le nombre de ses auteurs.

books = load 'webdam-books.txt'
    as (year: int, title: chararray, author: chararray) ;
group_auth = group books by title;
authors = foreach group_auth generate group, COUNT(books.author);
dump authors;

Exercices

Reportez-vous au chapitre Pig : Travaux pratiques pour un ensemble d’exercices à faire sur machine.

Exercice: un rollup, avec Pig latin

On reprend l’énoncé de l’exercice Ex-S1-1, mais cette fois la collection Ticket contient des documents de la forme (idTicket, idClient, idProduit, date, prix), sans la catégorie donc. Il existe une autre collection Produit avec des documents de la forme (idProduit, idCatégorie), et même une collection Catégorie de la forme (idCatégorie, nom, idParent).

Comment obtenir en Pig le total des ventes à une date d, pour le niveau N2? Donnez si possible de programme Pig, ou au moins le graphe des opérateurs nécessaires.

S3: Systèmes itératifs: Spark

Les opérateurs d’un langage comme Pig restent liés à un modèle d’exécution MapReduce. Un programme Pig est compilé et exécuté comme une séquence de jobs MapReduce. Cela ne couvre pas une classe importante d’algorithmes: ceux qui procèdent par itérations sur un résultat progressivement affiné à chaque exécution. Ce type d’algorithme est très fréquent dans le domaine général de la fouille de données: PageRank, kMeans, calculs de parties connexes dans les graphes, etc.

Avec MapReduce, la spécification de l’itération reste à la charge du programmeur; il faut stocker le résultat d’un premier job dans une collection intermédiaire et réiterer le job en prenant la collection intermédiaire comme source. C’est laborieux pour l’implantation, et surtout très peu efficace quand la collection intermédiaire est grande. Le processus de sérialisation/désérialisation sur disque propre à la gestion de la reprise sur panne en MapReduce entraîne des coûts très élevés et des performances médiocres.

Des systèmes récents comme Spark proposent des méthodes spécifiquement dédiées aux algorithmes accédant de manière réitérée à même jeu de données. Dans Spark, la méthode consiste essentiellement à placer ces jeux de données en mémoire RAM et à éviter la pénalité des écritures sur le disque. Le défi est alors bien sûr de proposer une reprise sur panne automatique efficace.

Les RDDs dans Spark

La principale innovation de Spark est le concept de Resilient Distributed Dataset (RDD). Un RDD est une collection (pour en rester à notre vocabulaire) calculée à partir d’une source de données (par exemple une base de données MongoDB, un flux de données, un autre RDD) et placée en mémoire RAM. Spark conserve l’historique des opérations qui a permis de constituer un RDD, et la reprise sur panne s’appuie essentiellement sur la préservation de cet historique afin de reconstituer le RDD en cas de panne. Pour le dire brièvement: Spark n’assure pas la préservation des données en extension mais en intention. La préservation d’un programme qui tient en quelques lignes de spécification (cf. les programmes Pig) est beaucoup plus facile et efficace que la préservation du jeu de données issu de cette chaîne. C’est l’idée principale pour la résilience des RDDs.

Par ailleurs, les RDDs représentent des collections partitionnées et distribuées. Chaque RDD est donc constitué de ce que nous avons appelé fragments. Une panne affectant un fragment individuel peut donc être réparée (par reconstitution de l’historique) indépendamment des autres fragments, évitant d’avoir à tout recalculer.

Un RDD est calculé par une transformation, l’équivalent de ce nous avons appelé opérateur dans Pig. Comme dans Pig, une transformation sélectionne, enrichit, restructure une collection, ou combine deux collections. On retrouve dans Spark, à peu de choses près, les mêmes opérateurs/transformations que dans Pig, comme le montre la table ci-dessous (qui n’est pas exhaustive: reportez-vous à la documentation pour des compléments).

Opérateur Description
map Prend un document en entrée et produit un document en sortie
filter Filtre les documents de la collection
flatMap Prend un document en entrée, produit un ou plusieurs document(s) en sortie
groupByKey Regroupement de documents par une valeur de clé commune
reduceByKey Réduction d’une paire (k, [v]) par une agrégation du tableau [v]
crossProduct Produit cartésien de deux collections
join Jointure de deux collections
union Union de deux collections
cogroup Cf. la description de l’opérateur dans la section sur Pig
sort Tri d’une collection

Les RDD représentent les collections obtenues au cours des différentes étapes d’une chaîne de traitement. C’est exactement la notion que nous avons déjà étudiée avec Pig. La différence essentielle est que dans Spark, les RDD peuvent être marquées comme étant persistants car ils peuvent être réutilisés dans d’autres chaînes. Spark fait son possible pour stocker un RDD persistant en mémoire RAM, pour un maximum d’efficacité.

_images/spark-rdd.png

Fig. 16.7 RDD persistants et transitoires dans Spark.

Les RDD forment un graphe construit par application de transformations à partir de collections stockées (figure RDD persistants et transitoires dans Spark.). Un RDD peut être construit à partir d’un ou plusieurs autres RDD (2 au maximum). S’il n’est pas marqué comme persistant, le RDD sera transitoire et ne sera pas conservé en mémoire après calcul (c’est le cas des RDD 1 et 3 sur la figure). Sinon, il est conservé en mémoire.

Exemple: analyse de fichiers log

Prenons un exemple concret: dans un serveur d’application, on constate d’un module M produit des résultats incorrects de temps en temps. On veut analyser le fichier journal (log) de l’application qui contient les messages produits par le module suspect, et par beaucoup d’autres.

On construit donc un programme qui charge le log sous forme de collection, ne conserve que les messsages produits par le module M et analyse ensuite ces messages. Plusieurs analyses sont possibles en fonction des causes suspectées: la première par exemple regarde le log de M pour un produit particulier, la seconde pour un utilisateur particulier, la troisième pour une tranche horaire particulière, etc.

Avec Spark, on va créer un RDD logM persistant, contenant les messages produits par M. On construira ensuite, à partir de logM de nouveaux RDD dérivés pour les analyses spécifiques (figure Scénario d’une analyse de log avec Spark).

_images/spark-log.png

Fig. 16.8 Scénario d’une analyse de log avec Spark

On combine deux transformations pour construire logM, comme le montre le programme suivant (qui n’est pas la syntaxe exacte de Spark, que nous présenterons plus loin).

// Chargement de la collection
log = load ("app.log") as (...)
// Filtrage des messages du module M
logM = filter log with log.message.contains ("M")
// On rend logM persistant !
logM.persist();

On peut alors construire une analyse basée sur le code produit directement à partir de logM.

// Filtrage par produit
logProduit = filter logM with log.message.contains ("product P")
// .. analyse du contenu de logProduit

Et utiliser également logM pour une autre analyse, basée sur l’utilisateur.

// Filtrage par utilisateur
logUtilisateur = filter logM with log.message.contains ("utilisateur U")
// .. analyse du contenu de logProduit

Ou encore par tranche horaire.

// Filtrage par utilisateur
logPeriode = filter logM with log.date.between d1 and d2
// .. analyse du contenu de logPeriode

logM est une sorte de “vue” sur la collection initiale, dont la persistance évite de refaire le calcul complet à chaque analyse.

Reprise sur panne

Pour comprendre la reprise sur panne, il faut se pencher sur le second aspect des RDD: la distribution. Un RDD est une collection partitionnée (cf. chapitre Systèmes NoSQL: le partitionnement). La figure Partitionnement et reprise sur panne dans Spark. montre le traitement précédent dans une perspective de distribution. Chaque RDD, persistant ou non, est composé de fragments répartis dans la grappe de serveurs.

_images/spark-failover.png

Fig. 16.9 Partitionnement et reprise sur panne dans Spark.

Si une panne affecte un calcul s’appuyant sur un fragment F de RDD persistant (par exemple la transformation notée T et marquée par une croix rouge sur la figure), il suffit de le relancer à partir de F. Le gain en temps est considérable!

La panne la plus sévère affecte un fragment de RDD persistant (par exemple celui marqué par une croix violette). Dans ce cas, Spark a mémorisé la chaîne de traitement ayant constitué le RDD, et il suffit de ré-appliquer cette chaîne en remontant jusqu’aux fragments qui précèdent dans le graphe des calculs.

Dans notre cas, il faut parcourir à nouveau le fichier log pour créer le fragment logn. Si les collections stockées à l’origine du calcul sont elles-mêmes partitionnées (ce qui n’est sans doute pas le cas pour un fichier log), il suffira d’accéder à la partie de la collection à l’origine des calculs menant au RDD défaillant.

En résumé, Spark exploite la capacité à reconstruire des fragments de RDD par application de la chaîne de traitement, et ce en se limitant si possible à une partie seulement des données d’origine. La reprise peut prendre du temps, mais elle évite un recalcul complet. Si tout se passe bien (pas de panne) la présence des résultats intermédiaires en mémoire RAM assure de très bonnes performances.

Une session Spark: comptons les mots!

Voici une brève session Spark, directement inspirée de la documentation en ligne. Vous devez d’abord récupérer la dernière version stable de Spark depuis http://spark.apache.org. Installez-le dans un répertoire que nous appellerons sparkdir.

Lancez l’interpréteur de commandes.

./bin/spark-shell

Comme pour Pig, nous allons exécuter quelques commandes en local, sachant qu’il suffit d’un peu de configuration pour passer à un mode pleinement distribué. Vous pouvez récupérer le fichier http://b3d.bdpedia.fr/files/loups.txt pour faire un essai (il est temps de savoir à quoi s’en tenir à propos de ces loups et de ces moutons!), sinon n’importe quel fichier texte fait l’affaire.

Note

Comme Pig, l’interpréteur de Spark affiche de nombreux messages à la console ce qui est perturbant. Pour s’en débarasser:

  • copiez le fichier sparkdir/conf/log4j.properties.template en sparkdir/conf/log4j.properties;
  • éditez log4j.properties et remplacez dans la première ligne le niveau INFO par ERROR.

Ceci en supposant que les choses n’ont pas changé entre votre version et la mienne. Sinon, cherchez sur le web.

L’interpéteur de Spark accepte des commandes en langage Scala, dont vous êtes probablement déjà spécialiste. Si ce n’est pas le cas, copiez-collez les commandes ci-dessous.

scala> val loupsEtMoutons = sc.textFile("loups.txt")
   loupsEtMoutons: org.apache.spark.rdd.RDD[String] = loups.txt

Nous avons créé un premier RDD! Spark propose des actions directement applicable à un RDD et produisant des résultats scalaires. (Un RDD est interfacé comme un objet auquel nous pouvons appliquer des méthodes.)

scala> loupsEtMoutons.count() // Nombre de documents dans ce RDD
  res0: Long = 126

scala> loupsEtMoutons.first() // Premier document du RDD
  res1: String = Le loup est dans la bergerie.

scala> loupsEtMoutons.collect() // Récupération du RDD complet

Note

Petite astuce: en entrant le nom de l’objet (loupsEtMoutons.) suivi de la touche TAB, l’interpréteur Scala vous affiche la liste des méthodes disponibles.

Passons aux transformations. Elles prennent un (ou deux) RDD en entrée, produisent un RDD en sortie. On peut sélectionner les documents qui contiennent “bergerie”.

scala> val bergerie = loupsEtMoutons.filter(line => line.contains("bergerie"))

On peut afficher le contenu d’un RDD.

scala> bergerie.collect()
 res9: Array[String] = Array(Le loup est dans la bergerie.,
                 Les moutons sont dans la bergerie.,
                 Un loup a mangé un mouton, les autres loups sont restés dans la bergerie.)

On peut combiner une transformation et une action.

scala> loupsEtMoutons.filter(line => line.contains("loup")).count()

Et pour conclure cette petite session introductive, voici comment on implante en Spark le compteur de termes dans une collection. On crée un premier RDD constitué de tous les termes

val termes = loupsEtMoutons.flatMap(line => line.split(" "))

La méthode split décompose une chaîne de caractère (ici, en prenant comme séparateur un espace). Notez l’opérateur flatMap qui produit plusieurs documents (ici un terme) pour un document en entrée (ici une ligne).

On introduit la notion de comptage: chaque terme vaut 1. L’opérateur map produit un document en sortie pour chaque document en entrée. On peut s’en servir ici pour enrichir chaque terme avec son compteur initial.

val termeUnit = termes.map(word => (word, 1))

Rappelons qu’à chaque étape, vous pouvez afficher le contenu du RDD avec collect(). L’étape suivante regroupe les termes et effectue la somme de leurs compteurs: c’est un opérateur reduceByKey.

scala> val compteurTermes = termeUnit.reduceByKey((a, b) => a + b)

On passe à l’opérateur une fonction de réduction, ici notée littéralement dans la syntaxe Scala (si j’ai deux valeurs, j’en fait la somme). Et voilà! On aurait pu tout exprimer en une seule fois.

scala> loupsEtMoutons.flatMap(line => line.split(" "))
                       .map(word => (word, 1))
                       .reduceByKey((a, b) => a + b)

Si vous regardez le résultat, il pourra vous sembler étrange: il manque les diverses étapes de simplification du texte qui sont de mise pour un moteur de recherche. Mais l’essentiel est de comprendre l’enchaînement des opérateurs.

Finalement, si on souhaite conserver en mémoire le RDD final pour le soumettre à divers traitements, il suffit d’appeler:

scala> compteurTermes.persist()

CQFD!

Quiz

  • Quelles sont les deux fonctions principales d’un framework MapReduce ?
  • Qu’est-ce qu’un JobTracker dans Hadoop ? Combien y en a-t-il ?
  • Est-ce que les Mappers ou les Reducers peuvent communiquer entre eux ? Pourquoi ?
  • Où sont stockés les résultats des Mappers : collection distribuée ou disque local ? Pourquoi?
  • Résumer le principe de data locality.
  • Quel est le rôle de l’opération de shuffle ?
  • Comment expliqueriez-vous l’importance de traiter les paires (clé, valeur) indépendamment les unes des autres ?
  • J’implante un système MapReduce en transmettant directement les paires intermédiaires des Mappers vers les Reducers, sans écriture locale. En cas de plantage d’un des Reducers, que faut-il recalculer? Donner un exemple le plus simple possible.
  • Expliquez brièvement ce que doit faire le JobMaster quand un Reducer se plante.
  • Reprenez l’exemple de notre session illustrative de Pig. Comme exécuter cette chaîne de traitement avec MapReduce? Donnez le détail de ce qui se passe dans la phase de Map et la phase de Reduce.
  • Quand commence l’exécution du Reduce ? et pourquoi ?