Calcul distribué: Hadoop et MapReduce

Nous abordons maintenant le domaine des traitements analytiques à grande échelle qui, contrairement à des fonctions de recherche qui s’intéressent à un document précis ou à un petit sous-ensemble d’une collection, parcourent l’intégralité d’un large ensemble pour en extraire des informations et construire des modèles statistiques ou analytiques.

La préoccupation essentielle n’est pas ici la performance (de toute façon, les traitements durent longtemps) mais la garantie de scalabilité horizontale qui permet malgré tout d’obtenir des temps de réponse raisonnables, et surtout la garantie de terminaison en dépit des pannes pouvant affecter le système pendant le traitement.

_images/calculdistr.png

Fig. 93 Le calcul distribué, compagnon logique du stockage distribué

La Fig. 93 montre, en vert, le positionnement logique du calcul distribué par rapport aux systèmes de stockage distribué étudiés jusqu’ici. La répartition des données ouvre logiquement la voie à la distribution des traitements sur les données. L’un ne va pas sans l’autre: il serait peu utile d’appliquer un calcul distribué sur une source de données centralisée qui constituerait le goulot d’étranglement, et réciproquement.

Ce chapitre va étudier les méthodes qui permettent de distribuer des calculs à très grande échelle sur des systèmes de stockage partitionnés et distribués. Tous les systèmes vus jusqu’à présent sont des candidats valables pour alimenter des calculs distribués, mais nous allons regarder cette fois HDFS, un système de fichiers étroitement associé à Hadoop.

Pour les calculs eux-mêmes, deux possibilités sont offertes: des opérateurs intégrés à un langage de programmation, dont MapReduce est l’exemple de base, ou des langages de workflow (ou à la SQL) qui permettent des spécifications de plus haut niveau. Nous étudierons Pig latin, un des premiers représentants du genre.

Ce chapitre ne considère pas l’algorithmique analytique proprement dite, mais les opérateurs de manipulation de données qui fournissent l’information à ces algorithmes. En clair, il s’agit de voir comment récupérer des sources de données, comment les filtrer, les réorganiser, les combiner, les enrichir, le tout en respectant les deux contraintes fondamentales de la scalabilité: parallélisation et tolérance aux pannes.

La notion d’opérateur de second ordre

Les opérateurs décrits ici sont des opérateurs de second ordre. Contrairement aux opérateurs classiques qui s’appliquent directement à des données, un opérateur de second ordre prend des fonctions en paramètres et applique ces fonctions à des données au cours d’un traitement immuable (par exemple un parcours séquentiel).

Depuis 2004, le modèle phare d’exécution est MapReduce, déjà introduit dans le chapitre Interrogation de bases NoSQL dans un contexte centralisé, et distribué en Open Source dans le système Hadoop. MapReduce est le premier modèle à combiner distribution massive et reprise sur panne dans le contexte d’un cloud de serveurs à bas coûts. Ses limites sont cependant évidentes: faible expressivité (très peu d’opérateurs) et performances médiocres.

Très rapidement, des langages de plus haut niveau (Pig, Hive) ont été proposés, avec pour objectif notable l’expression d’opérateurs plus puissants (par exemple les jointures). Ces opérateurs restent exécutables dans un contexte MapReduce, un peu comme SQL est exécutable dans un système basé sur des parcours de fichier. Enfin, récemment, des systèmes proposant des alternatives plus riches à Hadoop ont commencé à émerger. La motivation essentielle est de fournir un support aux algorithmes fonctionnant par itération. C’est le cas d’un grand nombre de techniques en fouilles de données qui affinent progressivement un résultat jusqu’à obtenir une solution optimale. MapReduce est (était) très mal adapté à ce type d’exécution. Les systèmes comme Spark ou Flink constituent de ce point de vue un progrès majeur.

Ce chapitre suit globalement cette organisation historique, en commençant par MapReduce, suivi du système HDFS/Hadoop, et finalement d’une présentation du langage Pig. Les systèmes itératifs feront l’objet des chapitres suivants.

S1: MapReduce

Reportez-vous au chapitre Interrogation de bases NoSQL pour une présentation du modèle MapReduce d’exécution. Rappelons que MapReduce n’est pas un langage d’interrogation de données, mais un modèle d’exécution de chaînes de traitement dans lesquelles des données (massives) sont progressivement transformées et enrichies.

Pour être concrets, nous allons prendre l’exemple (classique) d’un traitement s’appliquant à une collection de documents textuels et déterminant la fréquence des termes dans les documents (indicateur TF, cf. Principes du classement de documents). Pour chaque terme présent dans la collection, on doit donc obtenir le nombre d’occurrences.

Le principe de localité des données

Dans une approche classique de traitement de données stockées dans une base, on utilise une architecture client-serveur dans laquelle l’application cliente reçoit les données du serveur. Cette méthode est difficilement applicable en présence d’un très gros volume de données, et ce d’autant moins que les collections sont stockées dans un système distribué. En effet:

  • le transfert par le réseau d’une large collection devient très pénalisant à grande échelle (disons, le TéraOctet);

  • et surtout, la distribution des données est une opportunité pour effectuer les calculs en parallèle sur chaque machine de stockage, opportunité perdue si l’application cliente fait tout le calcul.

Ces deux arguments se résument dans un principe dit de localité des données (data locality). Il peut s’énoncer ainsi: les meilleures performances sont obtenues quand chaque fragment de la collection est traité localement, minimisant les besoins d’échanges réseaux entre les machines.

Note

Reportez-vous au chapitre Le cloud, une nouvelle machine de calcul pour une analyse quantitative montrant l’intérêt de ce principe.

L’application du principe de localité des données mène à une architecture dans laquelle, contrairement au client-serveur, les données ne sont pas transférées au programme client, mais le programme distribué à toutes les machines stockant des données (Fig. 94).

_images/data-locality.png

Fig. 94 Principe de localité des données, par transfert des programmes

En revanche, demander à un développeur d’écrire une application distribuée basée sur ce principe constitue un défi technique de grande ampleur. Il faut en effet concevoir simultanément les tâches suivantes:

  • implanter la logique de l’application, autrement dit le traitement particulier qui peut être plus ou moins complexe;

  • concevoir la parallélisation de cette application, sous la forme d’une exécution concurrente coordonnant plusieurs machines et assurant un accès à un partitionnement de la collection traitée;

  • et bien entendu, gérer la reprise sur panne dans un environnement qui, nous l’avons vu, est instable.

Un framework d’exécution distribuée comme MapReduce est justement dédié à la prise en charge des deux derniers aspects, spécifiques à la distribution dans un cloud, et ce de manière générique. Le framework définit un processus immuable d’accès et de traitement, et le programmeur implante la logique de l’application sous la forme de briques logicielles confiées au framework et appliquées par ce dernier dans le cadre du processus.

Avec MapReduce, le processus se déroule en deux phases, et les « briques logicielles » consistent en deux fonctions fournies par le développeur. La phase de Map traite chaque document individuellement et applique une fonction map() dont voici le pseudo-code pour notre application de calcul du TF.

function mapTF($id, $contenu)
{
  // $id: identifiant du document
  // $contenu: contenu textuel du document

  // On boucle sur tous les termes du contenu
  foreach  ($t in $contenu) {
    // Comptons les occurrences du terme dans le contenu
    $count = nbOcc ($t, $contenu);
    // On "émet" le terme et son nombre des occurrences
    emit ($t, $count);
  }
}

La phase de Reduce reçoit des valeurs groupées sur la clé et applique une agrégation de ces valeurs. Voici le pseudo-code pour notre application TF.

function reduceTF($t, $compteurs)
{
  // $t: un terme
  // $compteurs: la séquence des décomptes effectués localement par le Map
  $total = 0;

  // Boucles sur les compteurs et calcul du total
  foreach ($c in $compteurs) {
   $total = $total + $c;
  }

  // Et on produit le résultat
  return $total;
}

Dans ce cadre restreint, le framework prend en charge la distribution et la reprise sur panne.

Important

Ce processus en deux phases et très limité et ne permet pas d’exprimer des algorithmes complexes, ceux basés par exemple sur une itération menant progressivement au résultat. C’est l’objectif essentiel de modèles d’exécution plus puissants que nous présentons ultérieurement.

