Pontos Principais
- Uma alternativa para realizar carga de dados com o ksqlDB e o Schema Registry.
- Uma estratégia para stream de dados utilizando ksqlDB no lugar de Kafka Streams.
- Conceitos de Stream e Tables do ksqlDB.
Se a empresa faz uso do ecossistema do Apache Kafka, são diversas as formas para realizar a integração ou a carga de dados entre os sistemas. Este artigo apresenta o uso do ksqlDB com Schema Registry que, por meio do Kafka Connect, movimenta os dados de forma rápida e fácil, se aproveitando da escalabilidade do Apache Kafka.
Inserir dados no formato JSON a partir de um tópico no Apache Kafka em um banco de dados relacional é um cenário bem comum e que pode ser resolvido de diversas formas, desde desenvolvendo um serviço consumer para o Apache Kafka que grava os dados no banco de dados, até utilizando uma ferramenta de ETL (Extract, Transform, Load) como o Pentaho ou um dataflow, como o Apache NiFi.
Apesar deste cenário ser comum atualmente, apresenta diversos desafios, como por exemplo:
- Pouca experiência dos desenvolvedores na construção de consumers para o Apache Kafka;
- Construção de códigos técnicos que agregam pouco valor para o negócio;
- Falta de controles, como tratamento de erros, retentativas, atualizações por chave primária;
- Baixa integração entre as equipes de desenvolvimento e as equipes de analytics ou com aquelas que são responsáveis pelas ferramentas de ETL ou dataflow;
Neste contexto, se apresenta o ecossistema do Apache Kafka como uma alternativa viável com diversas ferramentas que facilitam e padronizam este processo. Entre as ferramentas podemos destacar o Kafka Connect, que permite o consumo das mensagens e a carga de dados em diversos gerenciadores de banco de dados, como por exemplo o PostgreSQL, de maneira simples e resiliente. O Kafka Connect possui o conceito de conectores de dados para entrada (Source) e saída (Sink), que são implementados por meio de plugins especializados. Neste artigo abordaremos os plugins JDBC Connector e Avro Converter.
O plugin JDBC Sink Connector requer que as mensagens do tópico tenham um formato com o schema declarativo (como Apache Avro, JSON Schema ou Protobuf), onde são indicados os tipos de dados dos atributos bem como a chave primária. Em função dessa característica do plugin, foi introduzida na arquitetura as ferramentas ksqlDB e Schema Registry desenvolvidas pela Confluent.
O ksqlDB
O ksqlDB (conhecido no passado apenas como KSQL) pode ser considerado um banco de dados de streaming de eventos, voltado à construção de aplicações para processamento de stream (por exemplo, para detecção de fraudes) ou aplicações baseadas no padrão Event Sourcing. Ele depende de um cluster Apache Kafka para operar, pois as definições, bem como os dados, ficam armazenados em tópicos. O ksqlDB foi construído sobre o framework Kafka Streams, e visa facilitar a sua utilização.
Assim como os bancos de dados relacionais, o ksqlDB fornece dois tipos de instruções:
- DDL (Linguagem de Definição de Dados);
- DML (Linguagem de Manipulação de Dados).
Ambos muito similares ao padrão SQL tradicional.
O ksqlDB trabalha com dois tipos de coleções, Stream e Tables:
- Uma stream é uma coleção imutável (append-only) de eventos. Ela é utilizada para representar uma série de fatos históricos;
- Já uma tabela (tables) é uma coleção mutável. Ou seja, quando são inseridos vários eventos com a mesma chave em uma tabela, a leitura retorna apenas os valores da última ocorrência.
Exemplo: O score de crédito de um cliente pode mudar ao longo do tempo. Para o ksqlDB, a série de scores de crédito do cliente é uma stream, já o score de crédito atual do cliente é considerada uma tabela.
A imagem a seguir, representa uma estrutura em alto nível do ksqlDB
Fonte: Confluent
O Schema Registry
O Schema Registry fornece uma camada de serviço para os metadados das mensagens. Ele provê uma interface RESTful para armazenar e recuperar schemas Avro, JSON Schema e Protobuf, e ele utiliza o próprio Apache Kafka como camada de armazenamento.
Fonte: Confluent
Caso de uso prático
O caso de uso deste artigo consiste em um sistema para despacho de entregas que precisa manter em sua base de dados PostgreSQL a posição atual dos entregadores a fim de otimizar o tempo das entregas. Para isso, o sistema recebe de um sistema de rastreamento, em um tópico do Apache Kafka, mensagens contendo a geolocalização (latitude e longitude) dos entregadores em formato JSON puro (sem schema).
Fonte: Elaborado pelos autores
A arquitetura da solução
Diagrama de Contexto no modelo C4 de Simon Brown elaborado pelos autores
O ksqlDB irá consumir as mensagens do tópico locations (localizações) e vai transformar o formato de JSON para AVRO, em seguida produzirá as mensagens no tópico locations_as_avro. O Kafka Connect consumirá as mensagens do tópico locations_as_avro e irá inserir ou atualizar os registros no banco de dados PostgreSQL.
Os componentes ksqlDB, Schema Registry e Kafka Connect serão executados em pods em um cluster de Kubernetes.
O Código fonte
Todo o código-fonte utilizado neste artigo pode ser encontrado no GitHub no seguinte endereço:
$ git clone https://github.com/marcioalvs/kafka-2-postgres.git
$ cd kafka-2-postgres/
Instalando o Schema Registry
#install schema registry (deployment + service)
$ kubectl apply -f schema-registry.yaml
$ kubectl apply -f schema-registry-service.yaml
Construindo a imagem do Kafka Connect com os plugins JDBC e Avro
Para consumir menos recursos do cluster Kubernetes, principalmente a memória, optamos por construir a imagem do Kafka Connect a partir da imagem do Apache Kafka gerada pelo projeto Strimzi, que pertence à Cloud Native Computing Foundation, e oferece um operator com diversos recursos que facilitam a implantação das ferramentas do ecossistema do Apache Kafka em um cluster de Kubernetes.
#build kafka-connect image
$ wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/5.5.1/confluentinc-kafka-connect-jdbc-5.5.1.zip -O temp.zip; unzip -d . temp.zip; rm temp.zip
$ wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/5.5.1/confluentinc-kafka-connect-avro-converter-5.5.1.zip -O temp.zip; unzip -d . temp.zip; rm temp.zip
$ export DOCKER_ORG=yourdockerhubuser
$ docker build . -t ${DOCKER_ORG}/kafka-connect-jdbc
$ docker push ${DOCKER_ORG}/kafka-connect-jdbc
Instalando o Operator da Strimzi no cluster de Kubernetes
#add helm chart repo for Strimzi
$ helm repo add strimzi https://strimzi.io/charts/
#install Strimzi Operator
$ helm install strimzi-kafka strimzi/strimzi-kafka-operator
Instalando o Kafka Connect
#install kafka-connect (via Strimzi Operator)
$ kubectl apply -f kafka-connect.yaml
Instalando o ksqlDB
#install ksqldb (deployment + service)
$ kubectl apply -f ksqldb.yaml
$ kubectl apply -f ksqldb-service.yaml
Criando um tópico e inserindo dados em JSON-puro
#open Kafka client
$ kubectl run kafka-client -ti --image=strimzi/kafka:latest-kafka-2.4.0 \
--rm=true --restart=Never -- bash
# create topic
$ /opt/kafka/bin/kafka-topics.sh \
--bootstrap-server broker1:9092,broker2:9092,broker3:9092 \
--create --replication-factor 3 --partitions 1 --topic locations
# put data in topic
$ /opt/kafka/bin/kafka-console-producer.sh \
--broker-list broker1:9092,broker2:9092,broker3:9092 \
--topic locations
# enter the data bellow
>{ "PROFILEID": "Entregador001", "LATITUDE": 40.7143528, "LONGITUDE": -74.0059731 }
$ exit
Acessando o CLI do ksqlDB
#open ksqldb-cli
$ kubectl run ksqldb-cli -ti \
--image=confluentinc/ksqldb-cli:0.11.0 \
--rm=true --restart=Never -- ksql http://ksqldb-server:8088
If you don't see a command prompt, try pressing enter.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v0.11.0, Server v5.5.1 located at http://ksqldb-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Criando uma stream para converter os dados no tópico para Avro
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> CREATE STREAM locations (PROFILEID VARCHAR, LATITUDE DOUBLE, LONGITUDE DOUBLE)
WITH (KAFKA_TOPIC='locations', VALUE_FORMAT='JSON');
ksql> CREATE STREAM locations_as_avro
WITH (KAFKA_TOPIC='locations_as_avro', VALUE_FORMAT='AVRO', PARTITIONS=1, REPLICAS=3) AS
SELECT * FROM locations;
ksql> exit
O primeiro comando muda o offset do consumer Kafka do ksqlDB para leitura à partir do início do tópico. O segundo comando cria uma Stream a partir de um tópico já existente. O terceiro comando cria uma Stream a partir da stream criada anteriormente, neste caso o ksqlDB cria um consumer/producer Kafka como um daemon, denominado Query, para obter os dados, converter o formato para Avro, e gravar as mensagens em um novo tópico.
Criando o conector JDBC Sink no Kafka Connect
#install postgres connector
$ kubectl apply -f kafka-connector-jdbc.yaml
No arquivo kafka-connector-jdbc.yaml a propriedade insert.mode=upsert instrui o conector JDBC Sink a executar um comando UPDATE, caso já exista um registro na tabela com a chave primária pk.fields="PROFILEID" correspondente ao mesmo atributo na mensagem (pk.mode=record_value). Caso não exista, irá executar um comando INSERT.
As propriedades auto.create
e auto.evolve
instruem o conector JDBC Sink a criar a tabela no banco caso ela ainda não exista, e a incluir as colunas que não existirem.
A propriedade value.converter=io.confluent.connect.avro.AvroConverter
instrui o conector JDBC Sink a fazer o parsing das mensagens usando o plugin Avro Converter, aplicando o schema obtido no Schema Registry indicado na propriedade value.converter.schema.registry.url
.
Verificando os dados que estão chegando no PostgreSQL
#acessando o postgres
$ psql "host=pghost=5432 dbname=yourdatabase user=youruser@pghost sslmode=require"
Password for user youruser@pghost:
psql (12.2 (Ubuntu 12.2-4), server 11.6)
SSL connection (protocol: TLSv1.2, cipher: ECDHE-RSA-AES256-GCM-SHA384, bits: 256, compression: off)
Type "help" for help.
yourdatabase=> select * from yourschema.locations_as_avro;
PROFILEID | LATITUDE | LONGITUDE
---------------+------------+-------------
Entregador001 | 40.7143528 | -74.0059731
(1 row)
Conclusão
O Kafka Connect, aliado ao ksqlDB, entrega uma solução simples e poderosa para transportar os dados provenientes do Kafka para uma base de dados relacional, como o PostgreSQL. Transformar as mensagens aplicando um schema Avro é apenas uma das diversas aplicações possíveis para o ksqlDB. Existem outros cenários de uso, como a junção/join de dados a partir de vários tópicos em um único tópico, a agregação de dados para geração de métricas em tempo-real, e o desenvolvimento seguindo o pattern Event Sourcing, que por si só dariam um ótimo material para outros artigos.
Pontos Importantes
O projeto Confluent Platform possui a licença open source com algumas restrições. Ela libera o uso dos softwares em projetos comerciais, desde que não concorram com os serviços oferecidos pela Confluent Cloud. Resumindo, permite usar o ksqlDB, o Schema Registry, e os plugins JDBC Connector e Avro Converter em arquiteturas de projetos corporativos, porém proíbe, que uma empresa ou indivíduo lance um serviço no modelo PaaS (Platform as a Service) e que concorram diretamente com os serviços oferecidos pela Confluent em Cloud e que façam uso do ksqlDB ou do Schema Registry ofertando como um serviço. Este movimento ocorreu em 2018 após a Confluent descobrir que alguns provedores cloud estavam oferecendo seus produtos para os clientes e cobrando por isso sem repassar créditos para a Confluent.
Referências
- https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
- https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
- https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
- https://www.confluent.io/confluent-community-license-faq/
- https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas.enable-requires-schema-and-payload-fields/
- https://github.com/confluentinc/kafka-connect-jdbc/issues/824
- https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/
- https://ksqldb.io/quickstart.html#step-1
- https://docs.ksqldb.io/en/latest/developer-guide/syntax-reference/
Sobre os autores
|
Márcio Alves é pós-graduado em Engenharia de Software pela FASP e possui MBA em Governança de TI pelo IPT-USP. Atua na área de sistemas de informação a mais de 20 anos, liderando equipes segundo princípios ágeis na criação de soluções de arquitetura para cenários de missão-crítica, desenvolvimento de software, segurança da informação e mobilidade nas áreas de varejo, finanças e saneamento. Atualmente é IT Specialist e Solutions Architect na Via Varejo S.A. Podemos encontrá-lo em seu LinkedIn. |
|
Marcelo Costa é pós-graduado em Engenharia de Software pela UNICAMP. Atua em sistemas de alta complexidade desde 2002, liderando equipes multidisciplinares no desenvolvimento de soluções de software nas áreas de papel e celulose, varejo, aeroespacial, logística, educação, saúde e finanças. Especializa-se em liderança de equipes e em arquiteturas de soluções, em transformação organizacional, Arquitetura Ágil, Domain-Driven Design e heurísticas para construir melhor software. Atualmente é IT Specialist e Solutions Architect na Klabin S.A e Confluent Community Catalyst (2020/2021). Pode ser contatado pelo seu LinkedIn. |