18. Traitement de données massives avec Apache Spark

Avec le système Spark, nous abordons un premier exemple (sans doute le plus en vogue au moment où ces lignes sont écrites) d’environnements dédiés au calcul distribué à grande échelle qui proposent des fonctionnalités bien plus puissantes que le simple MapReduce des origines, encore disponible dans l’écosystème Hadoop.

Ces fonctionnalités consistent notamment en un ensemble d’opérateurs de second ordre (voir cette notion dans le chapitre précédent) qui étendent considérablement la simple paire constituée du Map et du Reduce. Nous avons eu un aperçu de ces opérateurs avec Pig, qui reste cependant lié à un contexte d’exécution MapReduce (un programme Pig est compilé et exécuté comme une séquence de jobs MapReduce).

Entre autres limitations, cela ne couvre pas une classe importante d’algorithmes: ceux qui procèdent par itérations sur un résultat progressivement affiné à chaque exécution. Ce type d’algorithme est très fréquent dans le domaine général de la fouille de données: PageRank, kMeans, calculs de composantes connexes dans les graphes, etc.

Ce chapitre propose une introduction au système Spark. Nous nous contenterons d’entrer des commandes grâce à l’interpréteur de commandes spark-shell, et le plus simple pour reproduire ces commandes est donc de télécharger la dernière version de Spark depuis le site http://spark.apache.org. L’installation comprend un sous-répertoire bin dans lequel se trouvent les commandes qui nous intéressent. Vous pouvez donc placer le chemin vers spark/bin dans votre variable PATH, selon des spécificités qui dépendent de votre environnement: à ce stade du cours vous devriez être rôdés à ce type de manœuvre.

S1: Introduction à Spark

Avec MapReduce, la spécification de l’itération reste à la charge du programmeur; il faut stocker le résultat d’un premier job dans une collection intermédiaire et réiterer le job en prenant la collection intermédiaire comme source. C’est laborieux pour l’implantation, et surtout très peu efficace quand la collection intermédiaire est grande. Le processus de sérialisation/désérialisation sur disque propre à la gestion de la reprise sur panne en MapReduce entraîne des performances médiocres.

Dans Spark, la méthode est très différente. Elle consiste à placer ces jeux de données en mémoire RAM et à éviter la pénalité des écritures sur le disque. Le défi est alors bien sûr de proposer une reprise sur panne automatique efficace.

Les Resilient Data Sets

La principale innovation apportée par Spark est le concept de Resilient Distributed Dataset (RDD). Un RDD est une collection (pour en rester à notre vocabulaire) calculée à partir d’une source de données (par exemple une base de données Cassandra, un flux de données, un autre RDD) et placée en mémoire RAM. Spark conserve l’historique des opérations qui a permis de constituer un RDD, et la reprise sur panne s’appuie essentiellement sur la préservation de cet historique afin de reconstituer le RDD en cas de panne. Pour le dire brièvement: Spark n’assure pas la préservation des données en extension mais en intention. La préservation d’un programme qui tient en quelques lignes de spécification (cf. les programmes Pig) est beaucoup plus facile et efficace que la préservation du jeu de données issu de cette chaîne. C’est l’idée principale pour la résilience des RDDs.

Par ailleurs, les RDDs représentent des collections partitionnées et distribuées. Chaque RDD est donc constitué de ce que nous avons appelé fragments. Une panne affectant un fragment individuel peut donc être réparée (par reconstitution de l’historique) indépendamment des autres fragments, évitant d’avoir à tout recalculer.

Un RDD est calculé par une transformation, l’équivalent de ce nous avons appelé opérateur dans Pig. Comme dans Pig, une transformation sélectionne, enrichit, restructure une collection, ou combine deux collections. On retrouve dans Spark, à peu de choses près, les mêmes opérateurs/transformations que dans Pig, comme le montre la table ci-dessous (qui n’est bien sûr pas exhaustive: reportez-vous à la documentation pour des compléments).