Exécution distribuée d’un traitement MapReduce

La Fig. 95 résume l’exécution d’un traitement (« job ») MapReduce avec un framework comme Hadoop. Le système d’exécution distribué fonctionne sur une architecture maître-esclave dans laquelle le maître (JobTracker dans Hadoop) se charge de recevoir la requête de l’application, la distribue sous forme de tâche à des nœuds (TaskTracker dans Hadoop) accédant aux fragments de la collection, et coordonne finalement le déroulement de l’exécution. Cette coordination inclut notamment la gestion des pannes.

_images/mr-execution.png

Fig. 95 Exécution distribuée d’un traitement MapReduce

L’application cliente se connecte au maître, transmet les fonctions de Map et de Reduce, et soumet la demande d’exécution. Le client est alors libéré, en attente de la confirmation par le maître que le traitement est terminé (cela peut prendre des jours …). Le framework fournit des outils pour surveiller le progrès de l’exécution pendant son déroulement.

Le traitement s’applique à une source de données partitionnée. Cette source peut être un simple système de fichiers distribués, un système relationnel, un système NoSQL type MongoDB ou HBase, voire même un moteur de recherche comme Solr ou ElasticSearch.

Le Maître dispose de l’information sur le partitionnement des données (l’équivalent du contenu de la table de routage, présenté dans le chapitre sur le partitionnement) ou la récupère du serveur de données. Un nombre M de serveurs stockant tous les fragments concernés est alors impliqué dans le traitement. Idéalement, ces serveurs vont être chargés eux-mêmes du calcul pour respecter le principe de localité des données mentionné ci-dessus. Un système comme Hadoop fait de son mieux pour respecter ce principe.

La fonction de Map est transmise aux M serveurs et une tâche dite Mapper applique la fonction à un fragment. Si le serveur contient plusieurs fragments (ce qui est le cas normal) il faudra exécuter autant de tâches. Si le serveur est multi-cœurs, plusieurs fragments peuvent être traités en parallèle sur la même machine.

Exemple: le partitionnement des données pour l’application TF

Supposons par exemple que notre collection contienne 1 milliard de documents dont la taille moyenne est de 1000 octets. On découpe la collection en fragments de 64 MOs. Chaque fragment contient donc 64 000 documents. Il y a donc à peu près \(\lceil 10^9/64,000 \rceil \approx 16,000\) fragments. Si on dispose de 16 machines, chacune devra traiter (en moyenne) 1000 fragments et donc exécuter mille tâches de Mapper.

Le parallélisme peut alors être interne à une machine, en fonction du nombre de cores dont elle dispose. Une machine 4 cores pourra ainsi effectuer 4 tâches en parallèle en théorie.

Chaque mapper travaille, dans la mesure du possible, localement: le fragment est lu sur le disque local, document par document, et l’application de la fonction de Map « émet » des paires (clé, valeur) dites « intermédiaires » qui sont stockées sur le disque local. Il n’y a donc aucun échange réseau pendant la phase de Map (dans le cas idéal où la localité des données peut être complètement respectée).

Exemple: la phase de Map pour l’application TF

Supposons que chaque document contienne en moyenne 100 termes distincts. Chaque fragment contient 64 000 documents. Un Mapper va donc produire 6 400 000 paires (t, c)t est un terme et c le nombre d’occurrences.

À l’issue de la phase de Map, le maître initialise la phase de Reduce en choisissant R machines disponibles. Il faut alors distribuer les paires intermédiaires à ces R machines. C’est une phase « cachée », dite de shuffle, qui constitue potentiellement le goulot d’étranglement de l’ensemble du processus car elle implique la lecture sur les disques des Mappers de toutes les paires intermédiaires, et leur envoi par réseau aux machines des Reducers.

Important

Vous noterez peut-être qu’une solution beaucoup plus efficace serait de transférer immédiatement par le réseau les paires intermédiaires des Mappers vers les Reducers. Il y a une explication à ce choix en apparence sous-optimal: c’est la reprise sur panne (voir plus loin).

Pour chaque paire intermédiaire, un simple algorithme de hachage permet de distribuer les clés équitablement sur les R machines chargées du Reduce.

Au niveau d’un Reducer \(R_i\), que se passe-t-il?

  • Tout d’abord il faut récupérer toutes les paires intermédiaires produites par les Mappers et affectées à \(R_i\).

  • Il faut ensuite trier ces paires sur la clé pour regrouper les paires partageant la même clé. On obtient des paires (k, [v])k est une clé, et [v] la liste des valeurs reçues par le Reducer pour cette clé.

  • Enfin, chacune des paires (k, [v]) est soumise à la fonction de Reduce.

Exemple: la phase de Reduce pour l’application TF

Supposons R=10. Chaque Reducer recevra donc en moyenne 640 000 paires (t, c) de chaque Mapper. Ces paires sont triées sur le terme t. Pour chaque terme on a donc la liste des nombres d’occurences trouvés dans chaque document par les Mappers. Au pire, si un terme est présent dans chaque document, le tableau [v] contient un million d’entiers.

Il reste, avec la fonction de Reduce, à faire le total de ces nombres d’occurences pour chaque terme.

Exemple: comptons les loups et le moutons

Vous souvenez-vous de ces quelques documents?

  • A: Le loup est dans la bergerie.

  • B: Les moutons sont dans la bergerie.

  • C: Un loup a mangé un mouton, les autres loups sont restés dans la bergerie.

  • D: Il y a trois moutons dans le pré, et un mouton dans la gueule du loup.

Ils sont maintenant stockés dans un système partitionné sur 3 serveurs comme montré sur la Fig. 96. Nous appliquons notre traitement TF pour compter le nombre total d’occurrences de chaque terme (on va s’intéresser aux termes principaux).

_images/mr-execution-ex.png

Fig. 96 Un exemple minuscule mais concret

Nous avons trois Mappers qui produisent les données intermédiaires présentées sur la figure. Comprenez-vous pourquoi le terme bergerie apparaît deux fois pour le premier Mapper par exemple?

La phase de Reduce, avec 2 Reducers, n’est illustrée que pour le terme loup donc on suppose qu’il est affecté au premier Reducer. Chaque Mapper transmet donc ses paires intermédiaires (loup, …) à R1 qui se charge de regrouper et d’appliquer la fonction de Reduce.

Quand tous les Reducers ont terminé, le résultat est disponible sur leur disque local. Le client peut alors le récupérer.

La reprise sur panne

Comment assurer la gestion des pannes pour une exécution MapReduce? Dans la mesure où elle peut consister en centaines de tâches individuelles, il est inenvisageable de reprendre l’ensemble de l’exécution si l’une de ces tâches échoue, que ce soit en phase de Map ou en phase de Reduce. Le temps de tout recommencer, une nouvelle panne surviendrait, et le job ne finirait jamais.

Le modèle MapReduce a été conçu dès l’origine pour que la reprise sur panne puisse être gérée au niveau de chaque tâche individuelle, et que la coordination de l’ensemble soit également résiliente aux problèmes de machine ou de réseau.

Le Maître délègue les tâches aux machines et surveille la progression de l’exécution. Si une tâche semble interrompue, le Maître initie une action de reprise qui dépend de la phase.

Panne en phase de Reduce

Si la machine reste accessible et que la panne se résume à un échec du processus, ce dernier peut être relancé sur la même machine, et si possible sur les données locales déjà transférées par le shuffle. C’est le cas le plus favorable.

Dans un cas plus grave, avec perte des données par exemple, une reprise plus radicale consiste à choisir une autre machine, et à relancer la tâche en réinitialisant le transfert des paires intermédiaires depuis les machines chargées du Map. C’est possible car ces paires ont été écrites sur les disques locaux et restent donc disponibles. C’est une caractéristique très importante de l’exécution MapReduce: l’écriture complète des fragments intermédiaires garantit la possibilité de reprise en cas de panne.

Une méthode beaucoup plus efficace mais beaucoup moins robuste consisterait à ce que chaque mapper transfère immédiatement les paires intermédiaires, sans écriture sur le disque local, vers la machine chargée du Reduce. Mais en cas de panne de ce dernier, ces paires intermédiaires risqueraient de disparaître et on ne saurait plus effectuer la reprise sur panne (sauf à ré-exécuter l’ensemble du processus).

