BT

Diffuser les Connaissances et l'Innovation dans le Développement Logiciel d'Entreprise

Contribuez

Sujets

Sélectionner votre région

Accueil InfoQ Articles Utilisation de Cassandra en tant que RDD Spark avec le connecteur Datastax

Utilisation de Cassandra en tant que RDD Spark avec le connecteur Datastax

Introduction

Dans cet article, nous allons voir comment il est possible d'utiliser Cassandra et Spark, pour effectuer des opérations sur une grande quantité de données, le tout de manière distribuée.

Cassandra

Cassandra est une base de données NoSQL orientée colonnes, à l'origine développée par Facebook pour ses besoins en interne. Je vous invite à consulter les excellents articles de Duy Hai Doan qui présente ses fonctionnalités et son modèle de stockage de manière très exhaustive.

Spark

Spark est un moteur d’analyse adapté au traitement rapide de gros volumes de données. Spark effectue ces traitements sur des abstractions, appelés RDD (Resilient Distributed Dataset) qui peuvent répresenter différentes sources de données (une variable classique, un fichier texte local, une fichier sur HDFS, etc.). Spark est devenu un "Apache Top-Level Project" en février 2014, après avoir été accepté dans le programme d'incubation d'Apache en juin 2013.

Datastax

Datastax est une société basée à Santa Clara en Californie fournissant des solutions basées sur Cassandra. Ils ont récemment dévéloppé un connecteur permettant d'exposer des tables Cassandra en tant que source de données résiliente pour Spark.

Cas pratique

A titre d'exemple, nous allons utiliser des données représentant un ensemble de trajets reliant des villes américaines et effectuer des opérations impliquant ces trajets. Chaque trajet est caractérisé par :

  • une ville d'origine
  • une ville de destination
  • la distance entre les 2 villes
  • le temps de parcours
  • etc.

Pour chaque ville d'origine, nous allons calculer la moyenne des distances des trajets partant de cette même ville.

Voici les différentes étapes qui nous permettront d'y parvenir :

  • Insertion des informations relatives aux différents trajets (fichier csv compressé accessible ici) dans une base de données Cassandra.
  • Calcul du nombre de trajets, groupés par ville d'origine.
  • Calcul de la moyenne des distances des trajets, toujours groupés par ville d'origine.

Tout le code présenté dans cet article sera en Java.

Prérequis

Pour exécuter le programme détaillé par la suite, vous aurez besoin de :

  • JDK 7 ou supérieur
  • Git
  • Maven

Stockage des trajets

Nous allons insérer les trajets dans une base Cassandra à partir d'un fichier CSV. Le code CQL permettant de créer la table est le suivant :

CREATE TABLE RoadTrip (
    id int PRIMARY KEY,
    origin_city_name varchar,
    origin_state_abr varchar,
    destination_city_name varchar,
    destination_state_abr varchar,
    elapsed_time int,
    distance int
);

La classe Java qui contiendra les informations de chaque trajet est la suivante :

public class RoadTrip {
    private Integer id = null;
    private String originCityName = null;
    private String originStateAbr = null;
    private String destinationCityName = null;
    private String destinationStateAbr = null;
    private Integer elapsedTime = null;
    private Integer distance = null;
 ...
}

Les objets RoadTrip seront stockés dans Cassandra à l'aide du driver Datastax pour Cassandra. Afin de réduire le nombre d'accès à la base, nous allons utiliser une requête de type BatchStatement:

Insert insertStatement = QueryBuilder.insertInto("RoadTrip");
insertStatement.value("id", QueryBuilder.bindMarker())
    .value("origin_city_name", QueryBuilder.bindMarker())
    .value("origin_state_abr", QueryBuilder.bindMarker())
    .value("destination_city_name", QueryBuilder.bindMarker())
    .value("destination_state_abr", QueryBuilder.bindMarker())
    .value("elapsed_time", QueryBuilder.bindMarker())
    .value("distance", QueryBuilder.bindMarker())
;
PreparedStatement ps = session.prepare(insertStatement.toString());
...
BatchStatement batch = new BatchStatement();
for (RoadTrip roadtrip : roadtrips) {
 batch.add(ps.bind(roadtrip.getId(),
    roadtrip.getOriginCityName(),
    roadtrip.getOriginStateAbr(),
    roadtrip.getDestinationCityName(),
    roadtrip.getDestinationStateAbr(),
    roadtrip.getElapsedTime(),
    roadtrip.getDistance()
 ));
}
session.execute(batch);
...

Initialisation du Contexte Spark

Maintenant que les trajets sont stockés dans une table Cassandra, nous allons instancier un contexte Spark et lui indiquer comment accéder à Cassandra :

SparkConf conf = new SparkConf(true)
    .setMaster("local")
    .setAppName("DatastaxTests")
    .set("spark.executor.memory", "1g")
    .set("spark.cassandra.connection.host", "localhost")
    .set("spark.cassandra.connection.native.port", "9142")
    .set("spark.cassandra.connection.rpc.port", "9171");
SparkContext ctx = new SparkContext(conf);

