17. Traitement de données massives avec Apache Spark

S2: Introduction à Spark

Les opérateurs d’un langage comme Pig restent liés à un modèle d’exécution MapReduce. Un programme Pig est compilé et exécuté comme une séquence de jobs MapReduce. Cela ne couvre pas une classe importante d’algorithmes: ceux qui procèdent par itérations sur un résultat progressivement affiné à chaque exécution. Ce type d’algorithme est très fréquent dans le domaine général de la fouille de données: PageRank, kMeans, calculs de parties connexes dans les graphes, etc.

Avec MapReduce, la spécification de l’itération reste à la charge du programmeur; il faut stocker le résultat d’un premier job dans une collection intermédiaire et réiterer le job en prenant la collection intermédiaire comme source. C’est laborieux pour l’implantation, et surtout très peu efficace quand la collection intermédiaire est grande. Le processus de sérialisation/désérialisation sur disque propre à la gestion de la reprise sur panne en MapReduce entraîne des coûts très élevés et des performances médiocres.

Des systèmes récents comme Spark proposent des méthodes spécifiquement dédiées aux algorithmes accédant de manière réitérée à même jeu de données. Dans Spark, la méthode consiste essentiellement à placer ces jeux de données en mémoire RAM et à éviter la pénalité des écritures sur le disque. Le défi est alors bien sûr de proposer une reprise sur panne automatique efficace.

Les RDDs dans Spark

La principale innovation de Spark est le concept de Resilient Distributed Dataset (RDD). Un RDD est une collection (pour en rester à notre vocabulaire) calculée à partir d’une source de données (par exemple une base de données MongoDB, un flux de données, un autre RDD) et placée en mémoire RAM. Spark conserve l’historique des opérations qui a permis de constituer un RDD, et la reprise sur panne s’appuie essentiellement sur la préservation de cet historique afin de reconstituer le RDD en cas de panne. Pour le dire brièvement: Spark n’assure pas la préservation des données en extension mais en intention. La préservation d’un programme qui tient en quelques lignes de spécification (cf. les programmes Pig) est beaucoup plus facile et efficace que la préservation du jeu de données issu de cette chaîne. C’est l’idée principale pour la résilience des RDDs.

Par ailleurs, les RDDs représentent des collections partitionnées et distribuées. Chaque RDD est donc constitué de ce que nous avons appelé fragments. Une panne affectant un fragment individuel peut donc être réparée (par reconstitution de l’historique) indépendamment des autres fragments, évitant d’avoir à tout recalculer.

Un RDD est calculé par une transformation, l’équivalent de ce nous avons appelé opérateur dans Pig. Comme dans Pig, une transformation sélectionne, enrichit, restructure une collection, ou combine deux collections. On retrouve dans Spark, à peu de choses près, les mêmes opérateurs/transformations que dans Pig, comme le montre la table ci-dessous (qui n’est pas exhaustive: reportez-vous à la documentation pour des compléments).

Opérateur Description
map Prend un document en entrée et produit un document en sortie
filter Filtre les documents de la collection
flatMap Prend un document en entrée, produit un ou plusieurs document(s) en sortie
groupByKey Regroupement de documents par une valeur de clé commune
reduceByKey Réduction d’une paire (k, [v]) par une agrégation du tableau [v]
crossProduct Produit cartésien de deux collections
join Jointure de deux collections
union Union de deux collections
cogroup Cf. la description de l’opérateur dans la section sur Pig
sort Tri d’une collection

Les RDD représentent les collections obtenues au cours des différentes étapes d’une chaîne de traitement. C’est exactement la notion que nous avons déjà étudiée avec Pig. La différence essentielle est que dans Spark, les RDD peuvent être marquées comme étant persistants car ils peuvent être réutilisés dans d’autres chaînes. Spark fait son possible pour stocker un RDD persistant en mémoire RAM, pour un maximum d’efficacité.

_images/spark-rdd.png

Fig. 17.1 RDD persistants et transitoires dans Spark.

Les RDD forment un graphe construit par application de transformations à partir de collections stockées (figure RDD persistants et transitoires dans Spark.). Un RDD peut être construit à partir d’un ou plusieurs autres RDD (2 au maximum). S’il n’est pas marqué comme persistant, le RDD sera transitoire et ne sera pas conservé en mémoire après calcul (c’est le cas des RDD 1 et 3 sur la figure). Sinon, il est conservé en mémoire.

Exemple: analyse de fichiers log

Prenons un exemple concret: dans un serveur d’application, on constate d’un module M produit des résultats incorrects de temps en temps. On veut analyser le fichier journal (log) de l’application qui contient les messages produits par le module suspect, et par beaucoup d’autres.

