Introduction
Dans cet article, nous allons voir comment il est possible d'utiliser Cassandra et Spark, pour effectuer des opérations sur une grande quantité de données, le tout de manière distribuée.
Cassandra
Cassandra est une base de données NoSQL orientée colonnes, à l'origine développée par Facebook pour ses besoins en interne. Je vous invite à consulter les excellents articles de Duy Hai Doan qui présente ses fonctionnalités et son modèle de stockage de manière très exhaustive.
Spark
Spark est un moteur d’analyse adapté au traitement rapide de gros volumes de données. Spark effectue ces traitements sur des abstractions, appelés RDD (Resilient Distributed Dataset) qui peuvent répresenter différentes sources de données (une variable classique, un fichier texte local, une fichier sur HDFS, etc.). Spark est devenu un "Apache Top-Level Project" en février 2014, après avoir été accepté dans le programme d'incubation d'Apache en juin 2013.
Datastax
Datastax est une société basée à Santa Clara en Californie fournissant des solutions basées sur Cassandra. Ils ont récemment dévéloppé un connecteur permettant d'exposer des tables Cassandra en tant que source de données résiliente pour Spark.
Cas pratique
A titre d'exemple, nous allons utiliser des données représentant un ensemble de trajets reliant des villes américaines et effectuer des opérations impliquant ces trajets. Chaque trajet est caractérisé par :
- une ville d'origine
- une ville de destination
- la distance entre les 2 villes
- le temps de parcours
- etc.
Pour chaque ville d'origine, nous allons calculer la moyenne des distances des trajets partant de cette même ville.
Voici les différentes étapes qui nous permettront d'y parvenir :
- Insertion des informations relatives aux différents trajets (fichier csv compressé accessible ici) dans une base de données Cassandra.
- Calcul du nombre de trajets, groupés par ville d'origine.
- Calcul de la moyenne des distances des trajets, toujours groupés par ville d'origine.
Tout le code présenté dans cet article sera en Java.
Prérequis
Pour exécuter le programme détaillé par la suite, vous aurez besoin de :
- JDK 7 ou supérieur
- Git
- Maven
Stockage des trajets
Nous allons insérer les trajets dans une base Cassandra à partir d'un fichier CSV. Le code CQL permettant de créer la table est le suivant :
CREATE TABLE RoadTrip ( id int PRIMARY KEY, origin_city_name varchar, origin_state_abr varchar, destination_city_name varchar, destination_state_abr varchar, elapsed_time int, distance int );
La classe Java qui contiendra les informations de chaque trajet est la suivante :
public class RoadTrip { private Integer id = null; private String originCityName = null; private String originStateAbr = null; private String destinationCityName = null; private String destinationStateAbr = null; private Integer elapsedTime = null; private Integer distance = null; ... }
Les objets RoadTrip seront stockés dans Cassandra à l'aide du driver Datastax pour Cassandra. Afin de réduire le nombre d'accès à la base, nous allons utiliser une requête de type BatchStatement
:
Insert insertStatement = QueryBuilder.insertInto("RoadTrip"); insertStatement.value("id", QueryBuilder.bindMarker()) .value("origin_city_name", QueryBuilder.bindMarker()) .value("origin_state_abr", QueryBuilder.bindMarker()) .value("destination_city_name", QueryBuilder.bindMarker()) .value("destination_state_abr", QueryBuilder.bindMarker()) .value("elapsed_time", QueryBuilder.bindMarker()) .value("distance", QueryBuilder.bindMarker()) ; PreparedStatement ps = session.prepare(insertStatement.toString()); ... BatchStatement batch = new BatchStatement(); for (RoadTrip roadtrip : roadtrips) { batch.add(ps.bind(roadtrip.getId(), roadtrip.getOriginCityName(), roadtrip.getOriginStateAbr(), roadtrip.getDestinationCityName(), roadtrip.getDestinationStateAbr(), roadtrip.getElapsedTime(), roadtrip.getDistance() )); } session.execute(batch); ...
Initialisation du Contexte Spark
Maintenant que les trajets sont stockés dans une table Cassandra, nous allons instancier un contexte Spark et lui indiquer comment accéder à Cassandra :
SparkConf conf = new SparkConf(true) .setMaster("local") .setAppName("DatastaxTests") .set("spark.executor.memory", "1g") .set("spark.cassandra.connection.host", "localhost") .set("spark.cassandra.connection.native.port", "9142") .set("spark.cassandra.connection.rpc.port", "9171"); SparkContext ctx = new SparkContext(conf);
Quelques précisions concernant les paramètres spécifiés :
- Le paramètre
master
permet de spécifier l'url du cluster Spark. La string 'local' indique que l'on souhaite utiliser Spark en mode local, c'est à dire embarqué dans la machine virtuelle. - Le paramètre
appName
est le nom logique de l'application, qui apparaîtra dans l'interface d'administration Spark (accessible par défaut à l'adresse http://localhost:4040). - Les autres paramètres sont des propriétés permettant d'indiquer à Spark la quantité de mémoire à allouer pour le processus d'exécution, ainsi que l'adresse et les ports d'écoute de Cassandra.
Nous allons ensuite directement créer notre RDD en spécifiant quels keyspace/table Cassandra contiendront les trajets routiers :
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ctx); JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();
Déclenchement des opérations
Spark permet d'effectuer des opérations qui peuvent être de deux types :
- Transformations : map, reduce, join, etc.
- Actions : collect, take, etc.
Vous pouvez consulter la liste des opérations implémentées ici. Des exemples pratiques (en Scala) sont aussi consultables ici.
Les opérations effectuées par Spark sont lazy, dans le sens où elles ne sont exécutées seulement au moment où l'on souhaite récupérer le resultat de ces opérations en exécutant une action (collect(), take()).
De plus, si le résultat d'une opération intermédiaire est utile pour plusieurs ensembles d'opérations, il est possible de mettre en cache (et donc de calculer, sans récupérer) ce résultat intermédiaire afin qu'il ne soit pas calculé plusieurs fois.
Cette mise en cache est effectuée à l'aide de la fonction cache()
persistant en mémoire (par défaut) l'abstraction RDD intermédiaire correspondante.
Dans le cas présent, les informations concernant l'ensemble des trajets sera utilisée deux fois, nous allons donc mettre en cache le RDD précédent :
rdd.cache();
Dans un premier temps, nous allons calculer le nombre de trajets, pour chaque ville d'origine :
JavaPairRDD<String, Integer> sizes = rdd.groupBy( new Function<CassandraRow, String>() { @Override public String call(CassandraRow row) throws Exception { return row.getString("origin_city_name"); } }).mapToPair( new PairFunction<Tuple2<String,Iterable<CassandraRow>>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<String, Iterable<CassandraRow>> t) throws Exception { return new Tuple2<String,Integer>(t._1(), Lists.newArrayList(t._2 ()).size()); } }); sizes.cache();
Premièrement, à l'aide de la transformation groupBy
, nous allons obtenir une liste de tuples de type Tuple2 <String, Iterable<CassandraRow>>. Le premier élément du tuple est une String représentant une ville d'origine, le second élément est un Iterable de CassandraRow représentant l'ensemble des trajets partageant cette même ville d'origine.
La seconde transformation mapToPair permet de transformer chaque tuple précédent (de type Tuple2 <String, Iterable<CassandraRow>>) en tuple de type Tuple2<String, Integer>. Le second élément du tuple est ici un entier, représentant le nombre de trajets pour chaque ville d'origine.
Vous constaterez ici que nous mettons une nouvelle fois en cache ce résultat intermédiaire. Nous aurons en effet besoin du nombre de trajets, groupés par ville d'origine, pour calculer la moyenne des distances des trajets pour chacune de ces villes.
A cette étape, voici la liste des associations (ville d'origine, nombre de trajets) que nous venons de calculer :
Nb RoadTrips by origin Albuquerque : 61 Raleigh/Durham : 62 Memphis : 24 Seattle : 31 Orlando : 154 Salt Lake City : 31 Newark : 61 Hartford : 31 Miami : 773 San Antonio : 176 New York : 978 Omaha : 57 Portland : 9 San Jose : 57 Austin : 194 Charlotte : 31 Kansas City : 93 Chicago : 1108 Fort Lauderdale : 31 Dayton : 31 San Francisco : 362 Tulsa : 62 Los Angeles : 957 Atlanta : 31 Indianapolis : 1 Fayetteville : 31 Wichita : 62 Columbus : 31 Washington : 358 St. Louis : 204 Kahului : 93 El Paso : 31 Oklahoma City : 31 Ontario : 36 Phoenix : 124 Santa Ana : 33 Baltimore : 27 Burbank : 8 Kona : 31 Las Vegas : 93 Norfolk : 50 Philadelphia : 8 Minneapolis : 30 Houston : 58 Lihue : 42 Palm Springs : 31 Honolulu : 164 San Juan : 62 Louisville : 1 Tampa : 124 Fort Myers : 31 Colorado Springs : 31 San Diego : 159 Boston : 212 Mission/McAllen/Edinburg : 30 West Palm Beach/Palm Beach : 62 Dallas/Fort Worth : 2275 Charlotte Amalie : 31
Le calcul de la moyenne des distances sera effectué en deux étapes. La première étape effectue deux transformations :
JavaPairRDD<String, Integer> sums = rdd.mapToPair( new PairFunction<CassandraRow, String, Integer>() { @Override public Tuple2<String, Integer> call(CassandraRow row) throws Exception { return new Tuple2(row.getString("origin_city_name"), row.getInt("distance")); } }).reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer d1, Integer d2) throws Exception { return Integer.valueOf(d1.intValue()+d2.intValue()); } });
Ces transformations consistent à :
- Ré-utiliser la première abstraction que nous avons mis en cache afin d'associer, pour chaque trajet, la ville d'origine avec la distance du trajet. Ceci est possible grâce à la transformation mapTopPair.
- Calculer la somme des distances des trajets, le tout groupé par ville d'origine, grâce à la transformation reduceByKey. En plus d'associer la liste des distances à chaque ville d'origine, cette transformation va appliquer une fonction (ici une somme) entre tous les éléments de la liste.
L'abstraction sums
contiendra ainsi un ensemble de tuples associant une ville d'origine à la somme des distances des trajets partant de cette ville.
Finalement, le calcul de la moyenne se fait en divisant, pour chaque ville d'origine, la somme des distances calculée précédemment aux nombres de trajets partant de cette même ville.
Il est ainsi nécessaire de faire correspondre, d'une façon ou d'une autre, les sommes en question groupées par ville d'origine, aux nombres de trajets correspondants, représentés par l'abstraction RDD sizes
, qui a d'ailleurs été mise en cache.
L'opération permettant cette correspondance est la transformation join
:
List<Tuple2<String,Double>> averageResults = sums.join(sizes) .mapValues(new Function<Tuple2<Integer,Integer>, Double>() { @Override public Double call(Tuple2<Integer, Integer> tuple) throws Exception { return Double.valueOf((double)tuple._1() / tuple._2()); } }).collect();
Ici, les tuples (ville d'origine, somme des distances) de la première abstraction sums
seront joints aux tuples (ville d'origine, nombre de trajets) de la seconde abstraction afin de former une abstraction RDD de type JavaPairRDD<String, Tuple2<Integer, Integer>>. Finalement, il suffit d'utiliser la transformation mapValues afin de calculer la moyenne à partir du tuple (Tuple2<Integer, Integer>> - (somme des distances, nombre de trajets)) constituant le deuxième élément de l'abstraction renvoyée par join
.
L'action collect() permet de récupérer les résultats :
Average distance by origin Albuquerque : 569.0 Raleigh/Durham : 880.5 Memphis : 432.0 Seattle : 2428.8387096774195 Orlando : 1313.7662337662337 Salt Lake City : 989.0 Newark : 1904.1311475409836 Hartford : 1471.0 Miami : 1404.1875808538164 San Antonio : 247.0 New York : 1639.402862985685 Omaha : 583.0 Portland : 1616.0 San Jose : 1643.7894736842106 Austin : 520.7835051546392 Charlotte : 936.0 Kansas City : 441.0 Chicago : 906.5361010830325 Fort Lauderdale : 1182.0 Dayton : 861.0 San Francisco : 2099.5552486187844 Tulsa : 448.61290322580646 Los Angeles : 2424.0010449320794 Atlanta : 731.0 Indianapolis : 761.0 Fayetteville : 280.0 Wichita : 328.0 Columbus : 926.0 Washington : 1322.2067039106146 St. Louis : 752.1764705882352 Kahului : 2881.043010752688 El Paso : 551.0 Oklahoma City : 175.0 Ontario : 1188.0 Phoenix : 1154.0 Santa Ana : 1315.5151515151515 Baltimore : 1217.0 Burbank : 1231.0 Kona : 2504.0 Las Vegas : 1605.6666666666667 Norfolk : 1212.0 Philadelphia : 1303.0 Minneapolis : 852.0 Houston : 619.5172413793103 Lihue : 2615.0 Palm Springs : 1126.0 Honolulu : 3112.8231707317073 San Juan : 1045.0 Louisville : 733.0 Tampa : 789.25 Fort Myers : 1120.0 Colorado Springs : 592.0 San Diego : 1558.4528301886792 Boston : 1871.1462264150944 Mission/McAllen/Edinburg : 469.0 West Palm Beach/Palm Beach : 1123.0 Dallas/Fort Worth : 1040.072087912088 Charlotte Amalie : 1623.0
Le code complet est accessible sur Github
Conclusion
Nous constatons ici le potentiel que peut offrir la combinaison Spark/Cassandra. Il est désormais possible d'effectuer des opérations complexes (map/reduce, jointures, etc.) avec une base de données NoSQL, atténuant le compromis que l'on ferait en utilisant ce type de bases par rapport aux bases SQL classiques.
Au sujet de l'Auteur
Julien est un développeur, bloggeur, passionné par les nouvelles technologies, et travaille à la mise en place de solutions basées sur Cassandra, Elasticsearch, Spark, etc. pour des acteurs bancaires ou des startups. Sur son temps libre, julien s'intéresse aussi tout particulièrement aux dernières avancées concernant le machine learning et les algorithmes génétiques. Suivez Julien sur Twitter à @nettradefr.