Quelques précisions concernant les paramètres spécifiés :

  • Le paramètre master permet de spécifier l'url du cluster Spark. La string 'local' indique que l'on souhaite utiliser Spark en mode local, c'est à dire embarqué dans la machine virtuelle.
  • Le paramètre appName est le nom logique de l'application, qui apparaîtra dans l'interface d'administration Spark (accessible par défaut à l'adresse http://localhost:4040).
  • Les autres paramètres sont des propriétés permettant d'indiquer à Spark la quantité de mémoire à allouer pour le processus d'exécution, ainsi que l'adresse et les ports d'écoute de Cassandra.

Nous allons ensuite directement créer notre RDD en spécifiant quels keyspace/table Cassandra contiendront les trajets routiers :

SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ctx);

JavaRDD<CassandraRow> rdd = functions.cassandraTable("roadtrips", "roadtrip").toJavaRDD();

Déclenchement des opérations

Spark permet d'effectuer des opérations qui peuvent être de deux types :

  • Transformations : map, reduce, join, etc.
  • Actions : collect, take, etc.

Vous pouvez consulter la liste des opérations implémentées ici. Des exemples pratiques (en Scala) sont aussi consultables ici.

Les opérations effectuées par Spark sont lazy, dans le sens où elles ne sont exécutées seulement au moment où l'on souhaite récupérer le resultat de ces opérations en exécutant une action (collect(), take()).

De plus, si le résultat d'une opération intermédiaire est utile pour plusieurs ensembles d'opérations, il est possible de mettre en cache (et donc de calculer, sans récupérer) ce résultat intermédiaire afin qu'il ne soit pas calculé plusieurs fois.

Cette mise en cache est effectuée à l'aide de la fonction cache() persistant en mémoire (par défaut) l'abstraction RDD intermédiaire correspondante.

Dans le cas présent, les informations concernant l'ensemble des trajets sera utilisée deux fois, nous allons donc mettre en cache le RDD précédent :

rdd.cache();

Dans un premier temps, nous allons calculer le nombre de trajets, pour chaque ville d'origine :

JavaPairRDD<String, Integer> sizes = rdd.groupBy( 
        new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                return row.getString("origin_city_name");
        }
    }).mapToPair(
        new PairFunction<Tuple2<String,Iterable<CassandraRow>>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Iterable<CassandraRow>> t) throws Exception {
                return new Tuple2<String,Integer>(t._1(), Lists.newArrayList(t._2        ()).size());
    }
});
sizes.cache();

Premièrement, à l'aide de la transformation groupBy, nous allons obtenir une liste de tuples de type Tuple2 <String, Iterable<CassandraRow>>. Le premier élément du tuple est une String représentant une ville d'origine, le second élément est un Iterable de CassandraRow représentant l'ensemble des trajets partageant cette même ville d'origine.

La seconde transformation mapToPair permet de transformer chaque tuple précédent (de type Tuple2 <String, Iterable<CassandraRow>>) en tuple de type Tuple2<String, Integer>. Le second élément du tuple est ici un entier, représentant le nombre de trajets pour chaque ville d'origine.

Vous constaterez ici que nous mettons une nouvelle fois en cache ce résultat intermédiaire. Nous aurons en effet besoin du nombre de trajets, groupés par ville d'origine, pour calculer la moyenne des distances des trajets pour chacune de ces villes.

A cette étape, voici la liste des associations (ville d'origine, nombre de trajets) que nous venons de calculer :

Nb RoadTrips by origin
Albuquerque : 61
Raleigh/Durham : 62
Memphis : 24
Seattle : 31
Orlando : 154
Salt Lake City : 31
Newark : 61
Hartford : 31
Miami : 773
San Antonio : 176
New York : 978
Omaha : 57
Portland : 9
San Jose : 57
Austin : 194
Charlotte : 31
Kansas City : 93
Chicago : 1108
Fort Lauderdale : 31
Dayton : 31
San Francisco : 362
Tulsa : 62
Los Angeles : 957
Atlanta : 31
Indianapolis : 1
Fayetteville : 31
Wichita : 62
Columbus : 31
Washington : 358
St. Louis : 204
Kahului : 93
El Paso : 31
Oklahoma City : 31
Ontario : 36
Phoenix : 124
Santa Ana : 33
Baltimore : 27
Burbank : 8
Kona : 31
Las Vegas : 93
Norfolk : 50
Philadelphia : 8
Minneapolis : 30
Houston : 58
Lihue : 42
Palm Springs : 31
Honolulu : 164
San Juan : 62
Louisville : 1
Tampa : 124
Fort Myers : 31
Colorado Springs : 31
San Diego : 159
Boston : 212
Mission/McAllen/Edinburg : 30
West Palm Beach/Palm Beach : 62
Dallas/Fort Worth : 2275
Charlotte Amalie : 31

Le calcul de la moyenne des distances sera effectué en deux étapes. La première étape effectue deux transformations :

JavaPairRDD<String, Integer> sums = rdd.mapToPair(
    new PairFunction<CassandraRow, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(CassandraRow row) throws Exception {
            return new Tuple2(row.getString("origin_city_name"), row.getInt("distance"));
        }
    }).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer d1, Integer d2) throws Exception {
            return Integer.valueOf(d1.intValue()+d2.intValue());
        }
    });