Cette caractéristique explique également la lenteur (déspérante) d’une exécution MapReduce, due en grande partie à la nécessité d’effectuer des écritures et lectures répétées sur disque, à chaque phase.

Panne en phase Map

En cas de panne pendant l’exécution d’une tâche de Map, on peut soit reprendre la tâche sur la même machine si c’est le processus qui a échoué, soit transférer la tâche à une autre machine. On tire ici parti de la réplication toujours présente dans les systèmes distribués: quel que soit le fragment stocké sur une machine, il existe un réplica de ce fragment sur une autre, et à partir de ce réplica une tâche équivalente peut être lancée.

Le cas le plus pénalisant est la panne d’une machine pendant la phase de transfert vers les Reducers. Il faut alors reprendre toutes les tâches initialement allouées à la machine, en utilisant la réplication.

Et le maître?

Finalement, il reste à considérer le cas du Maître qui est un point individuel d’échec: en cas de panne, il faut tout recommencer.

L’argument des frameworks comme Hadoop est qu’il existe un Maître pour des dizaines de travailleurs, et qu’il est peu probable qu’une panne affecte directement le serveur hébergeant le nœud-Maître. Si cela arrive, on peut accepter de reprendre l’ensemble de l’exécution, ou prendre des mesures préventives en dupliquant toutes les données du Maître sur un nœud de secours.

Quiz

Qu’est-ce qu’un opérateur de second ordre?

  1. C’est un opérateur qui est appliqué à des documents par d’autres opérateurs

  2. C’est un opérateur qui applique d’autres opérateurs à des documents

  3. C’est un opérateur qui résulte de la composition d’opérateurs primaires

Quel est le rôle de l’opération de shuffle ?

  1. Distribuer les résultats des mappers vers les reducers

  2. Distribuer les résultats des reducers vers les mappers

  3. Distribuer les documents équitablement sur les mappers

Où sont stockés les résultats des Mappers ?

  1. Dans un système de stockage distribué pour permettre la reprise sur panne

  2. Localement, pour favoriser la performance

  3. Ils sont transmis aux reducers au fur et à mesure

Que dit le principe de data locality.

  1. Que les données doivent rester à l’intérieur d’un même système distribué

  2. Que les données doivent être traitées là où elles sont stockées

  3. Que les données peuvent être échangées entre mappers ou entre reducers, mais pas entre mappers et reducers

J’implante un système MapReduce en transmettant directement les paires intermédiaires des Mappers vers les Reducers, sans écriture locale. En cas de plantage d’un des Reducers avec perte locale des données, que faut-il recalculer?

  1. Tout!

  2. On ne relance que la fraction des mappers qui a alimenté le reducer planté (soit 1/R des mappers, R étant le nombre de reducers)

  3. Il suffit de ré-executer le reducer

Quand commence l’exécution de la phase de Reduce ?

  1. Dès que les mappers commencent à transmettre des paires intermédiaires

  2. Dès que l’un des mappers a complètement terminé sa tâche

  3. Dès que tous les mappers ont terminé leur tâche

Pourquoi ne pas exécuter la fonction de Reduce sur les paires intermédiaires stockées sur les disques des machines de Map?

  1. Parce qu’elles ne sont pas triées et qu’il faut donc faire appel à une autre machine dédiée au tri

  2. Parce qu’une machine de Map ne détient qu’une partie des paires d’un même groupe

  3. Parce qu’une même paire peut être impliquée dans plusieurs groupes, d’où la phase de Shuffle

Quand peut-on effacer les paires intermédiaires stockées sur les mappers

  1. Dès qu’elles ont été transmises aux reducers

  2. À la toute fin du processus, quand le dernier reducer a terminé.

  3. Dès que tous les reducers ont acquitté la réception des paires qu’ils doivent traiter

S2: Une brève introduction à Hadoop

Supports complémentaires

Cette session propose une introduction à l’environnement historique de programmation distribuée à grande échelle, Hadoop. Pour être tout à fait exact, Hadoop est une implantation en open source de l’architecture présentée par Google au début des années 2000, et comprenant essentiellement un système de fichiers distribué et tolérant aux pannes, GFS, et le modèle MapReduce qui s’appuie sur GFS pour effectuer un accès parallélisé à de très gros volumes de données. Un troisième composant « Google », BigTable (HBase dans la version Hadoop), propose une organisation plus structurée des données que de simples fichiers. Il n’est pas présenté ici.

Notre objectif dans cette session est de comprendre HDFS, d’y charger des données, puis de leur appliquer un traitement MapReduce. Les aspects architecturaux, brievement évoqués, devraient maintenant être clairs pour vous puisqu’ils s’appuient sur des principes standards déjà exposés.

Important

Cette session propose du code MapReduce qui a été testé et devrait fonctionner, mais l’expérience montre que la mise en œuvre de Hadoop est laborieuse et dépend de paramètres qui changent souvent: sauf si vous êtes très motivés, il est préférable sans doute de ne pas perdre de temps à chercher à reproduire les commandes qui suivent. Concentrez-vous sur les principes.

Systèmes de fichiers distribués

HDFS est donc la version open source du Google File System, dont le but est de fournir un environnement de stockage distribué et tolérant aux pannes pour de très gros fichiers. HDFS peut être utilisé directement comme service d’accès à ces fichiers, ou indirectement par des systèmes de gestion de données (HBase pas exemple) qui obtiennent ainsi la distribution et la résistance aux pannes sans avoir à les implanter directement.

Un système de fichiers comme HDFS est conçu pour la gestion de fichiers de grande taille (plusieurs dizaines de MOs au minimum), que l’on écrit une fois et qu’on lit ensuite par des parcours séquentiels. Le contre-exemple est celui d’une collection de très petits documents souvent modifiés: il vaut mieux dans ce cas utiliser un système NoSQL documentaire spécialisé.

Pour comprendre cette distinction, étudions les deux scénarii illustrés par la Fig. 97. Sur la partie gauche, nous trouvons un système de fichiers distribués classique, de type NFS (Network File System: consultez la fiche Wikipedia pour en savoir - un peu - plus). Dans ce type d’organisation, le serveur 1 dispose d’un système de fichiers organisé de manière hiérachique, très classiquement. La racine (/) donne accès aux répertoires dirA et dirB, ce dernier contenant un fichier fichier2, le tout étant stocké sur le dique local.

_images/dfs-bigpic.png

Fig. 97 Deux types de systèmes de fichiers distribués

Imaginons que le serveur 1 souhaite pouvoir accéder au répertoire dirC et fichier fichier1 qui se trouvent sur le serveur 2. Au lieu de se connecter à distance explicitement à chaque fois, on peut « monter » (mount) dirC dans le système de fichier du serveur 1, sous la forme d’un répertoire-fils de dirB. Du point de vue de l’utilisateur, l’accès devient complètement transparent. On peut accéder à /dirA/dirB/dirC comme s’il s’agissait d’un répertoire local. L’appel réseau qui maintient dirC dans l’espace de nommage du serveur 1 est complètement géré par la couche NFS (ou toute autre solution équivalente).

Dans un contexte « Big Data », avec de très gros volumes de données, cette solution n’est cependant pas satisfaisante. En particulier, ni l’équilibrage (load balancing) ni le principe de localité ne sont pris en satisfaits. Premièrement, si 10% des données sont stockées dans le fichier 1 et 90% dans le fichier 2, le serveur 2 devra subir 90% des accès (en supposant une répartition uniforme des requêtes). Ensuite, un processus s’exécutant sur le serveur 1 peut être amené à traiter un fichier du serveur 2 sans se rendre compte qu’il engendre de très gros accès réseaux.

La partie droite de la Fig. 97 montre l’approche GFS/HDFS qui est totalement dédiée aux très gros fichiers et aux accès distribués. La grande différence est que la notion de fichier ne correspond plus à un stockage physique localisé, mais devient un symbole désignant un stockage partitionné, distribué et répliqué. Chaque fichier est divisé en fragments (3 fragments pour le fichier 2 par exemple), de tailles égales, et ces fragments sont alloués par HDFS aux serveurs du cluster. Chaque fragment est de plus répliqué.

