Esse post faz parte de uma série de artigos sobre o case de uso do Apache Spark para processamento de dados em tempo real, confira a parte 1.
Apache Spark Structured Streaming
Dentre as funcionalidades disponibilizadas pelo Apache Spark, uma das que mais chama a atenção para o problema de processamento de dados em tempo real é a disponibilização da API de processamento de streams de dados, conhecida como Structured Streaming.
Usando a API de processamento de streams, conseguimos filtrar, contar, agregar dados, fazer projeções e agrupar os dados que estão passando em tempo real em seu stream, no caso poderia ser um stream server como o Kafka, por exemplo, ou mesmo um sistema de arquivos distribuídos (DFS).
Para ilustrar melhor o uso da API do Structured Streaming, vou replicar um exemplo bastante simples para comparar uma implementação de processamento em lote com o processamento de um stream. Esse exemplo é o bê-á-bá quando se trata dessa tecnologia.
Nesse exemplo, temos a leitura de um diretório, representado por "source-path" em formato CSV, os dados são selecionados e depois um filtro simples é aplicado; no resultado os dados são transformados no formato parquet.
O primeiro exemplo é utilizando a API de processamento em lote, como usualmente se escreve um programa para o Apache Spark:
input = spark.read
.format("csv")
.load("source-path")
result = input
.select("device", "signal")
.where("signal > 15")
result.write
.format("parquet")
.save("dest-path")
Fragmento 1. Trecho de código em Scala para processamento de dados usando a API de Batch.
Já no segundo exemplo, vou mostrar como transcrever o fragmento de código anterior para passar a processar um stream de dados. O mesmo programa pode ser reescrito sem muito esforço ficando da seguinte forma:
input = spark.readStream .format("csv") .load("source-path") result = input .select("device", "signal") .where("signal > 15") result.writeStream .format("parquet") .start("dest-path")
Fragmento 2. Trecho de código em Scala para processamento de dados usando a API Structured Streaming.
Note que comparado com o programa anterior as modificações foram mínimas, preservando completamente a regra de negócio aplicada aos dados, que nesse caso é a filtragem da informação. Ao invés de usar o comando read, vamos usar o readStream. Igualmente ao invés do comando write, vamos usar o writeStream e ao invés de chamar o comando save, vamos chamar o start, para iniciar o processamento.
Lembrando que um job usando a API de stream gera um processo que executa continuamente, uma vez que conceitualmente um stream não tem começo nem fim, mas um fluxo contínuo de dados variando ao longo do tempo. Conceitualmente podemos começar a consumir a informação de um stream de qualquer ponto no tempo, inclusive simultaneamente em pontos diferentes (offsets). A Figura 1 ilustra a produção e consumo de informação em um stream de dados.
Figura 1: Fluxo de dados de um stream.(fonte: https://kafka.apache.org/intro)
Vale ressaltar aqui que o Apache Spark Structured Streaming não é um stream server, ou seja, não é equivalente ao Apache Kafka. O Structured Streaming foi projetado com o objetivo de processar informações, de e para streams de dados.
A Figura 2 ilustra o processamento de streams pelo Structured Streaming, no qual existe um fluxo de dados para ser processado e uma representação de linha temporal (da esquerda para direita). Conforme o tempo passa, chegam novos registros que são continuamente processados.
O Spark faz a leitura dos dados em uma estrutura de dados chamada Input Table, responsável por ler as informações de um stream e implementar as operações da API de Dataframe da plataforma. Em seguida toda regra de negócio, processamento, transformações e filtragens são aplicadas aos dados que são consolidados em uma estrutura chamada Result Table, responsável por armazenar os dados antes de serem despejados como resultado da computação no Output configurado, que por sua vez pode ser um sistema de arquivos distribuído, banco de dados ou até mesmo outro stream.
As operações sobre os streams de dados são executadas continuamente à medida que os dados vão chegando para leitura e processamento. A frequência na qual esses pequenos lotes de processamento são executados pode ser definida por meio da configuração de um Trigger, um gatilho que dispara o processamento do stream. Essa configuração vai de acordo com a sua capacidade de processamento e também das necessidades de sua aplicação.
Figura 2: Ilustração do processamento de dados contínuo pelo Apache Spark. (fonte: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
As características que mais chamam a atenção no Structured Streaming são:
• Nova API de alto nível: a antiga API de processamento de streams, DStreams foi descontinuada, a nova API Structured Streaming foi pensada para gerar o mínimo impacto possível no código já existente, além de corrigir os problemas da versão anterior;
• Junção de dados contínua: é possível fazer continuamente operações de junção (join) de uma entrada de dados do tipo stream com conteúdo estático, um outro DataFrame carregado a partir de arquivos, por exemplo;
• Integrações com diversas fontes de dados: diversos tipos de arquivo são suportados quando se tratam de sistemas de arquivos distribuídos que podem ser mapeados como streams tanto de entrada como de saída, são eles: CSV, Parquet e JSON. Podemos conectar o Spark diretamente a um stream server como o Apache Kafka;
• Tolerância a falhas (checkpoints): esse é um ponto que trabalha a resiliência da aplicação. Após a realização de um conjunto de operações sobre os dados, o Spark sempre guarda o estado atual das operações em forma de checkpoints, marcadores registrando o progresso do processamento. Esses marcadores são gravados no próprio DFS, dessa forma compartilhados com os demais nós do cluster. Em caso de falha de um dos workers de processamento, os arquivos de checkpoint são utilizados para saber exatamente em que ponto o processamento foi executado e de que ponto deve continuar. Os checkpoints aumentam a disponibilidade geral da aplicação. Observe na Figura 2 que antes do disparo de um uma nova tarefa de processamento, o estado anterior é lido para saber o ponto que parou o processo anterior;
• Tratamento de eventos desordenados (watermark): esse é um dos grandes diferenciais dessa ferramenta, o tratamento de dados desordenados. Imagine o caso de uma aplicação baseada em eventos no tempo, um agrupamento de quantos eventos passaram dada uma janela temporal. Exemplo: contagem de eventos recebidos a cada 5 minutos. O que aconteceria se um evento chegasse atrasado para processamento? Não poderíamos contabilizar como se o evento tivesse sido gerado no momento do processamento, caso contrário teríamos uma inconsistência nos dados. A implementação dos watermarks trata exatamente esse tipo de caso. Uma janela temporal é mantida e se um evento chegar atrasado, porém, ainda dentro dessa janela, o dado será re-consolidado de forma retroativa. Exemplo: janela de 10 min, se o dado chegar em até 10 min de atraso, ele será consolidado na faixa de tempo correta em que o evento ocorreu no passado, mantendo dessa forma a consistência nas operações.
A Figura 3 ilustra o caso de um evento que chegou atrasado (veja as anotações em vermelho), ao invés de ser descartado ou consolidado na janela de tempo errada, foi identificado que o dado chegou atrasado e ele foi consolidado corretamente em sua janela de tempo.
Figura 3: Tratamento de dados que chegaram atrasados usando Watermarking. (fonte: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking)
Um exemplo prático com o Structured Stream
Todo código mostrado aqui pode ser baixado do seguinte repositório:
http://github.com/eitikimura/structured-streaming
Agora que já foi explicada a parte teórica junto com as principais características da ferramenta, chegou a hora de desenvolver um exemplo prático.
Nesse exemplo, vamos ler os dados simulando que estão chegando em um sistema de arquivos, no caso, o sistema será o disco local mesmo, para fins didáticos. Os dados lidos de um CSV tem os campos descritos na tabela a seguir:
Tabela 1: Amostra de dados contida nos arquivos CSV.
Os arquivos estão organizados da seguinte forma:
O primeiro passo é se conectar ao cluster do Spark e configurar o Input de dados. Esse exemplo foi preparado para executar localmente, por isso não há necessidade de criar um cluster de Apache Spark para executar o código, tudo processa de forma auto-contida. Todos os fragmentos de código mostrados a seguir estão em linguagem Scala.
val DIR = new java.io.File(".").getCanonicalPath + "/dataset/stream_in"
//setting cluster definition
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Spark Structured Streaming Job")
// initiate spark session
val spark = SparkSession
.builder
.config(conf)
.getOrCreate()
//1. == DATA INPUT ==
// read data from datasource, in this particular case it is a directory
val reader = spark.readStream
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("latestFirst", "true")
.schema(SchemaDefinition.csvSchema)
.load(DIR + "/*")
Fragmento 1: Estabelecendo a session e definindo o input de dados.
Nesse exemplo, os arquivos estão armazenados no diretório: dataset/stream_in.
É importante notar que na definição do readStream o schema do CSV foi informado para o Spark, nesse ponto é necessário mostrar como devem ser lidos os dados, segue o schema:
object SchemaDefinition {
// the csv data schema
def csvSchema = StructType {
StructType(Array(
StructField("name", StringType, true),
StructField("country", StringType, true),
StructField("city", StringType, true),
StructField("phone", StringType, true),
StructField("age", IntegerType, true),
StructField("carrier", StringType, true),
StructField("marital_status", StringType, true)
))
}
}
Fragmento 2: Definição do Schema de dados.
Com a entrada de dados lendo as informações com readStream informamos ao Spark que o diretório deve ser tratado como um stream de dados, dessa forma o Spark sabe que haverá um influxo de novos arquivos nesse diretório e que eles devem ser processados continuamente.
O próximo passo será o processamento com os dados que estão chegando, nesse exemplo, vou fazer um agrupamento dos registros pelos campos: carrier e marital_status, simplesmente fazendo a contagem do número de ocorrências.
//2. == DATA PROCESSING ==
reader.createOrReplaceTempView("user_records")
val transformation = spark.sql(
"""
SELECT carrier, marital_status, COUNT(1) as num_users
FROM user_records
GROUP BY carrier, marital_status
""")
Fragmento 3: Processamento do stream de entrada com a API de Dataframe.
Como o Structured Stream trabalha com a mesma API de processamento dos Dataframes podemos simplificar a regra de negócio escrevendo o código em SQL, ao invés de uma das outras linguagens suportadas.
Para fazer isso, o Dataframe deve ser registrado como uma view temporária e a partir de então já podemos executar comandos SQL sobre essa view. Isso foi feito usando o comando: createOrReplaceTempView("user_records")
.
Temos dois passos já bem definidos, a entrada e o processamento, agora falta definirmos a saída. Para fins didáticos, a saída será simplesmente impressa no console:
//3. == DATA OUTPUT ==
val consoleStream = transformation.
writeStream.
option("truncate", false).
outputMode(OutputMode.Complete).
trigger(Trigger.ProcessingTime("2 seconds")).
format("console").
start()
consoleStream.awaitTermination()
Fragmento 4: Definição da saída de informação, resultado do processamento.
Algumas observações quanto a definição da saída writeStream, o OutputMode.Complete
indica que toda a saída deve ser despejada na saída configurada, que no caso do exemplo apenas imprimir o resultado por meio da instrução: format("console")
. A frequência de execução foi definida pelo gatilho em: Trigger.ProcessingTime("2 seconds")
.
O programa só começa efetivamente o processamento quando a instrução start()
é invocada. A saída a seguir mostra o resultado do processamento com a execução de 3 batches:
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO |Single |21 |
|TIM |Single |22 |
|OI |Single |31 |
|NEXTEL |Single |23 |
|NEXTEL |Married |30 |
|CLARO |Single |19 |
|CLARO |Divorced |22 |
|TIM |Divorced |30 |
|VIVO |Divorced |34 |
|OI |Married |24 |
|VIVO |Married |27 |
|NEXTEL |Divorced |22 |
|CLARO |Married |33 |
|TIM |Married |29 |
|OI |Divorced |33 |
+-------+--------------+---------+
-------------------------------------------
Batch: 2
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO |Single |28 |
|TIM |Single |29 |
|OI |Single |43 |
|NEXTEL |Single |28 |
|NEXTEL |Married |41 |
|CLARO |Single |26 |
|CLARO |Divorced |25 |
|TIM |Divorced |33 |
|VIVO |Divorced |44 |
|OI |Married |29 |
|VIVO |Married |33 |
|NEXTEL |Divorced |25 |
|CLARO |Married |41 |
|TIM |Married |33 |
|OI |Divorced |42 |
+-------+--------------+---------+
-------------------------------------------
Batch: 3
-------------------------------------------
+-------+--------------+---------+
|carrier|marital_status|num_users|
+-------+--------------+---------+
|VIVO |Single |36 |
|TIM |Single |35 |
|OI |Single |50 |
|NEXTEL |Single |38 |
|NEXTEL |Married |50 |
|CLARO |Single |33 |
|CLARO |Divorced |32 |
|TIM |Divorced |38 |
|VIVO |Divorced |50 |
|OI |Married |34 |
|VIVO |Married |38 |
|NEXTEL |Divorced |32 |
|CLARO |Married |46 |
|TIM |Married |39 |
|OI |Divorced |49 |
+-------+--------------+---------+
A cada batch executado, os dados são consolidados e agregados, os dados da coluna num_users é atualizado a cada iteração.
Para simular a saída anterior, execute o programa e em tempo de execução mova gradativamente arquivos para dentro do diretório mapeado pelo stream.
Mova gradualmente os arquivos do diretório raw_data para o stream_in para simular a chegada de novos arquivos no stream de dados e veja o processamento acontecer:
Toda estrutura de arquivos mostrada anteriormente e o projeto de exemplo pode ser encontrado nesse repositório. É possível usar esse projeto para executar tudo localmente em seu computador sem necessidade de grandes ambientes de infraestrutura configurados.
Conclusão
Esse artigo mostrou a fundo o funcionamento da API de processamento de streams do Apache Spark, suas principais características, uma breve comparação de um código de processamento em lote e processamento contínuo.
Como destaques ressalto a capacidade de tratar registros que chegam atrasados para processamento (watermarking) e a tolerância a falhas do mecanismo de stream (checkpoint).
Mostrei também um programa prático de processamento de streams, comentando passo a passo e como executá-lo em seu próprio ambiente de desenvolvimento.
No próximo artigo dessa série mostro como usamos o processador de Stream do Spark para processar dados em tempo real resolvendo um problema em produção na Wavy!
Sobre o Autor
Eiti Kimura é Coordenador de TI e Arquiteto de Sistemas Distribuídos de alto desempenho na Wavy. Eiti tem 17 anos de experiência em desenvolvimento de software. É entusiasta de tecnologias open-source, MVP do Apache Cassandra desde 2014 e tem vasta experiência com sistemas de back-end, em especial plataformas de tarifação e mensageria para as principais operadoras de telefonia do Brasil.