Opérateur Description
map Prend un document en entrée et produit un document en sortie
filter Filtre les documents de la collection
flatMap Prend un document en entrée, produit un ou plusieurs document(s) en sortie
groupByKey Regroupement de documents par une valeur de clé commune
reduceByKey Réduction d’une paire (k, [v]) par une agrégation du tableau [v]
crossProduct Produit cartésien de deux collections
join Jointure de deux collections
union Union de deux collections
cogroup Cf. la description de l’opérateur dans la section sur Pig
sort Tri d’une collection

Les RDD représentent les collections obtenues au cours des différentes étapes d’une chaîne de traitement. C’est exactement la notion que nous avons déjà étudiée avec Pig. La différence essentielle est que dans Spark, les RDD peuvent être marquées comme étant persistants car ils peuvent être réutilisés dans d’autres chaînes. Spark fait son possible pour stocker un RDD persistant en mémoire RAM, pour un maximum d’efficacité.

_images/spark-rdd.png

Fig. 18.1 RDD persistants et transitoires dans Spark.

Les RDD forment un graphe construit par application de transformations à partir de collections stockées (Fig. 18.1). Un RDD peut être construit à partir d’un ou plusieurs autres RDD (2 au maximum). S’il n’est pas marqué comme persistant, le RDD sera transitoire et ne sera pas conservé en mémoire après calcul (c’est le cas des RDD 1 et 3 sur la figure). Sinon, il est stocké en RAM, et disponible comme source de données pour d’autres transformations.

Exemple: analyse de fichiers log

Prenons un exemple concret: dans un serveur d’application, on constate qu’un module M produit des résultats incorrects de temps en temps. On veut analyser le fichier journal (log) de l’application qui contient les messages produits par le module suspect, et par beaucoup d’autres modules.

On construit donc un programme qui charge le log sous forme de collection, ne conserve que les messsages produits par le module M et analyse ensuite ces messages. Plusieurs analyses sont possibles en fonction des causes suspectées: la première par exemple regarde le log de M pour un produit particulier, la seconde pour un utilisateur particulier, la troisième pour une tranche horaire particulière, etc.

Avec Spark, on va créer un RDD logM persistant, contenant les messages produits par M. On construira ensuite, à partir de logM de nouveaux RDD dérivés pour les analyses spécifiques (Fig. 18.2).

_images/spark-log.png

Fig. 18.2 Scénario d’une analyse de log avec Spark

On combine deux transformations pour construire logM, comme le montre le programme suivant (qui n’est pas la syntaxe exacte de Spark, que nous présenterons plus loin).

// Chargement de la collection
log = load ("app.log") as (...)
// Filtrage des messages du module M
logM = filter log with log.message.contains ("M")
// On rend logM persistant !
logM.persist();

On peut alors construire une analyse basée sur le code produit directement à partir de logM.

// Filtrage par produit
logProduit = filter logM with log.message.contains ("product P")
// .. analyse du contenu de logProduit

Et utiliser également logM pour une autre analyse, basée sur l’utilisateur.

// Filtrage par utilisateur
logUtilisateur = filter logM with log.message.contains ("utilisateur U")
// .. analyse du contenu de logProduit

Ou encore par tranche horaire.

// Filtrage par utilisateur
logPeriode = filter logM with log.date.between d1 and d2
// .. analyse du contenu de logPeriode

logM est une sorte de « vue » sur la collection initiale, dont la persistance évite de refaire le calcul complet à chaque analyse.

Reprise sur panne

Pour comprendre la reprise sur panne, il faut se pencher sur le second aspect des RDD: la distribution. Un RDD est une collection partitionnée (cf. chapitre Systèmes NoSQL: le partitionnement). La Fig. 18.3 montre le traitement précédent dans une perspective de distribution. Chaque RDD, persistant ou non, est composé de fragments répartis dans la grappe de serveurs.

_images/spark-failover.png

Fig. 18.3 Partitionnement et reprise sur panne dans Spark.