Le système de fichier de vient alors un espace de noms virtuel, partagé par l’ensemble des nœuds, et géré par un nœud spécial, le maître. On retrouve, pour la notion classique de fichier, les principes généraux déjà étudié dans ce cours.

Il est facile de voir que les inconvénients précédents (défaut d’équilibrage et de localité des données) sont évités. Il est également facile de constater que cette approche n’est valable que pour de très gros fichiers qu’il est possible de partitionner en fragments de taille significative (quelques dizaines de MOs typiquement).

Architecture HDFS

Voici maintenant un aperçu de l’architecture de GFS (Fig. 98). Le système fonctionne en mode maître/esclave, le maître (namenode) jouant comme d’habitude le rôle de coordinateur et les esclaves (datanode) assurant le stockage. Le maître maintient (en mémoire RAM) l’image globale du système de fichiers, sous la forme d’une arborescence de répertoires et de fichiers. À chaque fichier est associée une table décrivant le partionnement de son contenu en fragment, et la répartition de ces fragments sur les différents nœuds-esclaves.

_images/gfs.png

Fig. 98 Architecture HDFS

Les applications clients doivent se connecter au maître auquel elles transmettent leur requête sous la forme d’un chemin d’accès à un fichier, par exemple, comme illustré sur la figure, le chemin /A/B/f1. Voici en détail le cheminement de cette requête:

  • Elle est d’abord routée par le client (qui ignore tout de l’organisation du stockage) vers le maître.

  • Le maître inspecte sa hiérarchie, et trouve les adresses des fragments constituant le fichier.

  • Chaque serveur stockant un fragment est alors mis directement en contact avec le client qui peut récupérer tout ou partie du fichier.

En d’autres termes, les échanges avec le maître sont limités aux méta-données décrivant le fichier et sa répartition, ce qui évite les inconvénients d’avoir à s’adresser systématiquement à un même nœud lors de l’initialisation d’une requête. Toutes les autres commandes de type POSIX (écriture, déplacement, droits d’acc¡es, etc.) suivent le même processus.

Encore une fois la conception de HDFS est très orientée vers le stockage de fichiers de très grande taille (des GOs, voire des TOs). Ces fichiers sont partitionnés en fragments de 64 MOs, ce qui permet de les lire en parallèle. La lecture par une seule application cliente, comme illustré sur la Fig. 98, constituerait un goulot d’étranglement, mais cette architecture prend tout son sens dans le cas de traitement MapReduce, le contenu d’un fichier pouvant alors être lu en parallèle par tous les serveurs d’une grappe.

Utiliser HDFS pour de très nombreux petits fichiers serait un contresens: la mémoire RAM du maître pourrait être insuffisante pour stocker l’ensemble du namespace, et on perdrait toute possibilité de parallélisation.

HDFS fournit un mécanisme natif de tolérance aux pannes qui le rend avantageux pour des système de gestion de données qui veulent déléguer la distribution et la fiabilité du stockage. Ce mécanisme s’appuie tout d’abord sur la réplication d’un même fragment (3 exemplaires par défaut) sur différents serveurs.

Le maître assure la surveillance des esclaves par des communications (heartbeats) fréquents (toutes les secondes) et réorganise la communication entre une application cliente et le fragment qu’elle est en train de lire en cas de défaillance du serveur. Ce remplacement est utile par exemple, comme nous l’avons vu, pour un traitement MapReduce afin d’effectuer à nouveau un calcul sur l’un des fragments.

Enfin le maître lui-même est un des points sensibles du système: en cas de panne plus rien ne marcherait et des données seraient perdues. On peut mettre en place un « maître fantôme » prêt à prendre le relais, et une journalisation de toutes les écritures pour pouvoir effectuer une reprise sur panne.

Mise en œuvre avec Hadoop

Voici maintenant une présentation concise de la mise en œuvre d’un système HDFS L’environnement est assez lourd à mettre en place et à configurer donc nous allons aller au plus simple dans ce qui suit.

Des images Docker existent pour Hadoop mais elles ne me semblent pas plus simples à gérer qu’une installation directe, avec les options simplifiées proposées par Hadoop.

Important

Encore une fois, l’expérience montre que la lourdeur de Hadoop s’accomode mal d’un déploiement virtuel sur une seule petite machine. L’importance du sujet ne justifie pas que vous y passiez des jours en vous arrachant les cheveux. Il suffit sans doute de lire une fois cette session pour comprendre l’essentiel.

Si vous tentez quand même la mise en pratique, sachez que les commandes qui suivent supposent un environnement de type Unix (MacOS X en fait partie). Pour Windows, je ne peux que vous renvoyer au site de Hadoop, en espérant pour vous que ce ne soit pas trop compliqué.

Autre avertissement: Hadoop, c’est du Java, donc il faut au minimum savoir compiler et exécuter un programme java, et disposer d’une mémoire RAM volumineuse.

Si l’avertissement qui précède vous effraie (c’est fait pour), il vaut sans doute mieux se contenter d’une simple lecture de cette partie.

Installation et configuration

Je vous invite donc à récupérer la dernière version (binaire, inutile de prendre les sources) sur le site http://hadoop.apache.org. C’est un fichier dont le nom ressemble à hadoop-2.7.3.tar.gz. Décompressez-le quelque part, par exemple dans \tmp. les commandes devraient ressembler à (en utilisant bien sûr le nom du fichier récupéré):

mv hadoop-2.7.3.tar.gz /tmp
cd /tmp
tar xvfz hadoop-2.7.3.tar.gz

Bien, vous devez alors définir une variable d’environnement HADOOP_HOME qui indique le répertoire d’installation de Hadoop.

export HADOOP_HOME=/tmp/hadoop-2.7.3

Les répertoires bin et sbin de Hadoop contiennent des exécutables. Pour les lancer sans avoir à se placer dans l’un de ces répertoires, ajoutez-les dans votre variable PATH.

export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

Bien, vous devriez alors pouvoir exécuter un programme Hadoop. Par exemple:

hadoop version
Hadoop 2.7.3

Pour commencer il faut configurer Hadoop pour qu’il s’exécute en mode dit « pseudo-distribué », ce qui évite la configuration complexe d’un véritable cluster. Vous devez éditer le fichier $HADOOP_HOME/etc/hadoop/core-site.xml et indiquer le contenu suivant :

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
 </configuration>

Cela indique à Hadoop que le nœud maître HDFS (le « NameNode » dans la terminologie Hadoop) est en écoute sur le port 9000.

Pour limiter la réplication, modifiez également le fichier $HADOOP_HOME/etc/hadoop/hdfs-site.xml. Son contenu doit être le suivant:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

Premières manipulations

Ouf, la configuration minimale est faite, nous sommes prêts à effectuer nos premières manipulations. Tout d’abord nous allons formatter l’espace dédié au stockage des données.

hdfs namenode -format

Une fois ce répertoire formatté nous lançons le maître HDFS (le namenode). Ce maître gère la hiérarchie (virtuelle) des répertoires HDFS, et communique avec les datanodes, les « esclaves » dans la terminologie employée jusqu’ici, qui sont chargés de gérer les fichiers (ou fragments de fichiers) sur leurs serveurs respectifs. Dans notre cas, la configuration ci-dessus va lancer un namenode et deux datanodes, grâce à la commande suivante:

start-dfs.sh &

Note

Les nœuds communiquent entre eux par SSH, et il faut éviter que le mot de passe soit demandé à chaque fois. Voici les commandes pour permettre une connection SSH sans mot de passe.

ssh-keygen -t rsa -P ""
cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

Vous devriez obtenir les messages suivants:

starting namenode, logging to (...)
localhost: starting datanode, logging to (...)
localhost: starting secondarynamenode, logging to (...)

Le second namenode est un miroir du premier. À ce stade, vous disposez d’un serveur HDFS en ordre de marche. Vous pouvez consulter son statut et toutes sortes d’informations grâce au serveur web accessible à http://localhost:50070. La figure Fig. 99 montre l’interface

_images/hdfs-ui.png

