Esse post faz parte de uma série de artigos sobre o case de uso do Apache Spark para processamento de dados em "tempo real".
INTRODUÇÃO
Esse ano tive a oportunidade de palestrar sobre o tema desse artigo na edição comemorativa de 10 anos do QCon em São Paulo (veja meu review sobre o evento). Acredito que o tema é relevante o suficiente para gerar um post mais detalhado explicando em minúcias a solução.
Quando falamos sobre big data pensamos em grandes volumes de dados, variedade e a velocidade de processamento desses dados. O aumento do volume de dados vem crescendo exponencialmente há décadas devido à popularização da Internet e o uso cada vez mais intenso de dispositivos móveis.
No início, o desafio era simplesmente conseguir armazenar grandes volumes de dados. Em seguida, com grandes massas de dados nas mãos, o desafio passou a ser: como vou processar isso? Agora os desafios são mais, digamos, desafiadores: como extrair informação relevante dos dados e como conseguimos fazer isso em tempo real.
O foco dessa série de artigos está no processamento de dados à medida que eles chegam nos sistemas e plataformas da empresa, ou seja, o chamado tempo real. Mas o que é tempo real?
Note que coloquei o termo tempo real entre aspas no título desse artigo, isso porque a interpretação é relativa e conceitual. Vou exemplificar: para um sistema de IoT processar uma informação em algumas dezenas de segundos certamente será lento demais, provavelmente, tempo real em IoT está na ordem de alguns milissegundos. Entretanto, para aplicações Web e plataformas comerciais, alguns segundos ou até minutos seriam tempos aceitáveis como sendo tempo real, ou seja, o termo tempo real é muito relativo. Por isso vale a pena contextualizar ele aqui para o case que será exposto.
Uma das linhas de negócio da Wavy é a venda de conteúdo por meio de aplicativos e serviços para operadoras de telefonia, na qual o usuário passa a assinar esses serviços e são cobrados de forma recorrente.
Antes da solução, que vou abordar posteriormente nessa série, esses dados de transações com as operadoras levavam entre 30-90 mins para serem consolidados e exibidos em nossas plataformas de visualização de dados. Isso é muito tempo e estava impactando no tempo da tomada de ações no nosso negócio. O nosso volume diário era superior a 110 milhões de transações com as operadoras espalhadas ao longo do dia.
A necessidade: processar milhões de transações de cobrança em tempo real. Nesse ponto entra o Apache Spark para o resgate.
Apache Spark
O Apache Spark é um motor de processamento distribuído de alto desempenho. Foi desenvolvido em 2009 pelo AMPLab da Universidade de Califórnia em Berkeley e em 2010 seu código foi aberto como projeto da fundação Apache.
A ferramenta foi desenvolvida para ser altamente escalável e processar grandes volumes de dados. O Apache Spark foi desenvolvido em linguagem Scala, porém expõe sua API que pode ser acessada usando outras linguagens. Na prática podemos escrever programas em Scala, Java, Python e até mesmo R (verificar disponibilidade de operações), o código passa por um plano de execução que acessa as APIs comuns do framework, isso de forma transparente para o desenvolvedor.
A arquitetura do cluster do Spark pode ser vista na Figura 1. O programa escrito pelo usuário é conhecido como Driver Program, esse programa é submetido ao Cluster Manager que gera o plano de execução e delega o processamento aos demais nós de processamento conhecidos como Worker Nodes. Para executar programas com o Apache Spark é imperativo que todos os nós consigam trocar informações através da gravação de arquivos em sistemas de arquivos distribuídos (DFS), tais como HDFS ou S3, por exemplo.
Figura 1: Arquitetura de um programa em execução no cluster Apache Spark. (fonte: https://spark.apache.org/docs/latest/cluster-overview.html)
Para mais detalhes sobre o funcionamento do Spark em cluster, veja a documentação.
Caso se interesse pelo tema, baixe o framework, inicie o spark-shell e faça um tour na ferramenta localmente, sem se preocupar com a complexidade de montagem de ambiente.
O shell do Spark provê uma forma fácil de aprender a usar a API e as ferramentas de análise exploratória de dados. Para isso, basta executar o comando no diretório de instalação do Apache Spark e começar:
./bin/spark-shell
Spark context available as 'sc' (master = local[*], app id = 345).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12
scala> _
Nesse exemplo uso como base de dados um arquivo com informações fictícias geradas sobre pessoas, o arquivo pode ser encontrado nesse repositório: https://github.com/eitikimura/structured-streaming/blob/master/dataset/raw_data/user-record.1.csv
Vamos fazer a carga do arquivo no formato CSV, setando o separador do arquivo que no caso é um ponto-e-vírgula:
scala> val df = spark.read.option("delimiter", ";").option("header", true).csv("file:/YOUR_PATH/user-record.1.csv")
Nesse momento os dados lidos e que estão no Dataframe estão organizados da seguinte forma:
+------------+-----------+---------+--------------+---+-------+--------------+
| id| country| city| phone|age|carrier|marital_status|
+------------+-----------+---------+--------------+---+-------+--------------+
| Quinn| Syria| Holywell|55(89)76207443| 54| VIVO| Single|
| Emi E. Leon| Pitcairn| Clermont|55(72)84497623| 52| VIVO| Married|
| Adele I.| Greenland| Darwin|55(09)68020238| 21| NEXTEL| Divorced|
| Preston R.| Uruguay| Borås|55(01)60250601| 28| OI| Married|
|William Buck| Montenegro| Casalvi|55(95)36737883| 56| OI| Married|
+------------+-----------+---------+--------------+---+-------+--------------+
scala>
Nessa fase é possível verificar o schema que o Spark inferiu da leitura dos dados por meio da invocação do método printSchema().
df.printSchema()
root
|-- id: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
|-- phone: string (nullable = true)
|-- age: string (nullable = true)
|-- carrier: string (nullable = true)
|-- marital_status: string (nullable = true)
scala>
Para corrigir um tipo de dado, por exemplo, o campo age está como tipo string quando na realidade deveria ser do tipo integer, podemos executar as seguintes instruções:
val dframe = df.withColumn("age", df("age").cast(org.apache.spark.sql.types.IntegerType))
Ao verificar o schema novamente, podemos ver que agora o campo passou para o tipo correto (veja destaque em negrito):
scala> dframe.printSchema
root
|-- id: string (nullable = true)
|-- country: string (nullable = true)
|-- city: string (nullable = true)
|-- phone: string (nullable = true)
|-- age: integer (nullable = true)
|-- carrier: string (nullable = true)
|-- marital_status: string (nullable = true)
Vamos fazer uma agregação simples usando o campo carrier, dentro da base de dados para ver como está a distribuição de usuários por operadora:
df.groupBy("carrier").count().show()
+-------+-----+
|carrier|count|
+-------+-----+
| VIVO| 23|
| OI| 17|
| NEXTEL| 15|
| TIM| 19|
| CLARO| 26|
+-------+-----+
scala>
Podemos verificar a média de idade das pessoas da base e agrupar por estado civil, dessa forma:
dframe.groupBy("marital_status").avg("age").show()
gerando esse resultado:
+--------------+------------------+
|marital_status| avg(age)|
+--------------+------------------+
| Married| 37.54545454545455|
| Divorced| 38.43478260869565|
| Single|41.666666666666664|
+--------------+------------------+
scala>
Ao invés de obter essa informações fazendo chamadas à API do Dataframe podemos utilizar linguagem SQL para executar as operações sobre os dados, para isso, precisamos registrar o Dataframe como uma view temporária:
dframe.createTempView("vw_people")
Com a view definida agora podemos simplesmente executar comandos SQL sobre ela, por exemplo:
spark.sql("SELECT COUNT(1) FROM vw_people").show()
O resultado de uma simples contagem de registros:
+--------+
|count(1)|
+--------+
| 100|
+--------+
scala>
Ou mesmo consultas mais complexas, como essa de agrupamento de número de usuários com mais de 30 anos por operadora:
spark.sql("SELECT carrier, COUNT(1) FROM vw_people WHERE age > 30 GROUP BY carrier").show()
Veja o resultado a seguir:
+-------+--------+
|carrier|count(1)|
+-------+--------+
| VIVO| 13|
| OI| 8|
| NEXTEL| 11|
| TIM| 13|
| CLARO| 20|
+-------+--------+
Com o Spark temos opção tanto de usar a API de SQL fazendo invocação dos métodos por meio dos Dataframes ou simplesmente escrever diretamente instruções em linguagem SQL. Na prática, independente da forma que as instruções forem escritas o plano de execução das tarefas pelos Worker Nodes será o mesmo.
Espero que esse pequeno tour na ferramenta tenha despertado ainda mais seu interesse. A ideia aqui não foi dar uma visão completa do que pode fazer a ferramenta, mas sim apenas "arranhar a superfície" mostrando algumas coisas que podem ser feitas.
Lembrando que o Apache Spark foi projetado para escalar e processar grandes volumes de informação.
Conclusão
Esse artigo deu uma introdução sobre a palestra ministrada no QCon São Paulo 2019, mostrando o problema que queremos resolver: o processamento de dados em tempo real. Mostramos também uma breve introdução ao Apache Spark, a ferramenta empregada nessa jornada e alguns exemplos e dicas para iniciar na ferramenta.
Siga nossa série de artigos! No próximo faremos um estudo aprofundado sobre o Structured Streaming, a ferramenta de processamento de streams do Apache Spark, esse será o próximo passo para o processamento em "tempo real".
Sobre o Autor
Eiti Kimura é Coordenador de TI e Arquiteto de Sistemas Distribuídos de alto desempenho na Wavy. Eiti tem 17 anos de experiência em desenvolvimento de software. É entusiasta de tecnologias open-source, MVP do Apache Cassandra desde 2014 e tem vasta experiência com sistemas de back-end, em especial plataformas de tarifação e mensageria para as principais operadoras de telefonia do Brasil.