Si une panne affecte un calcul s’appuyant sur un fragment F de RDD persistant (par exemple la transformation notée T et marquée par une croix rouge sur la figure), il suffit de le relancer à partir de F. Le gain en temps est considérable!

La panne la plus sévère affecte un fragment de RDD persistant (par exemple celui marqué par une croix violette). Dans ce cas, Spark a mémorisé la chaîne de traitement ayant constitué le RDD, et il suffit de ré-appliquer cette chaîne en remontant jusqu’aux fragments qui précèdent dans le graphe des calculs.

Dans notre cas, il faut parcourir à nouveau le fichier log pour créer le fragment logn. Si les collections stockées à l’origine du calcul sont elles-mêmes partitionnées (ce qui n’est sans doute pas le cas pour un fichier log), il suffira d’accéder à la partie de la collection à l’origine des calculs menant au RDD défaillant.

En résumé, Spark exploite la capacité à reconstruire des fragments de RDD par application de la chaîne de traitement, et ce en se limitant si possible à une partie seulement des données d’origine. La reprise peut prendre du temps, mais elle évite un recalcul complet. Si tout se passe bien (pas de panne) la présence des résultats intermédiaires en mémoire RAM assure de très bonnes performances.

Dataset et Dataframe

Un RDD est une « boîte » destinée à contenir n’importe quel document, sans aucun préjugé sur la structure (ou l’absence de structure) de ce dernier. Cela rend le système très généraliste, mais empêche une manipulation fine des constituants des documents, comme par exemple le filtrage en fonction de la valeur d’un champ. C’est le programmeur de l’application qui doit fournir la fonction effectuant le filtre.

Spark propose (depuis la version 1.3, avec des améliorations en 1.6 puis 2.0) des RDD structurés dans lesquels les données sont sous forme tabulaire. Le schéma de ces données est connu. On peut alors assimiler ces RDD à des tables relationnelles. La connaissance du schéma - et éventuellement de leur type - permet à Spark de proposer des opérations plus fines, et des optimisations inspirées des techniques d’évaluation de requêtes dans les systèmes relationnels. En fait, on se ramène à une implantation distribuée du langage SQL. En interne, un avantage important de la connaissance du schéma est d’éviter de recourir à la sérialisation des objets Java (opération effectuée dans le cas des RDD pour écrire sur disque et échanger des données en réseau).

Note

Saluons au passage le mouvement progressif de ces systèmes vers une ré-assimilation des principes du relationnel (schéma, structuration des données, interrogation à la SQL, etc.), et la reconnaissance des avantages, internes et externes, d’une modélisation des données.

Ces RDDs structurés sont nommées Dataset quand le type des colonnes est connu, Dataframe sinon. Un Dataframe n’est rien d’autre qu’un Dataset contenant des lignes de type Row dont le schéma précis n’est pas connu.

Tout cela est un peu abstrait? Voici un exemple simple qui permet d’illustrer les principaux avantages des Dataset/Dataframe. Nous voulons appliquer un opérateur qui filtre les films dont le genre est « Drame ». On va exprimer le filtre (en simplifiant un peu) comme suit:

films.filter(film.getGenre() = 'Drame');

Si films est un RDD, Spark n’a aucune idée sur la structure des documents qu’il contient. Spark va donc instancier un objet Java (éventuellement en dé-sérialisant une chaîne d’octets reçue par réseau ou lue sur disque) et appeler la méthode getGenre(). Cela peut être long, et impose surtout de créer un objet pour un simple test.

Avec un Dataset ou Dataframe, le schéma est connu et Spark utilise son propre système d’encodage/décodage à la place de la sérialisation Java. De plus, dans le cas des Dataset, la valeur du champ genre peut être testée directement sans même effectuer de décodage depuis la représentation binaire.

Il est, en résumé, tout à fait préférable d’utiliser les Dataset dès que l’on a affaire à des données structurées.

S2: Spark en pratique