Fig. 99 Perspective générale sur les systèmes distribués dans un cloud

Bien entendu, ce système de fichier est vide. Vous pouvez y charger un premier fichier, à récupérer sur le site à l’adresse suivante: http://b3d.bdpedia.fr/files/author-medium.txt. Il s’agit d’une liste de publications sur laquelle nous allons faire tourner nos exemples.

Pour interagir avec le serveur de fichier HDFS, on utilise la commande hadoop fs <commande> où commande est la commande à effectuer. La commande suivante crée un répertoire /dblp dans HDFS.

hadoop fs -mkdir /dblp

Puis on copie le fichier du système de fichiers local vers HDFS.

hadoop fs -put author-medium.txt /dblp/author-medium.txt

Finalement, on peut constater qu’il est bien là.

hadoop fs -ls /dblp

Note

Vous trouverez facilement sur le web des commandes supplémentaires, par exemple ici: https://dzone.com/articles/top-10-hadoop-shell-commands

Pour inspecter le système de fichiers avec l’interface Web, vous pouvez aussi accéder à http://localhost:50070/explorer.html#/

Que sommes-nous en train de faire? Nous copions un fichier depuis notre machine locale vers un système distribué sur plusieurs serveurs. Si le fichier est assez gros, il est découpé en fragments et réparti sur différents serveurs. Le découpage et la recomposition sont transparents et entièrement gérés par Hadoop.

Nous avons donc réparti nos données (si du moins elles avaient une taille respectable) dans le cluster HDFS. Nous sommes donc en mesure maintenant d’effectuer un calcul réparti avec MapReduce.

MapReduce, le calcul distribué avec Hadoop

L’exemple que nous allons maintenant étudier est un processus MapReduce qui accède au fichier HDFS et effectue un calcul assez trivial. Ce sera à vous d’aller plus loin ensuite.

Installation et configuration

Depuis la version 2 de Hadoop, les traitements sont gérés par un gestionnaire de ressources distribuées nommé Yarn. Il fonctionne en mode maître/esclaves, le maître étant nommé Resourcemanager et les esclaves NodeManager.

Un peu de configuration préalable s’impose avant de lancer notre cluster Yarn. Editez tout d’abord le fichier $HADOOP_HOME/etc/hadoop/mapred-site.xml avec le contenu suivant:

<configuration>
    <property>
       <name>mapreduce.framework.name</name>
       <value>yarn</value>
    </property>
</configuration>

Ainsi que le fichier $HADOOP_HOME/etc/hadoop/yarn-site.xml:

<configuration>
    <property>
       <name>yarn.nodemanager.aux-services</name>
       <value>mapreduce_shuffle</value>
    </property>
 </configuration>

Vous pouvez alors lancer un cluster Yarn (en plus du cluster HDFS).

start-yarn.sh

Yarn propose une interface Web à l’adresse http://localhost:8088/cluster: Elle montre les applications en cours ou déjà exécutées.

Notre programme MapReduce

Important

Toutes nos compilations java font se fait par l’intermédiaire du script hadoop. Il suffit de définir la variable suivante au préalable:

export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

Le format du fichier que nous avons placé dans HDFS est très simple: il contient des noms d’auteur et des titres de publications, séparés par des tabulations. Nous allons compter le nombre de publications de chaque auteur dans notre fichier.

Notre première classe Java contient le code de la fonction de Map.

/**
 * Les imports indispensables
 */

import java.io.IOException;
import java.util.Scanner;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Exemple d'une fonction de map: on prend un fichier texte contenant
 * des auteurs et on extrait le nom
 */
public class AuthorsMapper extends
  Mapper<Object, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text author = new Text();

   /* la fonction de Map */
    @Override
    public void map(Object key, Text value, Context context)
       throws IOException, InterruptedException {

      /* Utilitaire java pour scanner une ligne  */
      Scanner line = new Scanner(value.toString());
      line.useDelimiter("\t");
      author.set(line.next());
      context.write(author, one);
    }
  }

Hadoop fournit deux classes abstraites pour implanter des fonctions de Map et de Reduce: Mapper et Reducer. Il faut étendre ces classes et implanter deux méthodes, respectivement map() et reduce().

L’exemple ci-dessus montre l’implantation de la fonction de map. Les paramètres de la classe abstraite décrivent respectivement les types des paires clé/valeur en entrée et en sortie. Ces types sont fournis pas Hadoop qui doit savoir les sérialiser pendant les calculs pour les placer sur disque. Finalement, la classe Context est utilise pour pouvoir interagir avec l’environnement d’exécution.

Notre fonction de Map prend donc en entrée une paire clé/valeur constituée du numéro de ligne du fichier en entrée (automatiquement engendrée par le système) et de la ligne elle-même. Notre code se contente d’extraire la partie de la ligne qui précède la première tabulation, en considérant que c’est le nom de l’auteur. On produit dont une paire intermédiaire (auteur, 1).

La fonction de Reduce est encore plus simple. On obtient en entrée le nom de l’auteur et une liste de 1, aussi longue qu’on a trouvé d’auteurs dans les fichiers traités. On fait la somme de ces 1.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * La fonction de Reduce: obtient des paires  (auteur, <publications>)
 * et effectue le compte des publications
 */
public  class AuthorsReducer extends
      Reducer<Text, IntWritable, Text, IntWritable> {
   private IntWritable result = new IntWritable();

   @Override
   public void reduce(Text key, Iterable<IntWritable> values,
        Context context)
    throws IOException, InterruptedException {

    int count = 0;
    for (IntWritable val : values) {
      count += val.get();
    }
    result.set(count);
    context.write(key, result);
  }
}

Nous pouvons maintenant soumettre un « job » avec le code qui suit. Les commentaires indiquent les principales phases. Notez qu’on lui indique les classes implantant les fonctions de Map et de Reduce, définies auparavant.

  /**
   * Programme de soumision d'un traitement MapReduce
   */

  import org.apache.hadoop.conf.*;
  import org.apache.hadoop.util.*;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class AuthorsJob {

  public static void main(String[] args) throws Exception {

   /* Il nous faut le chemin d'acces au fichier a traiter
           et le chemin d'acces au resultat du reduce */

    if (args.length != 2) {
      System.err.println("Usage: AuthorsJob <in> <out>");
      System.exit(2);
    }

   /* Definition du job */
   Job job = Job.getInstance(new Configuration());

   /* Definition du Mapper et du Reducer */
   job.setMapperClass(AuthorsMapper.class);
   job.setReducerClass(AuthorsReducer.class);

   /* Definition du type du resultat  */
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);

   /* On indique l'entree et la sortie */
   FileInputFormat.addInputPath(job, new Path(args[0]));
   FileOutputFormat.setOutputPath(job, new Path(args[1]));

   /* Soumission */
   job.setJarByClass(AuthorsJob.class);
   job.submit();
  }
}

Un des rôles importants du Job est de définir la source en entrée pour les données (ici un fichier HDFS) et le répertoire HDFS en sortie, dans lequel les reducers vont écrire le résultat.

Compilation, exécution

Il reste à compiler et à exécuter ce traitement. La commande de compilation est la suivante.

hadoop com.sun.tools.javac.Main AuthorsMapper.java AuthorsReducer.java AuthorsJob.java

Les fichiers compilés doivent ensuite être placés dans une archive java (jar) qui sera transmise à tous les serveurs avant l’exécution distribuée. Ici, on crée une archive authors.jar.

jar cf authors.jar AuthorsMapper.class AuthorsReducer.class AuthorsJob.class

Et maintenant, on soumet le traitement au cluster Yarn avec la commande suivante:

hadoop jar authors.jar AuthorsJob /dblp/author-medium.txt /output

On indique donc sur la ligne de commande le Job à exécuter, le fichier en entrée et le répertoire des fichiers de résultat. Dans notre cas, il y aura un seul reducer, et donc un seul fichier nommé part-r-00000 qui sera donc placé dans /output dans HDFS.

Important

Le répertoire de sortie ne doit pas exister avant l’exécution. Pensez à le supprimer si vous exécutez le même job plusieurs fois de suite.

hadoop fs -rm -R /output

Une fois le job exécuté, on peut copier ce fichier de HDFS vers la machine locale avec la commande:

hadoop fs -copyToLocal /output/part-r-00000 resultat

Et voilà! Vous avez une idée complète de l’exécution d’un traitement MapReduce. Le résultat devrait ressembler à:

(...)
Dominique Decouchant    1
E. C. Chow      1
E. Harold Williams      1
Edward Omiecinski       1
Eric N. Hanson  1
Eugene J. Shekita       1
Gail E. Kaiser  1
Guido Moerkotte 1
Hanan Samet     2
Hector Garcia-Molina    2
Injun Choi      1
(...)

Notez que les auteurs sont triés par ordre alphanumérique, ce qui est un effet indirect de la phase de shuffle qui réorganise toutes les paires intermédiaires.

En inspectant l’interface http://localhost:8088/cluster vous verrez les statistiques sur les jobs exécutés.

Quiz

Quelle affirmation parmi les suivantes est vraie pour GFS?

  1. L’arborescence des répertoires et des fichiers correspond à la répartition dans le système distribué

  2. Chaque fichier est découpé en n fragments, où n est le nombre de serveurs

  3. Chaque fichier est découpé en fragments de taille fixe, et chacun est répliqué sur 3 serveurs

  4. Un fichier est stocké sur le serveur qui correspond à son répertoire dans l’arborescence

Commment est constituée l’arborescence des fichiers?

  1. La racine est au niveau du maitre, et chaque fils de la racine est la racine du serveur de fichier local d’un des serveurs participants

  2. L’arboresence est totalement virtuelle et gérée par le maître

  3. L’arborescence des répertoires est gérée par le maître, les fichiers sont ceux des serveurs et sont liés à un répertoire.

Pour quelle utilisation le système GFS est-il le plus adapté

  1. Pour une application client qui peut lire tous les fragments en parallèle, comme expliqué précédemment

  2. Pour un traitement distribué de type MapReduce

  3. Pour une application temps-réel qui va pouvoir chercher un document dans un seul fragment.

S3: langages de traitement: Pig

MapReduce est un système orienté vers les développeurs qui doivent concevoir et implanter la composition de plusieurs jobs pour des algorithmes complexes qui ne peuvent s’exécuter en une seule phase. Cette caractéristique rend également les systèmes MapReduce difficilement accessibles à des non-programmeurs.

La définition de langages de plus haut niveau permettant de spécifier des opérations complexes sur les données est donc apparue comme une nécessité dès les premières versions de systèmes comme Hadoop. L’initiative est souvent venue de communautés familières des bases de données et désirant retrouver la simplicité et la « déclarativité » du langage SQL, transposées dans le domaine des chaînes de traitements pour données massives.

Cette section présente le langage Pig latin, une des premières tentatives du genre, une des plus simples, et surtout l’une des plus représentatives des opérateurs de manipulation de données qu’il est possible d’exécuter sous forme de jobs MapReduce en conservant la scalabilité et la gestion des pannes.

Pig latin (initialement développé par un laboratoire Yahoo!) est un projet Apache disponible à http://pig.apache.org. Vous avez (au moins) deux possibilités pour l’installation.

  • Utilisez la machine Docker https://hub.docker.com/r/hakanserce/apache-pig/

  • Ou récupérez la dernière version sous la forme d’une archive compressée et décompressez-la quelque part, dans un répertoire que nous appellerons pigdir.

Nous utiliserons directement l’interpréteur de scripts (nommé grunt) qui se lance avec:

<pigdir>/bin/pig -x local

L’option local indique que l’on teste les scripts en local, ce qui permet de les mettre au point sur de petits jeux de données avant de passer à une exécution distribuée à grande échelle dans un framework MapReduce.

Cet interpréteur affiche beaucoup de messages, ce qui devient rapidement désagréable. Pour s’en débarasser, créer un fichier nolog.conf avec la ligne:

log4j.rootLogger=fatal

Et lancez Pig en indiquant que la configuration des log est dans ce fichier:

<pigdir>/bin/pig -x local -4 nolog.conf

Une session illustrative

Pig applique des opérateurs à des flots de données semi-structurées. Le flot initial (en entrée) est constituée par lecture d’une source de données quelconque contenant des documents qu’il faut structurer selon le modèle de Pig, à peu de choses près comparable à ce que proposent XML ou JSON.

Dans un contexte réel, il faut implanter un chargeur de données depuis la source. Nous allons nous contenter de prendre un des formats par défaut, soit un fichier texte dont chaque ligne représente un document, et dont les champs sont séparés par des tabulations. Nos documents sont des entrées bibliographiques d’articles scientifiques que vous pouvez récupérer à http://b3d.bdpedia.fr/files/journal-small.txt. En voici un échantillon.

2005    VLDB J. Model-based approximate querying in sensor networks.
1997    VLDB J. Dictionary-Based Order-Preserving String Compression.
2003        SIGMOD Record   Time management for new faculty.
2001    VLDB J. E-Services - Guest editorial.
2003        SIGMOD Record   Exposing undergraduate students to system internals.
1998    VLDB J. Integrating Reliable Memory in Databases.
1996    VLDB J. Query Processing and Optimization in Oracle Rdb
1996    VLDB J. A Complete Temporal Relational Algebra.
1994        SIGMOD Record   Data Modelling in the Large.
2002        SIGMOD Record   Data Mining: Concepts and Techniques - Book Review.
...

Voici à titre d’exemple introductif un programme Pig complet qui calcule le nombre moyen de publications par an dans la revue SIGMOD Record.

-- Chargement des documents de journal-small.txt
articles = load 'journal-small.txt'
    as (year: chararray, journal:chararray, title: chararray) ;
sr_articles = filter articles BY journal=='SIGMOD Record';
year_groups = group sr_articles by year;
count_by_year = foreach year_groups generate group, COUNT(sr_articles.title);
dump count_by_year;

Quand on l’exécute sur notre fichier-exemple, on obtient le résultat suivant:

(1977,1)
(1981,7)
(1982,3)
(1983,1)
(1986,1)
...

Un programme Pig est essentiellement une séquence d’opérations, chacune prenant en entrée une collection de documents (les collections sont nommées bag dans Pig latin, et les documents sont nommés tuple) et produisant en sortie une autre collection. La séquence définit une chaîne de traitements transformant progressivement les documents.

_images/pig-workflow.png

Fig. 100 Un exemple de workflow (chaîne de traitements) avec Pig

Il est intéressant de décomposer, étape par étape, cette chaîne de traitement pour inspecter les collections intermédiaires produites par chaque opérateur.

Chargement. L’opérateur load crée une collection initiale articles par chargement du fichier. On indique le schéma de cette collection pour interpréter le contenu de chaque ligne. Les deux commandes suivantes permettent d’inspecter respectivement le schéma d’une collection et un échantillon de son contenu.

grunt> describe articles;
articles: {year: chararray,journal: chararray,title: chararray}

grunt> illustrate articles;
---------------------------------------------------------------------------
| articles | year: chararray | journal: chararray | title: chararray      |
---------------------------------------------------------------------------
|          | 2003            | SIGMOD Record      | Call for Book Reviews.|
---------------------------------------------------------------------------

Pour l’instant, nous sommes dans un contexte simple où une collection peut être vue comme une table relationnelle. Chaque ligne/document ne contient que des données élémentaires.

Filtrage. L’opération de filtrage avec filter opère comme une clause where en SQL. On peut exprimer avec Pig des combinaisons Booléennes de critères sur les attributs des documents. Dans notre exemple le critère porte sur le titre du journal.

Regroupement. On regroupe maintenant les tuples/documents par année avec la commande group by. À chaque année on associe donc l’ensemble des articles parus cette année-là, sous la forme d’un ensemble imbriqué. Examinons la représentation de Pig:

grunt> year_groups = GROUP sr_articles BY year;

grunt> describe year_groups;
  year_groups: {group: chararray,
    sr_articles: {year: chararray,journal: chararray,title:chararray}}

grunt> illustrate year_groups;
 group: 1990
 sr_articles:
  {
   (1990, SIGMOD Record, An SQL-Based Query Language For Networks of Relations.),
   (1990, SIGMOD Record, New Hope on Data Models and Types.)
  }