Ces transformations consistent à :

  • Ré-utiliser la première abstraction que nous avons mis en cache afin d'associer, pour chaque trajet, la ville d'origine avec la distance du trajet. Ceci est possible grâce à la transformation mapTopPair.
  • Calculer la somme des distances des trajets, le tout groupé par ville d'origine, grâce à la transformation reduceByKey. En plus d'associer la liste des distances à chaque ville d'origine, cette transformation va appliquer une fonction (ici une somme) entre tous les éléments de la liste.

L'abstraction sums contiendra ainsi un ensemble de tuples associant une ville d'origine à la somme des distances des trajets partant de cette ville.

Finalement, le calcul de la moyenne se fait en divisant, pour chaque ville d'origine, la somme des distances calculée précédemment aux nombres de trajets partant de cette même ville.

Il est ainsi nécessaire de faire correspondre, d'une façon ou d'une autre, les sommes en question groupées par ville d'origine, aux nombres de trajets correspondants, représentés par l'abstraction RDD sizes, qui a d'ailleurs été mise en cache.

L'opération permettant cette correspondance est la transformation join :

List<Tuple2<String,Double>> averageResults = sums.join(sizes)
       .mapValues(new Function<Tuple2<Integer,Integer>, Double>() {
        @Override
        public Double call(Tuple2<Integer, Integer> tuple) throws Exception {
            return Double.valueOf((double)tuple._1() / tuple._2());
        }
    }).collect();

Ici, les tuples (ville d'origine, somme des distances) de la première abstraction sums seront joints aux tuples (ville d'origine, nombre de trajets) de la seconde abstraction afin de former une abstraction RDD de type JavaPairRDD<String, Tuple2<Integer, Integer>>. Finalement, il suffit d'utiliser la transformation mapValues afin de calculer la moyenne à partir du tuple (Tuple2<Integer, Integer>> - (somme des distances, nombre de trajets)) constituant le deuxième élément de l'abstraction renvoyée par join.

L'action collect() permet de récupérer les résultats :

Average distance by origin
Albuquerque : 569.0
Raleigh/Durham : 880.5
Memphis : 432.0
Seattle : 2428.8387096774195
Orlando : 1313.7662337662337
Salt Lake City : 989.0
Newark : 1904.1311475409836
Hartford : 1471.0
Miami : 1404.1875808538164
San Antonio : 247.0
New York : 1639.402862985685
Omaha : 583.0
Portland : 1616.0
San Jose : 1643.7894736842106
Austin : 520.7835051546392
Charlotte : 936.0
Kansas City : 441.0
Chicago : 906.5361010830325
Fort Lauderdale : 1182.0
Dayton : 861.0
San Francisco : 2099.5552486187844
Tulsa : 448.61290322580646
Los Angeles : 2424.0010449320794
Atlanta : 731.0
Indianapolis : 761.0
Fayetteville : 280.0
Wichita : 328.0
Columbus : 926.0
Washington : 1322.2067039106146
St. Louis : 752.1764705882352
Kahului : 2881.043010752688
El Paso : 551.0
Oklahoma City : 175.0
Ontario : 1188.0
Phoenix : 1154.0
Santa Ana : 1315.5151515151515
Baltimore : 1217.0
Burbank : 1231.0
Kona : 2504.0
Las Vegas : 1605.6666666666667
Norfolk : 1212.0
Philadelphia : 1303.0
Minneapolis : 852.0
Houston : 619.5172413793103
Lihue : 2615.0
Palm Springs : 1126.0
Honolulu : 3112.8231707317073
San Juan : 1045.0
Louisville : 733.0
Tampa : 789.25
Fort Myers : 1120.0
Colorado Springs : 592.0
San Diego : 1558.4528301886792
Boston : 1871.1462264150944
Mission/McAllen/Edinburg : 469.0
West Palm Beach/Palm Beach : 1123.0
Dallas/Fort Worth : 1040.072087912088
Charlotte Amalie : 1623.0

Le code complet est accessible sur Github

Conclusion

Nous constatons ici le potentiel que peut offrir la combinaison Spark/Cassandra. Il est désormais possible d'effectuer des opérations complexes (map/reduce, jointures, etc.) avec une base de données NoSQL, atténuant le compromis que l'on ferait en utilisant ce type de bases par rapport aux bases SQL classiques.

Au sujet de l'Auteur

Julien est un développeur, bloggeur, passionné par les nouvelles technologies, et travaille à la mise en place de solutions basées sur Cassandra, Elasticsearch, Spark, etc. pour des acteurs bancaires ou des startups. Sur son temps libre, julien s'intéresse aussi tout particulièrement aux dernières avancées concernant le machine learning et les algorithmes génétiques. Suivez Julien sur Twitter à @nettradefr.

Evaluer cet article

Pertinence
Style

Contenu Éducatif

BT