Il est temps de passer à l’action. Nous allons commencer par montrer comment effectuer des transformations sur des données non-structurées avec des RDD standard.

Important

Les exemples qui suivent sont en langage Scala. Ce n’est pas pour le plaisir d’introduire un nouveau langage que vous ne connaissez (sans doute) pas. Il se trouve que Scala est un langage fonctionnel, doté d’un système d’inférence de types puissant, ce qui le rend particulièrement approprié pour exprimer des chaînes de traitements sous la forme d’une séquence d’appels de fonctions. Scala est entièrement compatible avec Java, mais beaucoup, beaucoup moins verbeux, comme le montreront les exemples qui suivent. Les commentaires devraient vous permettre de vous familiariser progressivement avec le langage. La documentation officielle est disponible en anglais seulement.

Pour tout ce qui suit, il faut d’abord lancer l’interpréteur de commandes qui se trouve dans spark/bin, et donc en principe accessible dans votre PATH des chemins d’accès aux fichiers exécutables si vous avez effectués les quelques opérations post-installation nécessaires..

spark-shell

Note

Comme Pig, l’interpréteur de Spark affiche de nombreux messages à la console ce qui est perturbant. Pour s’en débarasser:

  • copiez le fichier sparkdir/conf/log4j.properties.template en sparkdir/conf/log4j.properties;
  • éditez log4j.properties et remplacez dans la première ligne le niveau INFO par ERROR.

Ceci en supposant que les choses n’ont pas changé entre votre version et la mienne. Sinon, cherchez sur le web.

Transformations et actions

Vous pouvez récupérer le fichier http://b3d.bdpedia.fr/files/loups.txt pour faire un essai (il est temps de savoir à quoi s’en tenir à propos de ces loups et de ces moutons!), sinon n’importe quel fichier texte fait l’affaire. Copiez-collez les commandes ci-dessous. Les commandes sont précédées de scala>, elles sont parfois suivies du résultat de leur exécution dans le shell spark.

scala> val loupsEtMoutons = sc.textFile("loups.txt")
loupsEtMoutons: org.apache.spark.rdd.RDD[String] = loups.txt

Nous avons créé un premier RDD! Spark propose des actions directement applicable à un RDD et produisant des résultats scalaires. (Un RDD est interfacé comme un objet auquel nous pouvons appliquer des méthodes.)

scala> loupsEtMoutons.count() // Nombre de documents dans ce RDD
  res0: Long = 126

scala> loupsEtMoutons.first() // Premier document du RDD
  res1: String = Le loup est dans la bergerie.

scala> loupsEtMoutons.collect() // Récupération du RDD complet

Note

Petite astuce: en entrant le nom de l’objet (loupsEtMoutons.) suivi de la touche TAB, l’interpréteur Scala vous affiche la liste des méthodes disponibles.

Passons aux transformations. Elles prennent un (ou deux) RDD en entrée, produisent un RDD en sortie. On peut sélectionner les documents qui contiennent « bergerie ».

scala> val bergerie = loupsEtMoutons.filter(line => line.contains("bergerie"))

Nous utilisons pour cela la fonction contains() qui renvoie True or False selon que la chaîne (ici, la ligne) contient le motif (ici, « bergerie »). Remarquez aussi la syntaxe reposant sur une fonction anonyme comme paramètre de la fonction filter().

Nous avons créé un second RDD. Notez que les RDD sont l’équivalent des collections dans Pig, et que nous sommes en train de définir une chaîne de traitement qui part ici d’un fichier texte et applique des transformations successives.

À ce stade, rien n’est calculé, on s’est contenté de déclarer les étapes. Dès que l’on déclenche une action, comme par exemple l’affichage du contenu d’un RDD (avec collect()), Spark va déclencher l’exécution.

scala> bergerie.collect()
 res9: Array[String] = Array(Le loup est dans la bergerie.,
                 Les moutons sont dans la bergerie.,
                 Un loup a mangé un mouton, les autres loups sont restés dans la bergerie.)