Le schéma de la collection year_group, obtenu avec describe, comprend donc un attribut nommé group correspondant à la valeur de la clé de regroupement (ici, l’année) et une collection imbriquée nommée d’après la collection-source du regroupement (ici, sr_articles) et contenant tous les documents partageant la même valeur pour la clé de regroupement.

L’extrait de la collection obtenu avec illustrate montre le cas de l’année 1990.

À la syntaxe près, nous sommes dans le domaine familier des documents semi-structurés. Si on compare avec JSON par exemple, les objets sont notés par des parenthèses et pas par des accolades, et les ensembles par des accolades et pas par des crochets. Une différence plus essentielle avec une approche semi-structurée de type JSON ou XML est que le schéma est distinct de la représentation des documents: à partir d’une collection dont le schéma est connu, l’interpréteur de Pig infère le schéma des collections calculées par les opérateurs. Il n’est donc pas nécessaire d’inclure le schéma avec le contenu de chaque document.

Le modèle de données de Pig comprend trois types de valeurs:

  • Les valeurs atomiques (chaînes de caractères, entiers, etc.).

  • Les collections (bags pour Pig) dont les valeurs peuvent être hétérogènes.

  • Les documents (tuples pour Pig), équivalent des objets en JSON: des ensembles de paires (clé, valeur).

On peut construire des structures arbitrairement complexes par imbrication de ces différents types. Comme dans tout modèle semi-structuré, il existe très peu de contraintes sur le contenu et la structure. Dans une même collection peuvent ainsi cohabiter des documents de structure très différente.

Application de fonctions. Un des besoins récurrents dans les chaînes de traitement est d’appliquer des fonctions pour annoter, restructurer ou enrichir le contenu des documents passant dans le flux. Ici, la collection finale avg_nb est obtenue en appliquant une fonction standard count(). Dans le cas général, on applique des fonctions applicatives intégrées au contexte d’exécution Pig: ces fonctions utilisateurs (User Defined Functions ou UDF) sont le moyen privilégié de combiner les opérateurs d’un langage comme Pig avec une application effectuant des traitements sur les documents. L’opérateur foreach/generate permet cette combinaison.

Les opérateurs

La table ci-dessous donne la liste des principaux opérateurs du langage Pig. Tous s’appliquent à une ou deux collections en entrée et produisent une collection en sortie.

Opérateur

Description

foreach

Applique une expression à chaque document de la collection

filter

Filtre les documents de la collection

order

Ordonne la collection

distinct

Elimine lse doublons

cogroup

Associe deux groupes partageant une clé

cross

Produit cartésien de deux collections

join

Jointure de deux collections

union

Union de deux collections

Voici quelques exemples pour illustrer les aspects essentiels du langage, basés sur le fichier http://b3d.bdpedia.fr/files/webdam-books.txt. Chaque ligne contient l’année de parution d’un livre, le titre et un auteur.

1995        Foundations of Databases Abiteboul
1995        Foundations of Databases Hull
1995        Foundations of Databases Vianu
2012        Web Data Management Abiteboul
2012    Web Data Management Manolescu
2012        Web Data Management Rigaux
2012        Web Data Management Rousset
2012        Web Data Management Senellart

Le premier exemple ci-dessous montre une combinaison de group et de foreach permettant d’obtenir une collection avec un document par livre et un ensemble imbriqué contenant la liste des auteurs.

-- Chargement de la collection
books = load 'webdam-books.txt'
    as (year: int, title: chararray, author: chararray) ;
group_auth = group books by title;
authors = foreach group_auth generate group, books.author;
dump authors;

L’opérateur foreach applique une expression aux attributs de chaque document. Encore une fois, Pig est conçu pour que ces expressions puissent contenir des fonctions externes, ou UDF (User Defined Functions), ce qui permet d’appliquer n’importe quel type d’extraction ou d’annotation.

L’ensemble résultat est le suivant:

(Foundations of Databases,
   {(Abiteboul),(Hull),(Vianu)})
(Web Data Management,
   {(Abiteboul),(Manolescu),(Rigaux),(Rousset),(Senellart)})

L’opérateur flatten sert à « aplatir » un ensemble imbriqué.

-- On prend la collection group_auth et on l'aplatit
flattened = foreach group_auth generate group ,flatten(books.author);

On obtient:

(Foundations of Databases,Abiteboul)
(Foundations of Databases,Hull)
(Foundations of Databases,Vianu)
(Web Data Management,Abiteboul)
(Web Data Management,Manolescu)
(Web Data Management,Rigaux)
(Web Data Management,Rousset)
(Web Data Management,Senellart)

L’opérateur cogroup prend deux collections en entrée, crée pour chacune des groupes partageant une même valeur de clé, et associe les groupes des deux collections qui partagent la même clé. C’est un peu compliqué en apparence; regardons la Fig. 101. Nous avons une collection A avec des documents d dont la clé de regroupement vaut a ou b, et une collection B avec des documents d”. Le cogroup commence par rassembler, séparément dans A et B, les documents partageant la même valeur de clé. Puis, dans une seconde phase, les groupes de documents provenant des deux collections sont assemblés, toujours sur la valeur partagée de la clé.

_images/pig-cogroup.png

Fig. 101 L’opérateur cogroup de Pig.

