Qu’est-ce que Spark ?
Apache Spark est un framework de traitements Big Data open source construit pour effectuer des analyses sophistiquées et conçu pour la rapidité et la facilité d’utilisation. Celui-ci a originellement été développé par AMPLab, de l’Université UC Berkeley, en 2009 et passé open source sous forme de projet Apache en 2010.
Spark présente plusieurs avantages par rapport aux autres technologies big data et MapReduce comme Hadoop et Storm. D’abord, Spark propose un framework complet et unifié pour répondre aux besoins de traitements Big Data pour divers jeux de données, divers par leur nature (texte, graphe, etc.) aussi bien que par le type de source (batch ou flux temps-réel). Ensuite, Spark permet à des applications sur clusters Hadoop d’être exécutées jusqu’à 100 fois plus vite en mémoire, 10 fois plus vite sur disque. Il vous permet d’écrire rapidement des applications en Java, Scala ou Python et inclut un jeu de plus de 80 opérateurs haut-niveau. De plus, il est possible de l’utiliser de façon interactive pour requêter les données depuis un shell.
Enfin, en plus des opérations de Map et Reduce, Spark supporte les requêtes SQL et le streaming de données et propose des fonctionnalités de machine learning et de traitements orientés graphe. Les développeurs peuvent utiliser ces possibilités en stand-alone ou en les combinant en une chaîne de traitement complexe.
Ce premier article de la série propose un aperçu de ce qu’est Spark, de la suite d’outils mis à disposition pour les traitements Big Data et explique comment Spark se positionne par rapport aux solutions classiques de MapReduce.
Hadoop et Spark
Hadoop est positionné en tant que technologie de traitement de données depuis 10 ans et a prouvé être la solution de choix pour le traitement de gros volumes de données. MapReduce est une très bonne solution pour les traitements à passe unique mais n’est pas la plus efficace pour les cas d’utilisation nécessitant des traitements et algorithmes à plusieurs passes. Chaque étape d’un workflow de traitement étant constituée d’une phase de Map et d’une phase de Reduce, il est nécessaire d’exprimer tous les cas d’utilisation sous forme de patterns MapReduce pour tirer profit de cette solution. Les données en sortie de l’exécution de chaque étape doivent être stockées sur système de fichier distribué avant que l’étape suivante commence. Cette approche a tendance à être peu rapide à cause de la réplication et du stockage sur disque.
De plus, les solutions Hadoop s’appuient généralement sur des clusters, qui sont difficiles à mettre en place et à administrer. Elles nécessitent aussi l’intégration de plusieurs outils pour les différents cas d’utilisation big data (comme Mahout pour le Machine Learning et Storm pour le traitement par flux).
Si vous souhaitez mettre en place quelque chose de plus complexe, vous devrez enchaîner une série de jobs MapReduce et les exécuter séquentiellement, chacun de ces jobs présentant une latence élevée et aucun ne pouvant commencer avant que le précédent n’ait tout-à-fait terminé.
Spark permet de développer des pipelines de traitement de données complexes, à plusieurs étapes, en s’appuyant sur des graphes orientés acycliques (DAG). Spark permet de partager les données en mémoire entre les graphes, de façon à ce que plusieurs jobs puissent travailler sur le même jeu de données. Spark s’exécute sur des infrastructures Hadoop Distributed File System (HDFS) et propose des fonctionnalités supplémentaires. Il est possible de déployer des applications Spark sur un cluster Hadoop v1 existant (avec SIMR – Spark-Inside-MapReduce), sur un cluster Hadoop v2 YARN ou même sur Apache Mesos. Plutôt que de voir en Spark un remplaçant d’Hadoop, il est plus correct de le voir comme une alternative au MapReduce d’Hadoop. Spark n’a pas été prévu pour remplacer Hadoop mais pour mettre à disposition une solution complète et unifiée permettant de prendre en charge différents cas d’utilisation et besoins dans le cadre des traitements big data.
Les fonctionnalités de Spark
Spark apporte des améliorations à MapReduce grâce à des étapes de shuffle moins coûteuses. Avec le stockage en mémoire et un traitement proche du temps-réel, la performance peut être plusieurs fois plus rapide que d’autres technologies big data. Spark supporte également les évaluations paresseuses ("lazy evaluation") des requêtes, ce qui aide à l’optimisation des étapes de traitement. Il propose une API de haut-niveau pour une meilleure productivité et un modèle d’architecture cohérent pour les solutions big data.
Spark maintient les résultats intermédiaires en mémoire plutôt que sur disque, ce qui est très utile en particulier lorsqu’il est nécessaire de travailler à plusieurs reprises sur le même jeu de données. Le moteur d’exécution est conçu pour travailler aussi bien en mémoire que sur disque. Les opérateurs réalisent des opérations externes lorsque la donnée ne tient pas en mémoire, ce qui permet de traiter des jeux de données plus volumineux que la mémoire agrégée d’un cluster. Spark essaye de stocker le plus possible en mémoire avant de basculer sur disque. Il est capable de travailler avec une partie des données en mémoire, une autre sur disque.
Il est nécessaire d’examiner ses données et ses cas d’utilisation pour évaluer ses besoins en mémoire car, en fonction du travail fait en mémoire, Spark peut présenter d’importants avantages de performance. Les autres fonctionnalités proposées par Spark comprennent :
- Des fonctions autres que Map et Reduce
- L’optimisation de graphes d’opérateurs arbitraires
- L’évaluation paresseuse des requêtes, ce qui aide à optimiser le workflow global de traitement
- Des APIs concises et cohérentes en Scala, Java et Python
- Un shell interactif pour Scala et Python (non disponible encore en Java)
Spark est écrit en Scala et s’exécute sur la machine virtuelle Java (JVM). Les langages supportés actuellement pour le développement d’applications sont :
- Scala
- Java
- Python
- Clojure
- R
L’écosystème de Spark
À côté des API principales de Spark, l’écosystème contient des librairies additionnelles qui permettent de travailler dans le domaine des analyses big data et du machine learning. Parmi ces librairies, on trouve :
- Spark Streaming : Spark Streaming peut être utilisé pour traitement temps-réel des données en flux. Il s’appuie sur un mode de traitement en "micro batch" et utilise pour les données temps-réel DStream, c’est-à-dire une série de RDD (Resilient Distributed Dataset).
- Spark SQL : Spark SQL permet d’exposer les jeux de données Spark via API JDBC et d’exécuter des requêtes de type SQL en utilisant les outils BI et de visualisation traditionnels. Spark SQL permet d’extraire, transformer et charger des données sous différents formats (JSON, Parquet, base de données) et les exposer pour des requêtes ad-hoc.
- Spark MLlib : MLlib est une librarie de machine learning qui contient tous les algorithmes et utilitaires d’apprentissage classiques, comme la classification, la régression, le clustering, le filtrage collaboratif, la réduction de dimensions, en plus des primitives d’optimisation sous-jacentes.
- Spark GraphX : GraphX est la nouvelle API (en version alpha) pour les traitements de graphes et de parallélisation de graphes. GraphX étend les RDD de Spark en introduisant le Resilient Distributed Dataset Graph, un multi-graphe orienté avec des propriétés attachées aux nœuds et aux arrêtes. Pour le support de ces traitements, GraphX expose un jeu d’opérateurs de base (par exemple subgraph, joinVertices, aggregateMessages), ainsi qu’une variante optimisée de l’API Pregel. De plus, GraphX inclut une collection toujours plus importante d’algorithmes et de builders pour simplifier les tâches d’analyse de graphes.
En plus de ces librairies, on peut citer BlinkDB et Tachyon : BlinkDB est un moteur de requêtes approximatif qui peut être utilisé pour exécuter des requêtes SQL interactives sur des volumes de données importants. Il permet à l’utilisateur de troquer la précision contre le temps de réponse. Il fonctionne en exécutant les requêtes sur des extraits des données et présente ses résultats accompagnés d’annotations avec les indicateurs d’erreurs significatifs. Tachyon est un système de fichiers distribué qui permet de partager des fichiers de façon fiable à la vitesse des accès en mémoire à travers des frameworks de clusters comme Spark et MapReduce. Il évite les accès disques et le chargement des fichiers fréquemment utilisés en les cachant en mémoire. Ceci permet aux divers frameworks, tâches et requêtes d’accéder aux fichiers en cache rapidement.
Il existe aussi des adaptateurs pour intégration à d’autres produits comme Cassandra (Spark Cassandra Connector) et R (SparkR). Avec le connecteur Cassandra, vous pouvez utiliser Spark pour accéder à des données stockées dans Cassandra et réaliser des analyses sur ces données.
Le diagramme suivant (Figure 1) montre les relations entre ces différentes librairies de l’écosystème.
Figure 1. Spark Framework Libraries
Nous explorerons ces librairies dans les futurs articles de cette série.
L’architecture de Spark
L’architecture de Spark comprend les trois composants principaux suivants :
- Le stockage des données
- L’API
- Le Framework de gestion des ressources
Regardons chacun de ces composants plus en détails.
Le stockage des données :
Spark utilise le système de fichiers HDFS pour le stockage des données. Il peut fonctionner avec n’importe quelle source de données compatible avec Hadoop, dont HDFS, HBase, Cassandra, etc.
L’API :
L’API permet aux développeurs de créer des applications Spark en utilisant une API standard. L’API existe en Scala, Java et Python. Les liens ci-dessous pointent vers les sites présentant les API Spark pour chacun de ces langages :
Gestion des ressources
Spark peut être déployé comme un serveur autonome ou sur un framework de traitements distribués comme Mesos ou YARN. La figure 2 illustre les composants du modèle d’architecture de Spark.
Figure 2. Spark Architecture
Les "Resilient Distributed Datasets"
Les Resilient Distributed Datasets (basés sur la publication de recherche de Matei), ou RDD, sont un concept au cœur du framework Spark. Vous pouvez voir un RDD comme une table dans une base de données. Celui-ci peut porter tout type de données et est stocké par Spark sur différentes partitions. Les RDD permettent de réarranger les calculs et d’optimiser le traitement. Ils sont aussi tolérants aux pannes car un RDD sait comment recréer et recalculer son ensemble de données. Les RDD sont immutables. Pour obtenir une modification d’un RDD, il faut y appliquer une transformation, qui retournera un nouveau RDD, l’original restera inchangé. Les RDD supportent deux types d’opérations :
- Les transformations
- Les actions
Les transformations : les transformations ne retournent pas de valeur seule, elles retournent un nouveau RDD. Rien n’est évalué lorsque l’on fait appel à une fonction de transformation, cette fonction prend juste un RDD et retourne un nouveau RDD. Les fonctions de transformation sont par exemple map, filter, flatMap, groupByKey, reduceByKey, aggregateByKey, pipe et coalesce
.
Les actions : les actions évaluent et retournent une nouvelle valeur. Au moment où une fonction d’action est appelée sur un objet RDD, toutes les requêtes de traitement des données sont calculées et le résultat est retourné. Les actions sont par exemple reduce, collect, count, first, take, countByKey et foreach
.
Comment installer Spark
Il y a plusieurs façons d’installer et d’utiliser Spark. Vous pouvez l’installer sur votre machine, comme framework autonome ou utiliser une des images de machine virtuelle (Spark Virtual Machine) disponibles chez des éditeurs comme Cloudera, HortonWorks ou MapR. Vous pouvez aussi utiliser Spark déjà installé et configuré, sur le Cloud (comme Databricks Cloud).
Dans cet article, Spark sera installé en framework autonome et lancé localement. La version 1.2.0, sortie récemment, sera utilisée pour la démonstration de code.
Comment exécuter Spark ?
Lorsque Spark est installé sur une machine locale ou sur le Cloud, différents modes de connexion au moteur Spark sont possibles. La table ci-dessous présente le paramètre Master URL utilisable pour les différents modes d’exécution.
Comment interagir avec Spark ?
Une fois que Spark est installé et est en cours d’exécution, vous pouvez vous y connecter en utilisant le Shell Spark pour effectuer des analyses de données interactives. Le Shell Spark est disponible pour les langages Scala et Python. Java ne supporte pas encore de Shell interactif, donc cette fonctionnalité n’est pas encore disponible pour Java. Vous pouvez utiliser les commandes spark-shell.cmd
et pyspark.cmd
pour lancer respectivement le Shell avec Scala et Python.
La console Web
Quand Spark est en cours d’exécution, quel que soit le mode d’exécution, vous pouvez consulter les résultats des jobs et d’autres statistiques en accédant à la console Web, via l’URL suivante : http://localhost:4040
La figure 3 montre la console Spark, avec ses onglets pour les étapes, le stockage, l’environnement et les exécuteurs.
(Cliquez sur l'image pour l'agrandir)
Figure 3. Spark Web Console
Les variables partagées
Spark fournit deux types de variables partagées pour permettre d’exécuter de façon efficace les programmes Spark sur un cluster : Broadcast et Accumulators.
Les variables Broadcast : Ces variables permettent de maintenir des variables en cache, en lecture seule, sur chaque machine, plutôt que d’avoir à les envoyer avec les tâches. Elles peuvent être utilisées pour mettre à la disposition des nœuds du cluster des copies de jeux de données volumineux de façon plus efficace. L’extrait de code suivant montre comment utiliser les variables Broadcast.
// // Broadcast Variables // val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
Les accumulateurs : les accumulateurs peuvent être ajoutés lors de l’utilisation d’opérations associatives ; leur support est donc efficace dans le cadre de traitements parallèles. Ils peuvent être utilisés pour implémenter des compteurs (comme avec MapReduce) ou des sommes. Les tâches exécutées sur le cluster peuvent ajouter un accumulateur avec la méthode add. Cependant, ils ne peuvent pas lire sa valeur. Seul le programme pilote peut lire la valeur d’un accumulateur. L’extrait de code suivant montre comment utiliser un accumulateur :
// // Accumulators // val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) accum.value
Exemple d’application Spark
L’exemple couvert dans cet article est une application simple décompte de mots. C’est l’exemple classique que l’on présenterait lorsque l’on enseigne les traitements big data avec Hadoop. Nous allons effectuer des requêtes d’analyse sur un fichier texte. Le fichier texte et le jeu de données de cet exemple sont de petite taille mais les requêtes Spark sont valables pour des jeux de données beaucoup plus volumineux, sans modification de code. Pour faire simple, nous utiliserons le Shell Scala. Commençons par voir comment installer Spark sur une machine locale.
Pré-requis :
- Vous aurez besoin du Java Development Kit (JDK) pour que Spark fonctionne en local. C’est la première étape décrite plus bas.
- Vous aurez besoin d’installer les applicatifs Spark sur votre machine. Les instructions sont données dans la deuxième étape ci-dessous.
Note : Ces instructions sont valables en environnement Windows. Si vous utilisez un système d’exploitation différent, vous devrez adapter les variables système et les chemins vers les répertoires en fonction de votre environnement.
Installation du JDK : Téléchargez le JDK depuis le site d’Oracle, la version 1.7 est recommandée. Choisissez pour l’installation du JDK un répertoire sans espaces. Pour les utilisateurs de Windows, choisissez par exemple c:\dev
, pas c:\Program Files
. Vérifiez l’installation depuis le répertoire bin sous le répertoire JDK 1.7 en tapant la commande java -version
. Si l’installation est correcte, cette commande doit afficher la version de Java installée.
Installation des applicatifs Spark : Téléchargez la dernière version depuis le site de Spark. La version la plus récente au moment de l’écriture de cet article est la 1.2. Vous pouvez aussi choisir une version spécifique en fonction d’une version Hadoop. J’ai moi-même téléchargé Spark pour Hadoop 2.4 et le nom du fichier est spark-1.2.0-bin-hadoop2.4.tgz
. Décompressez le fichier dans un répertoire local, comme c:\dev.
Pour vérifier l’installation de Spark, positionnez-vous sur le répertoire de Spark et lancez le Shell avec les commandes suivantes :
c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell
Si l’installation s’est bien passée, vous verrez les messages ci-dessous dans la console :
….
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.2.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
….
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.
Vous pouvez taper la commande suivante pour vérifier le bon fonctionnement du Shell :
sc.version
(ou)
sc.appName
Cette étape finie, vous pouvez sortir du shell :
:quit
Pour lancer le Shell python, vous devez installer Python sur votre machine. Prenez par exemple Anaconda, qui est une distribution gratuite qui inclut plusieurs packages Python pour la science, les maths, l’ingénierie et l’analyse de données. Vous pouvez ensuite exécuter les commandes suivantes :
c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\pyspark
Décompte de mots
Une fois Spark installé et en cours d’exécution, vous pouvez exécuter des requêtes d’analyse avec l’API. Des commandes simples pour lire des données depuis un fichier texte et les traiter sont disponibles. Nous examinerons des cas d’utilisation plus avancés dans les futurs articles de la série. Commençons par utiliser l’API pour exécuter l’exemple connu du décompte de mots. Ouvrez un Shell Scala, voici les commandes à utiliser :
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ val txtFile = "README.md" val txtData = sc.textFile(txtFile) txtData.cache()
La fonction cache est appelée pour stocker les RDD créés en cache, de façon à ce que Spark n’ait pas à les recalculer à chaque fois, à chaque requête suivante. Notez que cache() est une opération lazy, Spark ne stocke pas la donnée immédiatement en mémoire, en fait, ceci sera fait lorsque l’action sera invoquée sur un RDD. Maintenant, nous pouvons appeler la fonction count pour voir combien de lignes sont présentes dans le fichier texte. txtData.count()
Les commandes suivantes réalisent le décompte des mots et affichent le compte à côté de chaque mot présent dans le fichier.
val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) wcData.collect().foreach(println)
D’autres exemples d’utilisation de l’API peuvent être consultés sur le site de Spark, dans la documentation.
Pour suivre
Dans les futurs articles de la série, nous en apprendrons plus sur d’autres parties de l’écosystème de Spark, en commençant par Spark SQL. Nous regarderons Spark Streaming, Spark MLlib et Spark GraphX, ainsi que les frameworks à venir comme Tachyon et BlinkDB.
Conclusion
Dans cet article, nous avons vu comment le framework Apache Spark, avec son API standard, nous aide en matière de traitement et d’analyse de données. Nous avons aussi vu comment Spark se positionne par rapport aux implémentations MapReduce traditionnelles comme Apache Hadoop. Spark s’appuie sur le même système de stockage de fichiers qu’Hadoop, il est donc possible d’utiliser Spark et Hadoop ensemble dans le cas où des investissements significatifs ont déjà été faits avec Hadoop.
Vous pouvez aussi combiner les types de traitements Spark avec Spark SQL, Spark Machine Learning et Spark Streaming comme nous le verrons dans les prochains articles. Grâce à différents modes d’intégration et adaptateurs Spark, vous pouvez combiner Spark avec d’autres technologies. Vous pouvez par exemple utiliser ensemble Spark, Kafka et Apache Cassandra ; Kafka pour le streaming de données entrantes, Spark pour le traitement et la base NoSQL Cassandra pour le stockage des résultats.
Gardez cependant à l’esprit que Spark n’est pas un écosystème encore complètement mature et que celui-ci nécessite des améliorations dans certains domaines comme la sécurité ou l’intégration avec des outils de BI.
Références
- Site principal de Spark
- Exemples
- Vidéos et présentations de la conférence Spark Summit 2014
- Site de Spark on Databricks
Au sujet de l’Auteur
Srini Penchikala travaille actuellement en tant qu’architecte applicatif au sein d’une société de services financiers à Austin au Texas. Il a plus de 20 ans d’expérience en architecture, conception et développement. Srini écrit actuellement un livre sur les patterns dans le contexte des bases NoSQL. Il est aussi le co-auteur de "Spring Roo in Action", aux éditions Manning. Il a été présentateur lors de plusieurs conférences, comme JavaOne, SEI Architecture Technology Conference (SATURN), IT Architect Conference (ITARC), No Fluff Just Stuff, NoSQL Now et Project World Conference. Srini a publié également de nombreux articles sur l’architecture, la sécurité et la gestion des risques et les bases NoSQL sur divers sites, tels qu’InfoQ, The ServerSide, OReilly Network (ONJava), DevX Java, java.net et JavaWorld. Il est Lead Editor de la communauté bases de données NoSQL chez InfoQ.