On peut combiner une transformation et une action. En fait, avec Scala, on peut chaîner les opérations et ainsi définir très concisément le workflow.

scala> loupsEtMoutons.filter(line => line.contains("loup")).count()

Et pour conclure cette petite session introductive, voici comment on implante en Spark le compteur de termes dans une collection. On crée un premier RDD constitué de tous les termes:

scala> val termes = loupsEtMoutons.flatMap(line => line.split(" "))

La méthode split décompose une chaîne de caractères (ici, en prenant comme séparateur un espace). Notez l’opérateur flatMap qui produit plusieurs documents (ici un terme) pour un document en entrée (ici une ligne).

On introduit la notion de comptage: chaque terme vaut 1. L’opérateur map produit un document en sortie pour chaque document en entrée. On peut s’en servir ici pour enrichir chaque terme avec son compteur initial.

scala> val termeUnit = termes.map(word => (word, 1))

Rappelons qu’à chaque étape, vous pouvez afficher le contenu du RDD avec collect(). L’étape suivante regroupe les termes et effectue la somme de leurs compteurs: c’est un opérateur reduceByKey.

scala> val compteurTermes = termeUnit.reduceByKey((a, b) => a + b)

On passe à l’opérateur une fonction de réduction, ici notée littéralement dans la syntaxe Scala (si j’ai deux valeurs, j’en fait la somme). Et voilà! On aurait pu tout exprimer en une seule fois.

scala> loupsEtMoutons.flatMap(line => line.split(" "))
                       .map(word => (word, 1))
                       .reduceByKey((a, b) => a + b)

Il reste à exécuter le traitement.

scala> loupsEtMoutons.collect()

Le résultat pourra vous sembler étrange: il manque les diverses étapes de simplification du texte qui sont de mise pour un moteur de recherche (vus dans le chapitre Recherche avec classement pour les détails). Mais l’essentiel est de comprendre l’enchaînement des opérateurs.

Finalement, si on souhaite conserver en mémoire le RDD final pour le soumettre à divers traitements, il suffit d’appeler:

scala> compteurTermes.persist()

L’interface de contrôle Spark

Spark dispose d’une interface Web qui permet de consulter les entrailles du système et de mieux comprendre ce qui est fait. Elle est accessible sur le port 4040, donc à l’URL http://localhost:4040 pour une exécution du shell. Pour explorer les informations fournies par cette interface, nous allons exécuter notre workflow, assemblé en une seule chaîne d’instructions Scala.

var compteurTermes =  sc.textFile("loups.txt")
       .flatMap(line => line.split(" "))
       .map(word => (word, 1))
       .reduceByKey((a, b) => a + b)

compteurTermes.collect()

Lancez le shell est exécutez ce workflow (lisez l’astuce qui suit au préalable).

Astuce

Si vous voulez entrer des instructions multi-lignes dans l’interpréteur Scala, utilisez la commande :paste, suivi de vos instructions, et CTRL D pour finir.

Maintenant, vous devriez pouvoir accéder à l’interface et obtenir un affichage semblable à celui de la Fig. 18.4. En particulier, le job que vous venez d’exécuter devrait apparaître, avec sa durée d’exécution et quelques autres informations.

_images/sparkUI.png

Fig. 18.4 L’interface Web de Spark

L’onglet jobs

Cliquez sur le nom du job pour obtenir des détails sur les étapes du calcul (Fig. 18.5). Spark nous dit que l’exécution s’est faite en deux étapes. La première comprend les transformations textFile, flatMap et `` map``, la seconde la transformation `` reduceByKey``. les deux étapes sont séparées par une phase de shuffle.

_images/sparkQueryPlan.png

Fig. 18.5 Plan d’exécution d’un job Spark: les étapes.