On construit donc un programme qui charge le log sous forme de collection, ne conserve que les messsages produits par le module M et analyse ensuite ces messages. Plusieurs analyses sont possibles en fonction des causes suspectées: la première par exemple regarde le log de M pour un produit particulier, la seconde pour un utilisateur particulier, la troisième pour une tranche horaire particulière, etc.

Avec Spark, on va créer un RDD logM persistant, contenant les messages produits par M. On construira ensuite, à partir de logM de nouveaux RDD dérivés pour les analyses spécifiques (figure Scénario d’une analyse de log avec Spark).

_images/spark-log.png

Fig. 17.2 Scénario d’une analyse de log avec Spark

On combine deux transformations pour construire logM, comme le montre le programme suivant (qui n’est pas la syntaxe exacte de Spark, que nous présenterons plus loin).

// Chargement de la collection
log = load ("app.log") as (...)
// Filtrage des messages du module M
logM = filter log with log.message.contains ("M")
// On rend logM persistant !
logM.persist();

On peut alors construire une analyse basée sur le code produit directement à partir de logM.

// Filtrage par produit
logProduit = filter logM with log.message.contains ("product P")
// .. analyse du contenu de logProduit

Et utiliser également logM pour une autre analyse, basée sur l’utilisateur.

// Filtrage par utilisateur
logUtilisateur = filter logM with log.message.contains ("utilisateur U")
// .. analyse du contenu de logProduit

Ou encore par tranche horaire.

// Filtrage par utilisateur
logPeriode = filter logM with log.date.between d1 and d2
// .. analyse du contenu de logPeriode

logM est une sorte de “vue” sur la collection initiale, dont la persistance évite de refaire le calcul complet à chaque analyse.

Reprise sur panne

Pour comprendre la reprise sur panne, il faut se pencher sur le second aspect des RDD: la distribution. Un RDD est une collection partitionnée (cf. chapitre Systèmes NoSQL: le partitionnement). La figure Partitionnement et reprise sur panne dans Spark. montre le traitement précédent dans une perspective de distribution. Chaque RDD, persistant ou non, est composé de fragments répartis dans la grappe de serveurs.

_images/spark-failover.png

Fig. 17.3 Partitionnement et reprise sur panne dans Spark.

Si une panne affecte un calcul s’appuyant sur un fragment F de RDD persistant (par exemple la transformation notée T et marquée par une croix rouge sur la figure), il suffit de le relancer à partir de F. Le gain en temps est considérable!

La panne la plus sévère affecte un fragment de RDD persistant (par exemple celui marqué par une croix violette). Dans ce cas, Spark a mémorisé la chaîne de traitement ayant constitué le RDD, et il suffit de ré-appliquer cette chaîne en remontant jusqu’aux fragments qui précèdent dans le graphe des calculs.

Dans notre cas, il faut parcourir à nouveau le fichier log pour créer le fragment logn. Si les collections stockées à l’origine du calcul sont elles-mêmes partitionnées (ce qui n’est sans doute pas le cas pour un fichier log), il suffira d’accéder à la partie de la collection à l’origine des calculs menant au RDD défaillant.

En résumé, Spark exploite la capacité à reconstruire des fragments de RDD par application de la chaîne de traitement, et ce en se limitant si possible à une partie seulement des données d’origine. La reprise peut prendre du temps, mais elle évite un recalcul complet. Si tout se passe bien (pas de panne) la présence des résultats intermédiaires en mémoire RAM assure de très bonnes performances.

Une session Spark: comptons les mots!

Voici une brève session Spark, directement inspirée de la documentation en ligne. Vous devez d’abord récupérer la dernière version stable de Spark depuis http://spark.apache.org. Installez-le dans un répertoire que nous appellerons sparkdir.

Lancez l’interpréteur de commandes.

./bin/spark-shell

Comme pour Pig, nous allons exécuter quelques commandes en local, sachant qu’il suffit d’un peu de configuration pour passer à un mode pleinement distribué. Vous pouvez récupérer le fichier http://b3d.bdpedia.fr/files/loups.txt pour faire un essai (il est temps de savoir à quoi s’en tenir à propos de ces loups et de ces moutons!), sinon n’importe quel fichier texte fait l’affaire.

Note

Comme Pig, l’interpréteur de Spark affiche de nombreux messages à la console ce qui est perturbant. Pour s’en débarasser:

  • copiez le fichier sparkdir/conf/log4j.properties.template en sparkdir/conf/log4j.properties;
  • éditez log4j.properties et remplacez dans la première ligne le niveau INFO par ERROR.

Ceci en supposant que les choses n’ont pas changé entre votre version et la mienne. Sinon, cherchez sur le web.

