Points Clés
- Apache Beam est un puissant projet open source de traitement par lots et en continu
- Sa portabilité permet d'exécuter des pipelines sur différents backends d'Apache Spark à Google Cloud Dataflow
- Beam est extensible, ce qui signifie que vous pouvez écrire et partager de nouveaux SDK, connecteurs IO et transformateurs
- Beam prend actuellement en charge Python, Java et Go
- En utilisant son SDK Java, vous pouvez profiter de tous les avantages de la JVM
Dans cet article, nous allons vous présenter Apache Beam, un puissant projet open source de traitement par lots et en streaming, utilisé par de grandes entreprises comme eBay pour intégrer ses pipelines de streaming et par Mozilla pour déplacer les données en toute sécurité entre ses systèmes.
Aperçu
Apache Beam est un modèle de programmation pour le traitement des données, prenant en charge le traitement par lots et le streaming.
À l'aide des SDK fournis pour Java, Python et Go, vous pouvez développer des pipelines, puis choisir un backend qui exécutera le pipeline.
Les avantages d'Apache Beam
Modèle Beam (Frances Perry et Tyler Akidau)
- Connecteurs d'E/S intégrés
- Les connecteurs Apache Beam permettent d'extraire et de charger facilement des données à partir de plusieurs types de stockage
- Les principaux types de connecteurs sont :
- Basé sur des fichiers (ex. : Apache Parquet, Apache Thrift)
- Système de fichiers (ex : Hadoop, Google Cloud Storage, Amazon S3)
- Messagerie (ex. : Apache Kafka, Google Pub/Sub, Amazon SQS)
- Base de données (ex. : Apache Cassandra, Elastic Search, MongoDb)
- En tant que projet OSS, la prise en charge de nouveaux connecteurs augmente (ex. : InfluxDB, Neo4J)
- Portabilité :
- Beam fournit plusieurs programmes d'exécution pour exécuter les pipelines, ce qui vous permet de choisir le meilleur pour chaque cas d'utilisation et d'éviter la dépendance vis-à-vis d'un fournisseur.
- Les backends de traitement distribué comme Apache Flink, Apache Spark ou Google Cloud Dataflow peuvent être utilisés comme exécuteurs.
- Traitement parallèle distribué :
- Chaque élément de l'ensemble de données est géré indépendamment par défaut afin que son traitement puisse être optimisé en s'exécutant en parallèle.
- Les développeurs n'ont pas besoin de répartir manuellement la charge entre les workers, car Beam fournit une abstraction pour cela.
Le modèle Beam
Les concepts clés du modèle de programmation Beam sont :
- PCollection : représente une collection de données, par exemple : un tableau de nombres ou de mots extraits d'un texte.
- PTransform : une fonction de transformation qui reçoit et renvoie une PCollection, exemple : sommer tous les nombres.
- Pipeline : gère les interactions entre PTransforms et PCollections.
- PipelineRunner : spécifie où et comment le pipeline doit s'exécuter.
Démarrage rapide
Une opération de pipeline de base consiste en 3 étapes : lire, traiter et écrire le résultat de la transformation. Chacune de ces étapes est définie par programmation à l'aide de l'un des SDK d'Apache Beam.
Dans cette section, nous allons créer des pipelines à l'aide du SDK Java. Vous pouvez choisir entre créer une application locale (à l'aide de Gradle ou Maven) ou utiliser le Online Playground. Les exemples utiliseront le runner local car il sera plus facile de vérifier le résultat à l'aide des assertions JUnit.
Dépendances locales Java
- beam-sdks-java-core : contient toutes les classes du modèle Beam.
- beam-runners-direct-java : par défaut, le SDK Apache Beam utilisera le runner direct, ce qui signifie que le pipeline s'exécutera sur votre machine locale.
Multiplier par 2
Dans ce premier exemple, le pipeline recevra un tableau de nombres et transformera chaque élément en le multipliant par 2.
La première étape consiste à créer l'instance de pipeline qui recevra le tableau d'entrée et exécutera la fonction de transformation. Comme nous utilisons JUnit pour exécuter Apache Beam, nous pouvons facilement créer un TestPipeline
comme attribut de classe de test. Si vous préférez exécuter votre application principale à la place, vous devrez définir les options de configuration du pipeline,
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
Nous pouvons maintenant créer la PCollection qui sera utilisée comme entrée du pipeline. Ce sera un tableau instancié directement à partir de la mémoire, mais il pourra être lu depuis n'importe quel endroit pris en charge par Apache Beam :
PCollection<Integer> numbers =
pipeline.apply(Create.of(1, 2, 3, 4, 5));
Ensuite, nous appliquons notre fonction de transformation qui multipliera chaque élément du jeu de données par deux :
PCollection<Integer> output = numbers.apply(
MapElements.into(TypeDescriptors.integers())
.via((Integer number) -> number * 2)
);
Pour vérifier les résultats, nous pouvons écrire une assertion :
PAssert.that(output)
.containsInAnyOrder(2, 4, 6, 8, 10);
Notez que les résultats ne sont pas censés être triés en tant qu'entrée, car Apache Beam traite chaque élément indépendamment et en parallèle.
Le test à ce stade est terminé et nous exécutons le pipeline en appelant :
pipeline.run();
Opération de réduction
Une opération de réduction est la combinaison de plusieurs éléments d'entrée qui se traduit par une collection plus petite, contenant généralement un seul élément.
MapReduce (Frances Perry et Tyler Akidau)
Étendons maintenant l'exemple ci-dessus pour sommer tous les éléments multipliés par deux, en utilisant une transformation MapReduce.
Chaque transformation PCollection génère une nouvelle instance PCollection, ce qui signifie que nous pouvons enchaîner les transformations à l'aide de la méthode apply
. Dans ce cas, l'opération Sum sera utilisée après avoir multiplié chaque entrée par 2 :
PCollection<Integer> numbers =
pipeline.apply(Create.of(1, 2, 3, 4, 5));
PCollection<Integer> output = numbers
.apply(
MapElements.into(TypeDescriptors.integers())
.via((Integer number) -> number * 2))
.apply(Sum.integersGlobally());
PAssert.that(output)
.containsInAnyOrder(30);
pipeline.run();
Opération FlatMap
FlatMap est une opération qui applique d'abord une transformation sur chaque élément d'entrée qui renvoie généralement une nouvelle collection, résultant en une collection de collections. Une opération de mise à plat est ensuite appliquée pour fusionner toutes les collections imbriquées, résultant en une seule.
Le prochain exemple transformera des tableaux de chaînes en un tableau unique contenant chaque mot.
Tout d'abord, nous déclarons notre liste de mots qui seront utilisés comme entrée du pipeline :
final String[] WORDS_ARRAY = new String[] {
"hi bob", "hello alice", "hi sue"};
final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
Ensuite, nous créons l'entrée PCollection en utilisant la liste ci-dessus :
PCollection<String> input = pipeline.apply(Create.of(WORDS));
Maintenant, nous appliquons la transformation flatmap, qui divisera les mots dans chaque tableau imbriqué et fusionnera les résultats dans une seule liste :
PCollection<String> output = input.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split(" ")))
);
PAssert.that(output)
.containsInAnyOrder("hi", "bob", "hello", "alice", "hi", "sue");
pipeline.run();
Opération de groupement
Un travail courant dans le traitement des données consiste à agréger ou à compter sur une clé spécifique. Nous allons le démontrer en comptant le nombre d'occurrences de chaque mot de l'exemple précédent.
Après avoir le tableau à plat de chaînes de caractères, nous pouvons enchaîner un autre PTransform :
PCollection<KV<String, Long<> output = input
.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split(" ")))
)
.apply(Count.<String>perElement());
Résultant en :
PAssert.that(output)
.containsInAnyOrder(
KV.of("hi", 2L),
KV.of("hello", 1L),
KV.of("alice", 1L),
KV.of("sue", 1L),
KV.of("bob", 1L));
Lecture d'un fichier
L'un des principes d'Apache Beam est de lire des données de n'importe où, voyons donc en pratique comment utiliser un fichier texte comme source de données.
L'exemple suivant lit le contenu d'un "words.txt" contenant "An advanced unified programming model". Ensuite, la fonction de transformation renverra une PCollection contenant chaque mot du texte.
PCollection<String> input =
pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split(" ")))
);
PAssert.that(output)
.containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run();
Écrire le résultat dans un fichier
Comme vu dans l'exemple précédent pour l'entrée, Apache Beam dispose de plusieurs connecteurs de sortie intégrés. Dans l'exemple suivant, nous compterons le nombre de chaque mot présent dans le fichier texte "words.txt" qui ne contient qu'une seule phrase ("An advanced unified programming model") et la sortie sera conservée dans un fichier texte.
PCollection<String> input =
pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input
.apply(
FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split(" ")))
)
.apply(Count.<String>perElement());;
PAssert.that(output)
.containsInAnyOrder(
KV.of("An", 1L),
KV.of("advanced", 1L),
KV.of("unified", 1L),
KV.of("programming", 1L),
KV.of("model", 1L)
);
output
.apply(
MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
.apply(TextIO.write().to("./src/main/resources/wordscount"));
pipeline.run();
Même l'écriture de fichier est optimisée pour le parallélisme par défaut, ce qui signifie que Beam déterminera le meilleur nombre de shards (fichiers) pour conserver le résultat. Les fichiers seront situés dans le dossier src/main/resources et auront le préfixe "wordcount", le numéro de partition et le nombre total de partitions comme défini dans la dernière transformation de sortie.
Lors de son exécution sur mon ordinateur portable, quatre fragments ont été générés :
Première partition (nom de fichier : wordscount-00001-of-00003) :
An 1
advanced 1
Deuxième partition (nom de fichier : wordscount-00002-of-00003) :
unified 1
model 1
Troisième partition (nom de fichier : wordscount-00003-of-00003) :
programming 1
La dernière parition a été créé mais à la fin était vide, car tous les mots avaient déjà été traités.
Extension d'Apache Beam
Nous pouvons tirer parti de l'extensibilité de Beam en écrivant une fonction de transformation personnalisée. Un transformateur personnalisé améliorera la maintenabilité du code tout en supprimant la duplication.
Fondamentalement, nous aurions besoin de créer une sous-classe de PTransform, indiquant le type d'entrée et de sortie avec un générique Java. Ensuite, nous redéfinissons la méthode d'expansion et à l'intérieur de son contenu, nous plaçons la logique dupliquée, qui reçoit une seule chaîne et renvoie une PCollection contenant chaque mot.
public class WordsFileParser extends PTransform<PCollection<String>, PCollection<String>> {
@Override
public PCollection<String> expand(PCollection<String> input) {
return input
.apply(FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split(" ")))
);
}
}
Le scénario de test refactorisé pour utiliser WordsFileParser devient désormais :
public class FileIOTest {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@Test
public void testReadInputFromFile() {
PCollection<String> input =
pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<String> output = input.apply(
new WordsFileParser()
);
PAssert.that(output)
.containsInAnyOrder("An", "advanced", "unified", "programming", "model");
pipeline.run();
}
@Test
public void testWriteOutputToFile() {
PCollection<String> input =
pipeline.apply(TextIO.read().from("./src/main/resources/words.txt"));
PCollection<KV<String, Long>> output = input
.apply(new WordsFileParser())
.apply(Count.<String>perElement());
PAssert.that(output)
.containsInAnyOrder(
KV.of("An", 1L),
KV.of("advanced", 1L),
KV.of("unified", 1L),
KV.of("programming", 1L),
KV.of("model", 1L)
);
output
.apply(
MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> kv) -> kv.getKey() + " " + kv.getValue()))
.apply(TextIO.write().to ("./src/main/resources/wordscount"));
pipeline.run();
}
}
Le résultat est un pipeline plus clair et plus modulaire.
Fenêtrage (Windowing)
Fenêtrage dans Apache Beam (Frances Perry et Tyler Akidau)
Un problème courant dans le traitement en streaming consiste à regrouper les données entrantes selon un certain intervalle de temps, en particulier lors du traitement de grandes quantités de données. Dans ce cas, l'analyse des données agrégées par heure ou par jour est plus pertinente que l'analyse de chaque élément du jeu de données.
Dans l'exemple suivant, supposons que nous travaillions dans une fintech et que nous recevons des événements de transactions contenant le montant et l'instant où la transaction s'est produite et que nous voulons récupérer le montant total traité par jour.
Beam fournit un moyen de décorer chaque élément PCollection avec un horodatage. Nous pouvons l'utiliser pour créer une PCollection représentant 5 transactions monétaires :
- Les montants 10 et 20 ont été transférés le 2022-02-01
- Les montants 30, 40 et 50 ont été transférés le 2022-02-05
PCollection<Integer> transactions =
pipeline.apply(
Create.timestamped(
TimestampedValue.of(10, Instant.parse("2022-02-01T00:00:00+00:00")),
TimestampedValue.of(20, Instant.parse("2022-02-01T00:00:00+00:00")),
TimestampedValue.of(30, Instant.parse("2022-02-05T00:00:00+00:00")),
TimestampedValue.of(40, Instant.parse("2022-02-05T00:00:00+00:00")),
TimestampedValue.of(50, Instant.parse("2022-02-05T00:00:00+00:00"))
)
);
Ensuite, nous appliquerons deux fonctions de transformation :
- Regrouper les transactions en utilisant une fenêtre d'un jour
- Additionner les montants de chaque groupe
PCollection<Integer> output =
Transactions
.apply(Window.into(FixedWindows.of(Duration.standardDays(1))))
.apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
Dans la première fenêtre (2022-02-01), on s'attend à un montant total de 30 (10+20), tandis que dans la deuxième fenêtre (2022-02-05), on devrait voir 120 (30+40+50) dans le total montant.
PAssert.that(output)
.inWindow(new IntervalWindow(
Instant.parse("2022-02-01T00:00:00+00:00"),
Instant.parse("2022-02-02T00:00:00+00:00")))
.containsInAnyOrder(30);
PAssert.that(output)
.inWindow(new IntervalWindow(
Instant.parse("2022-02-05T00:00:00+00:00"),
Instant.parse("2022-02-06T00:00:00+00:00")))
.containsInAnyOrder(120);
Chaque instance IntervalWindow doit correspondre aux horodatages exacts de début et de fin de la durée choisie, de sorte que l'heure choisie doit être "00:00:00".
Résumé
Apache Beam est un puissant framework de données testé au combat, permettant à la fois le traitement par lots et par flux. Nous avons utilisé le SDK Java pour réaliser des opérations de transformation, de réduction, de groupement, de fenêtrage et d'autres opérations.
Apache Beam peut être bien adapté aux développeurs qui travaillent avec des tâches en parallèle embarrassantes pour simplifier les mécanismes du traitement de données à grande échelle.
Ses connecteurs, ses SDK et la prise en charge de divers exécuteurs apportent de la flexibilité et en choisissant un exécuteur cloud natif comme Google Cloud Dataflow, vous bénéficiez d'une gestion automatisée des ressources de calcul.