À quoi correspondent ces étapes? En fait, si vous avez bien suivi ce qui précède dans le cours, vous avez les éléments pour répondre: une étape dans Spark regroupe un ensemble d’opérations qu’il est possible d’exécuter localement, sur une seule machine, sans avoir à efectuer des échanges réseau. C’est une généralisation de la phase de Map dans un environnement MapReduce. Les étapes sont logiquement séparées par des phases de shuffle qui consistent à redistribuer les données afin de les regrouper selon certains critères. Relisez le chapitre Calcul distribué: Hadoop et MapReduce pour revoir vos bases du calcul distribué si ce n’est pas clair.

Quand le traitement s’effectue sur des données partitionnées, une étape est effectuée en parallèle sur les fragments, et Spark appelle tâche l’exécution de l’étape sur un fragment particulier, pour une machine particulière. Résumons:

  • Un job est l’exécution d’une chaîne de traitements (workflow) dans une environnement distribué.
  • Un job est découpé en étapes, chqaue étape étant un segment du workflow qui peut s’exécuter localement.
  • L’exécution d’une étape se fait par un ensemble de tâches, une par machine hébergeant un fragment du RDD servant de point d’entrée à l’étape.

Et voilà ! Si c’est clair passez à la suite, sinon relisez.

L’onglet Stages

Vous pouvez obtenir des informations complémentaires sur chaque étape avec l’onglet Stages (qui veut dire étapes, en anglais). En particulier, l’interface montre de nombreuses statistiques sur le temps d’exécution, le volume des données échangées, etc. Tout cela est très précieux quand on veut vérifier que tout va bien pour des traitements qui durent des heures ou des jours.

L’onglet Storage

Maintenant, consultez l’onglet Storage. Il devrait être vide et c’est normal: aucun job n’est en cours d’exécution. Notre fichier de départ est trop petit pour que la durée d’exécution soit significative. Mais entrez la commande suivante:

compteurTermes.persist()

Et exécutez à nouveau l’action collect(). Cette fois un RDD devrait apparaître dans l’onglet Storage, et de plus vous devriez comprendre pourquoi!

Exécutez une nouvelle fois l’action collect() et consultez les statistiques des temps d’exécution. La dernière exécution devrait être significativement plus rapide que les précédentes. Comprenez-vous pourquoi? Regardez les étapes, et clarifiez tout cela dans votre esprit.

Il ne s’agit que d’un fichier de 4 lignes en entrée. On peut extrapoler à de très grandes collections et réaliser le gain potentiel avec cette méthode (qui n’est pas magique: on a échangé du temps contre de l’espace, comme toujours).

Exercices

Exercice: à vous de jouer

Vous vous doutez de ce qu’il faut faire à ce stade: reproduire les commandes qui précèdent, et explorer l’interface de Spark jusqu’à ce que tout soit clair.

S3: Traitement de données structurées avec Cassandra et Spark

Voyons maintenant les outils de traitement proposés par Spark sur des données structurées issus, par exemple, d’une base de données, ou de collections de documents JSON. On interagit dans ce cas avec une version évoluée des RDD, les Dataframes et les Datasets. Les deux structures sont semblables à des tables relationnelles, mais la seconde est, de plus, fortement typée puisqu’on connaît le type de chaque colonne. Cela simplifie considérablement les traitements, aussi bien du point de vue du concepteur des traitements que de celui du système.

  • Pour le concepteur, la possibilité de référencer des champs et de leur appliquer des opérations standard en fonction de leur type évite d’avoir à écrire une fonction spécifique pour la moindre opération, rend le code beaucoup lisible et concis.
  • Pour le système, la connaissance du schéma facilite les contrôles avant exécution (compile-time checking, par opposition au run-time checking), et permet une sérialisation très rapide, indépendante de la sérialisation Java, grâce à une couche composée d” encoders.

Nous allons en profiter pour instancier un début d’architecture réaliste en associant Spark à Cassandra comme source de données. Dans une telle organisation, le stockage et le partitionnement sont assurés par Cassandra, et le calcul distribué par Spark. Idéalement, chaque nœud Spark traite un ou plusieurs fragments d’une collection partitionnée Cassandra, et communique donc avec un des nœuds de la grappe Cassandra. On obtient alors un système complètement distribué et donc scalable.

