Esse post faz parte de uma série de artigos sobre o case de uso do Apache Spark para processamento de dados em tempo real, confira a parte 1 e a parte 2.
Chegamos ao passo final dessa jornada, vimos anteriormente uma introdução e um detalhamento do funcionamento da API de processamento de streams, Structured Streaming, chegou a hora de um case em produção da Wavy: mais de 110 milhões de transações em "tempo real".
Caso de Uso: Um moedor de dados em "tempo real"
Na Wavy trabalhamos com grandes volumes de transações todos os dias, mensalmente a empresa envia mais de 1 bilhão de mensagens para nossos clientes, por mensagens, entenda como sendo comunicação que impacta os clientes como: SMS e Whatsapp, por exemplo.
Além de oferecer uma nova experiência em comunicação aos nossos clientes, temos uma linha de negócio com as operadoras de telefonia celular, que oferecemos os serviços de desenvolvimento de produtos e distribuição de mídia e conteúdo digital, levando novos assinantes para as plataformas das operadoras. Vejam um pouco dos nosso números:
Nesse contexto, a plataforma de gerenciamento de assinantes e tarifação processa a cobrança dos usuários em seus serviços nas operadoras e gerencia o ciclo de vida das assinaturas nos produtos, frequência de cobrança, novas assinaturas, renovação e assim por diante.
Essas mais de 110 milhões de transações que temos diariamente, refletem a saúde das plataformas e do negócio, mostram o fluxo de novos usuários e o cancelamento de produtos. Essas informações são a base para tomadas de ações muito importantes tanto de negócios como técnicas.
Em nossa solução inicial, anterior à que vou abordar aqui, todo o processo para consolidar essa massa de dados, processando para extrair a informação e disponibilizar em nossos dashboards variava de 30 a 60 minutos, no pior caso mais de 90 minutos. Nessa abordagem, os dados gerados em nossas aplicações, distribuídas em vários servidores, eram persistidas em um banco dados relacional, em seguida a informação era transferida para um banco de analytics na nuvem, com capacidade de processar consultas ad-hoc sobre o volume de dados mencionado.
Nós precisávamos processar esses dados mais rapidamente, à medida que os eventos fossem acontecendo, consolidar e mostrar essas informações mais rapidamente, dando mais visibilidade do negócio, as campanhas de mídia em televisão precisam ser acompanhadas em "tempo real", por exemplo.
A solução inicial passou a não nos atender com a velocidade que precisávamos, por isso começamos a trabalhar com outras estratégias e ferramental. No nosso caso, optamos por trabalhar com o processador de streams do Apache Spark, o Structured Streaming. Na parte 2 dessa série de artigos eu explico o funcionamento da tecnologia em detalhes, agora vou mostrar como aplicar na prática para esse nosso problema.
Que tal processar dados em "tempo real"?
Para atingir o objetivo de processar os dados em "tempo real", tivemos que implementar uma série de mudanças também nas plataformas. A primeira alteração foi parar de centralizar os dados no banco de dados relacional e assumir a estratégia de dividir para conquistar, ao invés de centralizar. Do lado dos sistemas, a persistência das transações não seriam mais feitas no banco de dados, mas sim gerando pequenos arquivos nas máquinas locais, com isso, ganhamos inclusive mais desempenho nas aplicações. A partir disso, os arquivos passaram a ser transferidos para nosso sistema de arquivos distribuídos HDFS. Esses arquivos são copiados das máquinas locais para o HDFS via scripts de transferência mantidos por nossos times internos. O diagrama da Figura 1 ilustra a geração dos arquivos:
Figura 1: Arquivos gerados pelas aplicações e transferidos ao HDFS.
O job de processamento contínuo executa no cluster Apache Spark conectado ao HDFS para acessar os arquivos para serem processados. Os diretórios do HDFS foram mapeados como sendo um stream de dados, novos arquivos chegam continuamente a cada minuto.
A consolidação da informação executada pelo Structured Streaming é persistida em um banco relacional para alimentar nossos dashboards. A principal diferença aqui é que os dados são persistidos pré-agregados no banco de dados e processados continuamente. O diagrama da Figura 2 mostra o esquema com o Apache Spark como parte da solução.
Figura 2: Arquitetura da solução de processamento de dados em "tempo real".
Após essa apresentação da solução do ponto de vista arquitetural, vou focar mais na codificação, mas antes de entrar em código, vou mostrar um pouco sobre a estrutura dos arquivos gerados pelas aplicações. Todos os arquivos são escritos no formato CSV e compactados com Gzip. Segue uma amostra do arquivo:
838,2,5500000000,100015,"{""authCode"":""3215"",""transactionIdAuth"":""1011706220428374938""}",SUBSCRIPTION_050,0.5,11,0,1,14,Subscription renew.,2017-07-18 13:22:59.518,,,19,PRE,false,Charge Fail. CTN[31984771092][PRE][GSM]: Without Credit.,,,,0,458,,engine2dc2,23,3,5,2017-07-18 13:22:59.544,,FE1952B0-571D-11E7-8A17-CA2EE9B22EAB,NT0359
...
Uma linha de dados é composta de todas informações referentes a transação que foi efetuada, preço de cobrança, identificador do produto, cliente, respostas das operadora e assim por diante.
Seguindo os passos exemplificados na parte 2 dessa série, precisamos definir uma entrada de leitura de informação do stream, para isso é necessário se conectar ao cluster Apache Spark e configurar os parâmetros do readStream.
Note que todos os fragmentos de código apresentados estão escritos em linguagem Scala.
val conf = new SparkConf().setAppName("Structured Streaming")
val spark = SparkSession.builder()
.config(conf).getOrCreate()
val streamReader = spark.readStream
.format("csv")
.option("header", false)
.option("mode", "DROPMALFORMED")
.schema(ReadSchemas.csvTransactionSchema)
.load("hdfs://YOUR_PATH/20*/*/*/*.gz")
Para efetuar a leitura do arquivo no formato CSV, no caso do readStream, faz-se necessário informar o schema, ou seja os tipos de dados de cada um dos campos no arquivo, assim como o trecho de código a seguir:
// the csv data schema
def csvTransactionLogSchema = StructType {
StructType(Array(
StructField("id", StringType, true),
StructField("application_id", IntegerType, true),
StructField("carrier_id", IntegerType, true),
StructField("phone", StringType, true),
StructField("price", DoubleType, true),
StructField("origin_id", IntegerType, true),
. . .
))
}
O próximo passo é definir o processamento da informação, as transformações e agregações que serão aplicados continuamente sobre os dados.
Nessa primeira versão de processamento, todas as transformações foram feitas utilizando os métodos da API do Dataframe:
val query = streamReader
.withColumn("date", $"creation_date".cast("date"))
.withColumn("successful_charges", when($"transaction_status_id" === 2, 1))
.withColumn("no_credit", when($"transaction_status_id" === 0, 1).otherwise(0))
.withColumn("error", when($"transaction_status_id" === 3).otherwise(0))
.filter("carrier_id IN (1,2,4,5)")
.filter("transaction_status_id NOT IN (5, 6)")
.filter("transaction_action_id IN (0, 1)")
.withWatermark("creation_date", "3 hour")
.groupBy($"carrier_id", window($"creation_date", "5 minutes").as("window"))
.agg($"carrier_id",
avg($"response_time").as("avg_response_time"),
sum($"successful_charges").as("successful_charges"),
sum($"no_credit").as("no_credit"),
sum($"error").as("error"),
count($"carrier_id").as("total_attempts"))
O conteúdo desse trecho de código é denso, vale explicar algumas partes com mais detalhes. Começo com algumas transformações usando o comando withColumn
criando novas colunas que serão agregadas mais abaixo, em seguida temos algumas filtragens, por meio da instrução filter
. Nessa parte são filtradas algumas operadoras e status de transações referentes a cobranças de usuários.
Vale destacar que foi configurado o watermark para o processamento de eventos desordenados no stream, essa feature permite tratar eventos que chegam atrasados e fora de ordem para processamento, nesse caso estou utilizando o campo de criação do registro creation_date
como referência e dizendo que aceito uma janela de processamento de até 3 horas, caso os registros cheguem atrasados. Mais detalhes sobre como funciona o mecanismo de watermark na documentação oficial.
Seguindo o fluxo do código anterior, chegamos na parte final, em que são feitas algumas agregações, consolidados os números e contagens dos registros.
Depois de um tempo trabalhando com a API funcional dos Dataframes fica um pouco mais intuitiva a leitura e entendimento do código. Entretanto, as operações descritas muito se assemelham a uma query em SQL. Então, porque não simplificar o entendimento da transformação de dados usando a própria linguagem SQL? O Apache Spark suporta o uso da API SQL juntamente com o processamento de streams.
A linguagem SQL costuma ser padrão para a manipulação de dados tanto para equipes de desenvolvimento como para equipes de administradores de bancos de dados. Tomamos a decisão de escrever toda a lógica de transformação de dados em SQL como forma de envolver mais pessoas no projeto, tanto outros analistas de desenvolvimento como de dados, utilizando uma linguagem de comum conhecimento, o nosso bom e velho SQL.
Toda a lógica de manipulação e transformação de dados foi reescrita usando SQL para processamento contínuo dos dados contidos no Dataframe. Para isso, primeiro temos que registrar o Dataframe como uma view temporária, nesse caso chamada de transaction_temp_table
:
streamReader
.withWatermark("creation_date", "3 hour")
.createOrReplaceTempView("transaction_temp_table")
Essa é versão 2 do programa mostrado anteriormente, note que as transformações sobre os dados são as mesmas, ou seja, o output final do processamento da informação é o mesmo para as duas versões dos programas:
val query : DataFrame = spark.sql(
"""
SELECT carrier_id, TO_DATE(creation_date) as record_date,
HOUR(creation_date) as hour_of_day,
WINDOW(creation_date, "5 minutes").start as start_date,
AVG(response_time) as avg_response_time ,
SUM(CASE
WHEN transaction_status_id = 2 THEN 1
ELSE 0
END) as successful_charges,
SUM(CASE
WHEN transaction_status_id = 0 THEN 1
ELSE 0
END) as no_credit,
count(carrier_id) as total_attempts
FROM transaction_temp_table
WHERE carrier_id IN (1,2,4,5)
AND transaction_action_id IN (0, 1)
AND transaction_status_id NOT IN (5, 6)
GROUP BY carrier_id, TO_DATE(creation_date),
HOUR(creation_date),
WINDOW(creation_date, "5 minutes").start
""")
Para fins de debug foi configurado um writeStream para imprimir somente o resultado do processamento da informação. Esse é o resultado da informação consolidada dentro de uma janela de 5 minutos, os números agregados das transações com as operadoras:
+--+-----------+-----+------+------+------------+--------+---------+-----+-------+
|id|record_date|hour |start |end |avg_resp_tm |success |no_credit|error|tot_att|
+--+-----------+-----+------+------+------------+-- -----+---------+-----+-------+
|1 |2017-07-18 |13 |13:20 |13:25 |618.8061297 | 4 |2607 |195 |2806 |
|2 |2017-07-18 |13 |13:20 |13:25 |1456.424283 | 13 |10912 |1503 |12428 |
|5 |2017-07-18 |13 |13:20 |13:25 |1161.730896 | 9 |2796 |532 |3337 |
|4 |2017-07-18 |13 |13:20 |13:25 |2950.642105 | 4 |1364 |54 |1425 |
+--+-----------+-----+------+------+------------+--------+---------+-----+-------
Agora que já estamos processando os dados, precisamos decidir o que fazer com esse output. O Structured Streaming disponibiliza algumas integrações para direcionar o resultado do processamento no pipeline, são elas: sistemas de arquivos distribuídos, streaming server, como o Kafka, console para debug e uma extensão chamada de foreach, na qual é possível implementar uma integração customizada.
No caso de uso apresentado o resultado da agregação é direcionado para um banco de dados relacional, que foi implementado usando o output do tipo foreach. Segue o código do output:
val jdbcWriter = new JDBCSink(resource, username, password)
val foreachStream = query
.select($"carrier_id", $"date", $"hour_of_day", $"start_date", $"end_date")
.writeStream
.foreach(jdbcWriter)
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime("2 minute"))
.option("checkpointLocation", "hdfs://YOUR_PATH/checkpoint-complete/")
.start
foreachStream.awaitTermination()
Alguns pontos de destaque são as configurações: checkpoint em que o diretório informado precisa ser em um DFS para que todos os outros Workers saibam em que ponto está o processamento; a frequência que o processamento será executado, no caso definido pelo Trigger de 2 minutos; o modo de saída tipo Update, na qual só os registros que sofreram alterações são direcionados para o output; por último o tipo de saída foreach que no caso foi implementada uma extensão para gravar os dados em banco relacional jdbcWriter.
Chamar o método awaitTermination()
em um WriteStream vai fazer com que o processamento seja iniciado e que a aplicação passe a executar de forma contínua!
Chegamos aqui ao final da implementação do nosso caso de uso, implementando um processador de streams em "tempo real". Agora entra a fase de implantação que consiste em gerar um artefato e submeter o job ao cluster Apache Spark, que foge ao escopo desse artigo.
Resultados obtidos
Como resultados obtidos, temos os dados sendo pré-agregados, ou seja, há impacto também no volume de informação armazenada além da velocidade de processamento e execução das consultas:
Aproximadamente 30 vezes mais rápido é o tempo em que as transações ocorreram até aparecerem consolidadas em nossos dashboards para consulta. É um resultado bastante expressivo!
Considerações finais e lições aprendidas
Para fazer a evolução do schema de leitura seja do CSV ou do JSON, por exemplo, não é trivial, uma nova versão da aplicação deve ser gerada e reimplantada. A boa notícia é que adicionar novos campos nos schemas não vão gerar erros ou problemas nas aplicações. No entanto, remover campos que já existem do schema vão provocar erros na leitura de dados, no momento do mapeamento dos arquivos para a estrutura de Dataframes.
Para trabalhar melhor a resiliência das aplicações o Spark disponibiliza algumas configurações adicionais na leitura dos dados, no momento de definir o seu ReadStream.
val input = spark.readStream
.option("mode", "DROPMALFORMED")
Durante a leitura dos arquivos, caso um dos registros esteja mal formado, o Spark geralmente interrompe o processamento do stream com um erro. Para deixar a aplicação mais robusta, podemos informar no momento da leitura que em caso de registros com problemas, eles simplesmente serão ignorados e os próximos registros passam a ser processados corretamente.
Existem alguns prós e contras de trabalhar com o processamento de arquivos como stream na configuração padrão do Spark, caso um arquivo sendo processado esteja corrompido, o programa vai parar com um erro de I/O. Para evitar que isso ocorra, basta configurar para que os arquivos corrompidos sejam simplesmente ignorados da seguinte forma:
spark.sqlContext
.setConf("spark.sql.files.ignoreCorruptFiles","true")
Outro ponto importante é entender se o cluster está com capacidade o suficiente para processar a informação na frequência especificada (Trigger). A melhor forma de descobrir se o cluster precisa ser escalado é acompanhando os logs de processamento do Spark. Segue um exemplo:
WARN ProcessingTimeExecutor:66 - Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 19455 milliseconds
Esse é um log avisando que o processamento deveria ter ocorrido com um intervalo de 1 segundo, no entanto o Spark está levando 19 segundos para processar, ao observar os recursos dos Workers durante o processamento foi possível notar que estavam completamente com recursos saturados:
Esse é um sinal claro de que é necessário adicionar mais recursos ao cluster, para aumentar sua capacidade de processamento. Outra alternativa seria diminuir a frequência de processamento, como mostrado no log anteriormente, não há como ter processamento da informação em menos de 19 segundos, dessa forma esse tempo poderia ser definido como o Trigger, mas isso, vai depender dos requisitos da aplicação.
Uma pergunta que geralmente me fazem e vale discutir: Porque não usou o Apache Kafka como stream server? É realmente uma questão muito pertinente.
A resposta é bastante simples, na arquitetura de nossas plataformas não tínhamos o Kafka como parte da solução. Isso iria demandar alteração em todas as aplicações que ao invés de simplesmente registrar o dado em um arquivo, teriam que passar a produzir dados no Kafka, assim como seria necessário levantar toda uma nova infraestrutura para suportar esse cluster. Para o momento da solução a escolha que demandou menos alterações nas aplicações e também o menor custo foi utilizar a capacidade do Apache Spark de mapear diretórios de um DFS como um stream de dados, dessa forma continuamos usando os arquivos.
Chegamos ao final dessa série de artigos sobre processamento de dados em "tempo real" com Apache Spark, foi uma série bastante profunda desde os fundamentos de processamento de streams até uma aplicação resolvendo problemas do mundo real em ambientes de produção.
Sobre o Autor
Eiti Kimura é Coordenador de TI e Arquiteto de Sistemas Distribuídos de alto desempenho na Wavy. Eiti tem 17 anos de experiência em desenvolvimento de software. É entusiasta de tecnologias open-source, MVP do Apache Cassandra desde 2014 e tem vasta experiência com sistemas de back-end, em especial plataformas de tarifação e mensageria para as principais operadoras de telefonia do Brasil.