L’interpéteur de Spark accepte des commandes en langage Scala, dont vous êtes probablement déjà spécialiste. Si ce n’est pas le cas, copiez-collez les commandes ci-dessous.

scala> val loupsEtMoutons = sc.textFile("loups.txt")
   loupsEtMoutons: org.apache.spark.rdd.RDD[String] = loups.txt

Nous avons créé un premier RDD! Spark propose des actions directement applicable à un RDD et produisant des résultats scalaires. (Un RDD est interfacé comme un objet auquel nous pouvons appliquer des méthodes.)

scala> loupsEtMoutons.count() // Nombre de documents dans ce RDD
  res0: Long = 126

scala> loupsEtMoutons.first() // Premier document du RDD
  res1: String = Le loup est dans la bergerie.

scala> loupsEtMoutons.collect() // Récupération du RDD complet

Note

Petite astuce: en entrant le nom de l’objet (loupsEtMoutons.) suivi de la touche TAB, l’interpréteur Scala vous affiche la liste des méthodes disponibles.

Passons aux transformations. Elles prennent un (ou deux) RDD en entrée, produisent un RDD en sortie. On peut sélectionner les documents qui contiennent “bergerie”.

scala> val bergerie = loupsEtMoutons.filter(line => line.contains("bergerie"))

On peut afficher le contenu d’un RDD.

scala> bergerie.collect()
 res9: Array[String] = Array(Le loup est dans la bergerie.,
                 Les moutons sont dans la bergerie.,
                 Un loup a mangé un mouton, les autres loups sont restés dans la bergerie.)

On peut combiner une transformation et une action.

scala> loupsEtMoutons.filter(line => line.contains("loup")).count()

Et pour conclure cette petite session introductive, voici comment on implante en Spark le compteur de termes dans une collection. On crée un premier RDD constitué de tous les termes

val termes = loupsEtMoutons.flatMap(line => line.split(" "))

La méthode split décompose une chaîne de caractère (ici, en prenant comme séparateur un espace). Notez l’opérateur flatMap qui produit plusieurs documents (ici un terme) pour un document en entrée (ici une ligne).

On introduit la notion de comptage: chaque terme vaut 1. L’opérateur map produit un document en sortie pour chaque document en entrée. On peut s’en servir ici pour enrichir chaque terme avec son compteur initial.

val termeUnit = termes.map(word => (word, 1))

Rappelons qu’à chaque étape, vous pouvez afficher le contenu du RDD avec collect(). L’étape suivante regroupe les termes et effectue la somme de leurs compteurs: c’est un opérateur reduceByKey.

scala> val compteurTermes = termeUnit.reduceByKey((a, b) => a + b)

On passe à l’opérateur une fonction de réduction, ici notée littéralement dans la syntaxe Scala (si j’ai deux valeurs, j’en fait la somme). Et voilà! On aurait pu tout exprimer en une seule fois.

scala> loupsEtMoutons.flatMap(line => line.split(" "))
                       .map(word => (word, 1))
                       .reduceByKey((a, b) => a + b)

Si vous regardez le résultat, il pourra vous sembler étrange: il manque les diverses étapes de simplification du texte qui sont de mise pour un moteur de recherche. Mais l’essentiel est de comprendre l’enchaînement des opérateurs.

Finalement, si on souhaite conserver en mémoire le RDD final pour le soumettre à divers traitements, il suffit d’appeler:

scala> compteurTermes.persist()

CQFD!

Quiz

  • Quelles sont les deux fonctions principales d’un framework MapReduce ?
  • Qu’est-ce qu’un JobTracker dans Hadoop ? Combien y en a-t-il ?
  • Est-ce que les Mappers ou les Reducers peuvent communiquer entre eux ? Pourquoi ?
  • Où sont stockés les résultats des Mappers : collection distribuée ou disque local ? Pourquoi?
  • Résumer le principe de data locality.
  • Quel est le rôle de l’opération de shuffle ?
  • Comment expliqueriez-vous l’importance de traiter les paires (clé, valeur) indépendamment les unes des autres ?
  • J’implante un système MapReduce en transmettant directement les paires intermédiaires des Mappers vers les Reducers, sans écriture locale. En cas de plantage d’un des Reducers, que faut-il recalculer? Donner un exemple le plus simple possible.
  • Expliquez brièvement ce que doit faire le JobMaster quand un Reducer se plante.
  • Reprenez l’exemple de notre session illustrative de Pig. Comme exécuter cette chaîne de traitement avec MapReduce? Donnez le détail de ce qui se passe dans la phase de Map et la phase de Reduce.
  • Quand commence l’exécution du Reduce ? et pourquoi ?