Préliminaires

La base Cassandra que nous prenons comme support est celle des restaurants New-Yorkais. Reportez-vous au chapitre Cassandra - Travaux Pratiques pour la création de cette base. Dans ce qui suit, on suppose que le serveur Cassandra est en écoute sur la machine 192.168.99.100, port 32769 (si vous utilisez Cassandra avec Docker, reportez-vous aussi aux manipulations vues en TP pour trouver les bonnes valeurs d’IP et de port, qui sont probablement différentes de celles-ci).

Pour associer Spark et Cassandra, il faut récupérer le connecteur sur la page https://spark-packages.org/package/datastax/spark-cassandra-connector. Prenez la version la plus récente, en tout cas celle correspondant à votre version de Spark.

Vous obtenez un fichier jar. Pour qu’il soit pris en compte, le plus simple est de le copier dans le répertoire jars de Spark. Lancez alors le shell Spark. Il ne reste plus qu’à se connecter au serveur Cassandra en ajoutant la configuration (machine et port) dans le contexte Spark. Exécutez donc au préalable les commandes suivantes (en remplaçant la machine et le port par vos propres valeurs, bien sûr).

import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf

// Paramètres de connexion
spark.setCassandraConf("default",
                   CassandraConnectorConf.ConnectionHostParam.option("192.168.99.100")
                ++ CassandraConnectorConf.ConnectionPortParam.option(32769))

Vous devriez pouvoir vérifier que la connexion fonctionne en interrogeant la table des restaurants.

val restaurants_df = spark.read.cassandraFormat("restaurant", "resto_ny").load()

restaurants_df.printSchema()

restaurants_df.show()

Note

Il semble que le nom du Keyspace et de la table doivent être mis en minuscules.

Nous voici en présence d’un Dataframe Spark, dont le schéma (noms des colonnes) a été directement obtenu depuis Cassandra. En revanche, les colonnes ne sont pas typées (on pourrait espérer que le type est récupéré et transcrit depuis le schéma de Cassandra, mais ce n’est malheureusement pas le cas).

Pour obtenir un Dataset dont les colonnes sont typées, avec tous les avantages qui en résultent, il faut définir une classe dans le langage de programmation (ici, Scala) et demander la conversion, comme suit:

case class Restaurant(id: Integer, Name: String, borough: String,
                       BuildingNum: String, Street: String,
                       ZipCode: Integer, Phone: String, CuisineType: String)

val restaurants_ds = restaurants_df.as[Restaurant]

Nous avons donc maintenant un Dataframe restaurant_df et un Dataset restaurant_ds. Le premier est une collection d’objets de type Row, le second une collection d’objets de type Restaurant. On peut donc exprimer des opérations plus précises sur le second. Notons que tout cela constitue une illustration pratique du compromis que nous étudions depuis le début de ce cours sur la notion de document: vaut-il mieux des données au schéma très contraint, mais offrant plus de sécurité, ou des données au schéma très flexible, mais beaucoup plus difficile à manipuler?

Nous aurons également besoin des données sur les inspections de ces restaurants.

case class Inspection (idRestaurant: Integer, InspectionDate: String, ViolationCode: String,
      ViolationDescription: String, CriticalFlag: String, Score: Integer, Grade: String)

val inspections_ds = spark.read.cassandraFormat("inspection", "resto_ny").load().as[Inspection]

Note

Pour celles/ceux qui veulent expérimenter directement l’interface SQL de Spark, il existe une troisème option, celle de créer une « vue » sur les restaurants Cassandra avec la commande suivante:

val createDDL = """CREATE TEMPORARY VIEW restaurants_sql
            USING org.apache.spark.sql.cassandra
            OPTIONS (
             table "restaurant",
             keyspace "resto_ny")"""
spark.sql(createDDL)