Prenons une seconde collection, contenant des éditeurs (fichier http://b3d.bdpedia.fr/files/webdam-publishers.txt):

Fundations of Databases     Addison-Wesley  USA
Fundations of Databases     Vuibert France
Web Data Management         Cambridge University Press      USA

On peut associer les auteurs et les éditeurs de chaque livre de la manière suivante.

--- Chargement de la collection
publishers = load 'webdam-publishers.txt'
  as (title: chararray, publisher: chararray) ;
cogrouped = cogroup flattened by group, publishers by title;

Le résultat (restreint au premier livre) est le suivant.

(Foundations of Databases,
  { (Foundations of Databases,Abiteboul),
    (Foundations of Databases,Hull),
    (Foundations of Databases,Vianu)
  },
  {(Foundations of Databases,Addison-Wesley),
   (Foundations of Databases,Vuibert)
  }
)

Je vous laisse exécuter la commande par vous-même pour prendre connaissance du document complet. Il contient un document pour chaque livre avec trois attributs. Le premier est la valeur de la clé de regroupement (le titre du livre). Le second est l’ensemble des documents de la première collection correspondant à la clé, le troisième l’ensemble des documents de la seconde collection correspondant à la clé.

Il s’agit d’une forme de jointure qui regroupe, en un seul document, tous les documents des deux collections en entrée qui peuvent être appariés. On peut aussi exprimer la jointure ainsi:

-- Jointure entre la collection 'flattened' et  'publishers'
joined = join flattened by group, publishers by title;

On obtient cependant une structure différente de celle du cogroup, tout à fait semblable à celle d’une jointure avec SQL, dans laquelle les informations ont été « aplaties ».

(Foundations of Databases,Abiteboul,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Abiteboul,Fundations of Databases,Vuibert)
(Foundations of Databases,Hull,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Hull,Fundations of Databases,Vuibert)
(Foundations of Databases,Vianu,Fundations of Databases,Addison-Wesley)
(Foundations of Databases,Vianu,Fundations of Databases,Vuibert)

La comparaison entre cogroup et join montre la flexibilité apportée par un modèle semi-structuré et sa capacité à représenter des ensembles imbriqués. Une jointure relationnelle doit produire des tuples « plats », sans imbrication, alors que le cogroup autorise la production d’un état intermédiaire où toutes les données liées sont associées dans un même document, ce qui peut être très utile dans un contexte analytique.

Voici un dernier exemple montrant comment associer à chaque livre le nombre de ses auteurs.

books = load 'webdam-books.txt'
    as (year: int, title: chararray, author: chararray) ;
group_auth = group books by title;
authors = foreach group_auth generate group, COUNT(books.author);
dump authors;

Exercices

Reportez-vous également au chapitre Pig : Travaux pratiques pour un ensemble d’exercices à faire sur machine.

Exercice Ex-CalcDist-1: MapReduce en distribué avec MongoDB

Vous devez avoir implanté un compteur de mots avec MongoDB dans la chapitre Interrogation de bases NoSQL. Vous devriez également avoir engendré une collection volumineuse et distribuée grâce au générateur de données ipsum (cf. chapitre Systèmes NoSQL: le partitionnement). Il ne reste plus qu’à faire l’essai: lancer, en vous connectant au routeur mongos, le calcul MapReduce dans MongoDB. Ce calculer devrait insérer le résultat dans une collection partitionnée présente sur les différents serveurs. À vous de jouer.

Exercice Ex-CalcDist-2: un grep, en MapReduce

On veut scanner des millards de fichiers et afficher tous ceux qui contiennent une chaîne de caractères c. Donnez la solution en MapReduce, en utilisant le formalisme de votre choix (de préférence un pseudo-code un peu structuré quand même).

Correction

Pas bien compliqué. Map: on charge les fichiers un par un (ou ligne par ligne s’ils sont vraiment très gros), on cherche c et si on la trouve on émet le nom du fichier et un indicateur quelconque. Soit, en code:

// Le pattern est supposé connu dans le contexte d'exécution
function MapGrep (fichier) {
  if (contains(fichier, pattern) {
       emit(fichier.nom, 1);
}

Reduce: on émet le nom du fichier, éventellement avec le nombre d’occurrences trouvées de c. Par exemple:

function ReduceGrep (nomFichier, [nb]) {
  return (nomFichier, sum(nb));
}

Exercice Ex-CalcDist-3: un rollup, en MapReduce

Une grande surface enregistre tous ses tickets de caisse, indiquant les produits vendus, le prix et la date, ainsi que le client si ce dernier a une carte de fidélité.

Les produits sont classés selon une taxonomie comme illustré sur la Fig. 102, avec des niveaux de précision. Pour chaque produit on sait à quelle catégorie précise de N1 il appartient (par exemple, chaussure); pour chaque catégorie on connaît son parent.

_images/taxonomie.png

Fig. 102 Les produits et leur classement.

Supposons que la collection Tickets contienne des documents de la forme (idTicket, idClient, idProduit, catégorie, date, prix). Comment obtenir en MapReduce le total des ventes à une date d, pour le niveau N2? On fait donc une agrégation de Tickets au niveau supérieur de la taxonomie.

Correction

Il nous faut donc une fonction parent(n) qui renvoie le parent d’un nœud n dans la taxonomie. On pourrait généraliser avec une fonction ancêtre (n, niveau) qui renvoie l’ancêtre à un niveau donné.

Cela acquis, il suffit d’appliquer la fonction parent(n) à chaque ticket pour pouvoir faire le regroupement voulu. La clé de regroupement est une paire constituée d’une date et du parent. Soit:

function MapRollup (ticket) {
       emit({ticket.date, parent(ticket.categorie)}, ticket.prix);
}

Reduce: direct.

function ReduceRollup ({date, categ}, [prix]) {
  return ({date, categ}, sum(prix));
}

Exercice Ex-CalcDist-4: MapReduce, calcul distribué pour les nuls

MapReduce est souvent une solution brutale et inefficace (mais facile à implanter) pour des problèmes qui ont des solutions bien plus élégantes.

Par exemple: vous disposez d’une collection distribuée de très grande taille, disons des utilisateurs. Voulez calculer la valeur médiane d’une variable, l’âge, ou le solde du compte, ou n’importe quoi.

Exercice Ex-CalcDist-5: algèbre linéaire distribuée

Nous disposons le calcul d’algèbre linéaire du chapitre `chap-mapreduce`_. On a donc une matrice M de dimension \(N \times N\) représentant les liens entres les \(N\) pages du Web, chaque lien étant qualifié par un facteur d’importance (ou « poids »). La matrice est représentée par une collection math:C dans laquelle chaque document est de la forme {« id »: &23, « lig »: i, « col »: j, « poids »: \(m_{ij}\)}, et représente un lien entre la page \(P_i\) et la page \(P_j\) de poids \(m_{ij}\)

Vous avez déjà vu le calcul de la norme des lignes de la matrice, et celui du produit de la matrice par un vecteur \(V\). Prenons en compte maintenant la taille et la distribution.

Questions

  • On estime qu’il y a environ \(N=10^{10}\) pages sur le Web, avec 15 liens par page en moyenne. Quelle est la taille de la collection \(C\), en TO, en supposant que chaque document a une taille de 16 octets

  • Nos serveurs ont 2 disques de 1 TO chacun et chaque document est répliqué 2 fois (donc trois versions en tout). Combien affectez-vous de serveurs au système de stokage?

  • Maintenant, on suppose que \(V\) ne tient plus dans la mémoire RAM d’une seule machine. Proposez une méthode de partitionnement de la collection \(C\) et de \(V\) qui permette d’effectuer le calcul distribué de \(M \times V\) avec MapReduce sans jamais avoir à lire le vecteur sur le disque.

    Donnez le critère de partitionnement et la technique (par intervalle ou par hachage).

  • Supposons qu’on puisse stocker au plus deux (2) coordonnées d’un vecteur dans la mémoire d’un serveur. Inspirez-vous de la Fig. 96 pour montrer le déroulement du traitement distribué précédent en choisissant le nombre minimal de serveurs permettant de conserver le vecteur en mémoire RAM.

    Pour illustrer le calcul, prenez la matrice \(4\times4\) ci-dessous, et le vecteur \(V = [4,3,2,1]\).

    \[\begin{split}M= \left[ {\begin{array}{cccc} 1 & 2 & 3 & 4 \\ 7 & 6 & 5 & 4 \\ 6 & 7 & 8 & 9 \\ 3 & 3 & 3 & 3 \\ \end{array} } \right]\end{split}\]
  • Expliquez pour finir comment calculer la similarité cosinus entre \(V\) et les lignes \(L_i\) de la matrice.

Correction

Il y a donc \(N=150 \times 10^{9}\) liens à placer dans la matrice, chaque lien étant représenté par un document de 16 octets. Soit \(N=2400 \times 10^{9}\) octets, ou 2,4 TO.

Avec trois copies de chaque lien, on arrive à 7,2 TO. Il faut donc au moins 4 serveurs pour pouvoir stocker la matrice (répliquée) sur disque.

Le vecteur \(V\) ne tient plus en mémoire RAM pour une seule machine. Il faut donc le découper en \(f\) fragments de manière à ce que chaque fragment contenant \(\frac{N}{f}\) coordonnées tienne en RAM (on suppose qu’on a assez de machines, ce qui semble raisonnable). On a donc les fragments \(V_1[0, \frac{N}{f}[\), \(V_2[\frac{N}{f}, 2 \times \frac{N}{f}[\), etc.

Chaque fonction de MAP accède à l’un des fragments \(V_i[(i-1) \times \frac{N}{f}, i \times \frac{N}{f}]\) du vecteur. Cette fonction peut donc se contenter d’accéder à la partie de la matrice qui doit être combinée à ce fragment: il s’agit évidemment des colonnes \([(i-1) \times \frac{N}{f}, i \times \frac{N}{f}[\). On va donc partitionner la matrice en \(f\) fragments, verticalement.

_images/partition-matrice.png

Fig. 103 Partition verticale de la matrice

La Fig. 103 montre le partitionnement du vecteur et celui de la matrice. Le calcul MapReduce est alors exactement le même que celui déjà vu pour un calcul dans le cas où le vecteur tient en mémoire. La seule différence est qu’il s’applique aux fragments à apparier de la matrice et du vecteur.

La Fig. 104 illustre le calcul distribué avec un partitionnement minimal en deux fragments. Pour simplifier un peu la figure, on montre le calcul au niveau des lignes et pas des cellules élémentaires. Un premier serveur multiplie les deux premières colonnes de la matrice avec les deux premières coordonnées du vecteur. Le second serveur effectue le calcul complémentaire.

_images/mr-execution-matrice.png

Fig. 104 Illustration du calcul distribué

On a utilisé deux reducers, avec une distribution (shuffle) qui envoie tous les résultats intermédiaires des lignes 1 et 3 vers le premier reducer, et ceux des lignes 2 et 4 vers le second.

Pour finir, le calcul de la similarité cosinus est obtenu en combinant un premier calcul des normes des vecteurs, suivi du produit avec le vecteur du document-cible.