spark.sql("SELECT * FROM restaurants_sql").show

Traitements basés sur les Datasets

Nous allons illustrer l’interface de manipulation des Datasets (elle s’applique aussi au Dataframes, à ceci près qu’on ne peut pas exploiter le typage précis donné par la classe des objets contenus dans la collection). Pour bien saisir la puissance de cette interface, vous êtes invités à réfléchir à ce qu’il faudrait faire pour obtenir un résultat équivalent si on avait affaire à un simple RDD, sans schéma, avec donc la nécessité d’écrire une fonction à chaque étape.

Commençons par les projections (malencontreusement référencées par la mot-clé select depuis les débuts de SQL) consistant à ne conserver que certaines colonnes. La commande suivante ne conserve que trois colonnes.

val restaus_simples = restaurants_ds.select("name", "phone", "cuisinetype")

restaus_simples.show()

Voici maintenant comment on effectue une sélection (avec le mot-clé filter, correspondant au where de SQL).

val manhattan = restaurants_df.filter("borough =  'MANHATTAN'")

manhattan.show()

Par la suite, nous omettons l’appel à show() que vous pouvez ajouter si vous souhaitez consulter le résultat.

L’interface Dataset offre une syntaxe légèrement différente qui permet de tirer parti du fait que l’on a affaire à une collection d’objets de type Restaurant. On peut donc passer en paramètre une expression booléenne Scala qui prend un object Restaurant en entrée et renvoie un Booléen.

val r = restaurants_ds.filter(r => r.borough == "MANHATTAN")

Ce type de construction permet un typage statique (au moment de la compilation) qui garantit qu’il n’y aura pas de problème au moment de l’exécution.

On peut effectuer des agrégats, comme par exemple le regroupement des restaurants par arrondissement (borough):

val comptage_par_borough = restaurants.groupBy("borough").count()

Tout cela aurait aussi bien pu s’exprimer en CQL (voir exercices). Mais Spark va définitivement plus loin en termes de capacité de traitements, et propose notamment la fameuse opération de jointure qui nous a tant manqué jusqu’ici.

val restaus_inspections = restaurants_ds
       .join(inspections_ds, restaurants_ds("id") === inspections_ds("idRestaurant"))

Le traitement suivant effectue la moyenne des votes pour les restaurants de Tapas.

val restaus_stats = restaurants_ds.filter("cuisinetype > 'Tapas'")
      .join(inspections_ds, restaurants_ds("id") === inspections_ds("idRestaurant"))
      .groupBy(restaurants_ds("name"))
     .agg(avg(inspections_ds("score")))

Exercices

Exercice: qu’est-il arrivé à CQL?

Vous avez sans doute noté que Spark surpasse CQL. On peut donc envisager de se passer de ce dernier, ce qui soulève quand même un inconvénient majeur (lequel?). Le connecteur Spark/Cassandra permet de déléguer les transformations Spark compatibles avec CQL grâce à un paramètre pushdown qui est activé par défaut.

  • Enoncez clairement l’inconvénient d’utiliser Spark en remplacement de CQL.
  • Etudiez le rôle et fonctionnement de l’option pushdown dans la documentation du connecteur.
  • Quelles sont les requêtes parmi celles vues ci-dessus qui peuvent être transmises à CQL.

Exercice: plans d’exécution

Avec l’interface de Spark vous pouvez consulter le graphe d’exécution de chaque traitement. Comme nous sommes passés avec l’API des Dataframe à un niveau beaucoup plus déclaratif, cela vaut la peine de regarder, pour chaque traitement effectué (et notamment la jointure) comment Spark évalue le résultat avec des opérateurs distribués.

Exercice: exploration de l’interface Dataset

L’API des Datasets est présentée ici:

Etudiez et exprérimentez les transformations et actions décrites.

Quiz

  • Quelle est la différence entre une transformation et une action?
  • Comment expliqueriez-vous la notion d’exécution « paresseuse » dans Spark?