Pontos Principais
- Os processadores de stream devem ser escalonáveis para atender às crescentes necessidades de negócios do processamento de stream de dados;
- Escalar um processador de stream em um ambiente de container requer equilibrar o trade-off entre os aspectos de qualidade de serviço e o custo associado;
- O processador de stream deve ser capaz de escalar horizontalmente para explorar esse compromisso em um ambiente de container como o Kubernetes em execução em um provedor de serviços em nuvem;
- A execução bem-sucedida de um aplicativo de processamento de stream em um ambiente de container depende da quantidade de recursos de hardware provisionados para cada worker de processador de stream;
- Adicionar mais recursos de hardware do que o necessário para o ambiente de container não aumenta o desempenho.
O processamento de stream de dados tornou-se um dos principais paradigmas para análise de dados nos últimos tempos, devido à crescente exigência de aplicações para processamento de stream de eventos. Diferentes aplicações notáveis de processamento de stream podem ser encontradas em áreas como: telecomunicações, gerenciamento de tráfego de veículos, gerenciamento de multidões, informática da saúde, segurança cibernética, finanças, etc.
Processadores de stream são plataformas de software que permitem aos usuários processar e responder a stream de dados de entrada mais rapidamente (consulte o artigo O que é o processamento de stream?). Há vários processadores de stream disponíveis no mercado para escolher (consulte o artigo Como escolher um processador de stream?). Flink, Heron, Kafka, Samza, Spark Streaming, Storm e WSO2 Stream Processor são alguns exemplos de processadores de stream em código aberto.
A operação em tempo real dos processadores de stream é fundamental para fornecer um serviço de alta qualidade em termos de desempenho do sistema. A maioria dos processadores de stream modernos podem lidar com 90% dos casos de uso de streaming com poucos nós de computador. No entanto, com o tempo, devido às expansões de negócios, as empresas mais lucrativas precisam lidar com quantidades crescentes de cargas de trabalho. Portanto, o processador de stream escolhido precisa ser capaz de escalonar e manipular cargas de trabalho maiores facilmente.
Cada vez mais, os processadores de stream foram implantados como Software como Serviço (SaaS) em sistemas de computação em nuvem. Alguns exemplos notáveis incluem o Amazon Kinesis Data Analytics, o Microsoft Azure Stream Analytics, o Google Cloud Dataflow, etc.
Os processadores de stream baseados em nuvem fornecem a capacidade de escalar de forma elástica os aplicativos de processamento de stream executados neles. Os ambientes de gerenciamento centralizados em contêiner, como Kubernetes, usados pelos provedores de serviços de nuvem permitem a execução escalável de aplicativos de processamento de stream.
No entanto, não é trivial quando e como fazer o dimensionamento de um processador de stream distribuído em um ambiente de contêiner principalmente devido à heterogeneidade do ambiente de hardware e software devido às características dos streams de dados processados.
Este artigo apresenta um cenário de exemplo prático do mundo real de um aplicativo de processamento de stream com uso intensivo de dados, explica como ele pode ser escalado sistematicamente no Kubernetes e explica os trade-offs associados. Foi utilizado o WSO2 Stream Processor (WSO2 SP) como processador de stream de exemplo neste artigo, pois ele é um processador de stream nativo e de código-fonte aberto adequado para implementar esses aplicativos. No entanto, acreditamos que os mesmos conceitos podem ser igualmente aplicados a qualquer outro processador de stream nativo em nuvem disponível no mercado.
Foi selecionado um exemplo real de processamento de stream relacionado à detecção de ataques mal-intencionados que tentam fazer tentativas de login não autorizadas em um servidor web, criando uma situação de negação de serviço (ataque DoS). Uma vez que tal ataque DoS tenha sido detectado, o sistema envia alertas aos administradores do sistema para que medidas de segurança necessárias possam ser tomadas.
Detectando ataques mal-intencionados usando logs do servidor web
O status operacional de um servidor web pode ser monitorado usando os eventos de log HTTP capturados no log do servidor web. A aparição de códigos de status HTTP, como 401 (não autorizado) ou 403 (proibido) de maneira contínua, mostra que há tentativas de login mal-intencionado no servidor web. A resposta 401 pode mostrar que a autorização foi recusada para as credenciais fornecidas pela parte externa que tenta acessar o servidor web. O código de status 403 indica que o servidor está se recusando a atender a solicitação, embora o servidor tenha entendido a solicitação.
Figura 1: Detectando ataques mal-intencionados em um servidor web.
Existem diferentes abordagens para lidar com esse cenário. No entanto, neste caso de uso, o especialista em segurança da informação prefere receber as informações em tempo real como alertas. Um alerta precisa ser acionado se a contagem de solicitações mal-intencionadas exceder 30 vezes em três segundos, com uma taxa de acesso (definida como (contagem não autorizada + contagem proibida) / contagem total de acesso) sendo 1.0. Foi desenvolvido o aplicativo de geração de alerta, conforme apresentado na Figura 1, usando um processador de stream que recebe e processa eventos de log do servidor web. Com a escalabilidade do sistema em mente, o sistema foi implantado em um cluster de Kubernetes em execução no Google Compute Engine (GCE). A Listagem 1 mostra o stream do código SQL escrito na linguagem de consulta Siddhi. Chamamos tal aplicativo como o aplicativo Siddhi.
Listagem 1: Aplicativo de detecção de ataques maliciosos desenvolvido com o Siddhi Streaming SQL
@App:name("MaliciousAttacksDetection")
@App:description("HTTP Log Processor for detecting malicious DoS attacks")
@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='kafka-service:9092', topic.list = 'attackDetectionTopic',
@map(type = 'json'))
define stream inputStream ( iij_timestamp long, ip string, timestamp long, zone float, cik double, accession string, doc string, code float, size double, idx float, norefer float, noagent float, find float, crawler float, groupID int, browser string);
--Saída da query 1: Quero saber o IP dos hosts maliciosos que tentaram fazer alguma tentativa de login não autorizado em um curto espaço de tempo.
@sink(type='log')
define stream outputStreamDoSAlert(ip string ,groupID int);
-- A latência real de aplicativos paralelos de siddhi está começando a medir neste momento
@info(name = "Query1")
@dist(execGroup='group11' ,parallel ='1')
from inputStream
select iij_timestamp, ip, timestamp, zone, cik, accession, doc, code, size, idx, norefer, noagent, find, crawler, groupID, browser, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestamp
insert into interimInputStream;
--Query 2: Aqui todos os acessos são 401 ou 403 e foram feitos pelo menos 30 vezes no período de 3 segundos.
@info(name = "Query2")
@dist(execGroup='group3', parallel ='12')
partition with (groupID of interimInputStream)
begin
from interimInputStream#window.timeBatch(timestamp, 3 sec)
select ip, count() as totalAccessCount, sum(ifThenElse(code == 401F, 1, 0)) as unauthorizedCount, sum(ifThenElse(code == 403F, 1, 0)) as forbiddenCount,injected_iijtimestamp as iijtimestamp,groupID,timestamp
insert into #interimStream3;
from #interimStream3#throughput:throughput(iijtimestamp,"throughput",3,6,"outputStreamDoSAlert",30)
select ip, totalAccessCount, (unauthorizedCount + forbiddenCount)/totalAccessCount as accessPercentage ,groupID
insert into #interimStream5;
from #interimStream5 [totalAccessCount > 30L and accessPercentage == 1.0]
select ip ,groupID
insert into outputStreamDoSAlert;
end;
Esta aplicação foi implementada em um processador de stream distribuído conforme apresentado no diagrama de arquitetura de implementação na Figura 2. Aqui, todos os componentes, como Worker-1, Worker-2, …, etc., são implementados como um Container único e como um único Kubernetes pod. Cada uma das categorias de contêineres e as tarefas executadas por eles são listadas na Tabela I. A Figura 2 apresenta um cenário em que todo o sistema é implantado em seis nós de Kubernetes.
Tabela 1: A lista de tarefas realizadas por diferentes tipos de contêineres no ambiente do Kubernetes
Categoria do Contêiner |
Tarefa Executada pela Categoria do Contêiner |
Gerenciador |
Os dois gerenciadores no gerenciador cluster são duas instâncias de SP configuradas para execução no modo de alta disponibilidade. |
Worker |
Workers são os contêineres que fazem o processamento real dos streams de dados. Cada Worker é uma instância do SP configurada para ser executada usando o Worker profile. |
Zookeeper |
Foi utilizado três nós do Zookeeper para implementar a funcionalidade de alta disponibilidade. |
Producer |
Nesses experimentos, o container Producer é o que gera cargas de trabalho variadas. Na arquitetura atual do sistema, o contêiner Producer lê os arquivos de log e gera eventos que são enviados para o Kafka Broker que coleta os eventos de entrada no sistema. |
SP-RDBMS |
Foi usado um servidor de banco de dados relacional separado para coletar as informações de desempenho dos aplicativos do processador de stream que são executados no sistema. Os números de desempenho são coletados em três níveis: nível de nó, nível de contêiner e nível de aplicativo Siddhi. |
Usamos um broker Kafka separado que coordena a comunicação entre os componentes no cluster. |
|
NFS |
Foi utilizado o NFS (Network File System) para armazenar e acessar os detalhes de configuração do processador de stream distribuído. |
Foi implementado esta aplicação para funcionar no ambiente de Kubernetes do Google Compute Engine. Para uma carga de trabalho específica P, o sistema deve operar e fornecer um valor Q específico (QoS - valor de qualidade de serviço). Mediu-se o valor de Q dos números de latência observados dos workers (latência é a diferença de tempo entre o momento em que o evento entra no worker e a hora em que o evento sai do worker). A lista de pontos a seguir descreve como cada componente é implementado no cluster.
- O nó 1 hospeda o servidor web;
- O nó 2 hospeda o componente producer que é o produtor de carga de trabalho para este cenário. Ele lê os logs do servidor web e os publica na instância do Kafka em execução no Nó 4;
- Os nós 2 e 4 executam os dois gerenciadores do processador de stream;
- O nó 3 hospeda o NFS gerado automaticamente pelo gcloud;
Os nós 5 e 6 executam os workers e lidam com a carga real de trabalho. Eles leem dados da instância do Kafka, os processam aplicando as operações de processamento de stream e gravam os resultados de volta à instância do Kafka.
Figura 2: Arquitetura de implantação do cluster do Kubernetes.
No entanto, à medida que o tempo passa, a carga de trabalho no servidor web também é aumentada. Essa é uma característica típica encontrada nos servidores web da maioria das organizações. Pode aumentar a carga de trabalho, por exemplo, de P para 2P, 4P, …, 16P, etc. Nesse cenário, a organização que opera o sistema de monitoramento do servidor web enfrenta problemas de manutenção do atributo de qualidade de serviço em Q' de uma forma quase equivalente a Q. Aqui, Q' é o valor observado do atributo QoS. Em tais situações, o processador de stream deve ser capaz de escalar o suficiente para manter o nível esperado de QoS. Observe que a escalabilidade é a capacidade de um sistema lidar com quantidades crescentes de carga de trabalho. A escalabilidade também é conhecida como a capacidade do sistema de expandir para suportar o aumento da carga de trabalho. Neste artigo, nosso foco específico está na Escalabilidade de Carga, que é a capacidade de um sistema funcionar normalmente conforme o tráfego oferecido aumenta.
Existem duas abordagens para executar a escalabilidade de carga: dimensionamento forte e dimensionamento fraco. O escalonamento forte mantém o mesmo tamanho de problema para resolver, mas aumenta o número de processadores. O escalonamento fraco aumenta o número de processadores mantendo um tamanho de problema fixo por processador. Neste artigo, conduzimos um escalonamento fraco porque enfrentamos o fenômeno do aumento da carga de trabalho.
Experimentos
Utilizou-se um cluster de Kubernetes provisionado no Google Compute Engine (GCE). Além disso, usou-se nós experimentais com 2 CPUs e 7.5 GB de memória (tipo de máquina n1-padrão-2) para hospedar os pods. Cada pod está tendo um contêiner e os componentes do processador de stream do contêiner (SP) foram implementados. Usou-se o JDK 1.8, o Kafka 2.0.1, o WSO2-SP 4.3.0 e o MYSQL 5.7.4 para construir as imagens do docker. Todos eles foram implantados como aplicativos em contêiner no cluster. Um cluster é mantido já que cada pod possui apenas um componente SP. Conduzimos cada experimento por 40 minutos, incluindo um período de aquecimento de 10 minutos. Note que usamos a notação x-y-z para denotar (número de nós) - (número de workers) - (número de instâncias).
Usamos o EDGAR Log File Data Set como o conjunto representativo de dados reais para os experimentos, uma vez que ele já inclui um conjunto de dados de log do servidor web na forma de CSV (Comma-Separated Values). O conjunto de dados utilizado foi o arquivo de registro loglogv170325.csv do conjunto de dados EDGAR Log File. O arquivo CSV teve 22.146.382 eventos com um tamanho total de arquivo de aprox. 2.4GB que tinha um tamanho médio de mensagem de 144 bytes. Cada registro teve 15 campos. A Listagem 2 mostra os dois primeiros registros do conjunto de dados EDGAR.
Listagem 2: Primeiros dois registros do conjunto de dados EDGAR Log File
ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser
100.14.44.eca,2017-03-25,00:00:00,0.0,1031093.0,0001137171-10-000013,-index.htm,200.0,7926.0,1.0,0.0,0.0,10.0,0.0,
Mediu-se o desempenho em três níveis, como nível de nó, nível de contêiner e nível de processador de stream. No entanto, as conclusões feitas neste artigo baseiam-se em números de latência de nível e throughput do processador de stream. Medimos esses valores no grupo de execução do Siddhi grupo 3, que corresponde à aplicação parcial de Siddhi que foi implantada em cada um dos trabalhadores do SP. Os seis experimentos diferentes que realizamos usando o cluster Kubernetes nos forneceram a lista de resultados mostrada na Tabela 2.
Na Tabela 2, o ID corresponde ao identificador exclusivo da experiência. Nenhum dos nós corresponde ao número total de Kubernetes Workers. A contagem de instâncias é o número de instâncias do grupo de execução Siddhi usadas para o experimento. A taxa de dados do produtor (número de segmentos) corresponde ao número de produtores que geram simultaneamente eventos de streaming lendo os eventos de log HTTP dos arquivos de dados EDGAR.
Resultados
Os dois nós, 6 workers, 6 casos de instância (2-6-6) resultaram em 390ms de latência média para um único produtor de carga de trabalho P. Esse é o caso básico que se assemelha à implementação real desse caso de uso. Com o aumento da carga de trabalho, a latência aumenta significativamente enquanto a taxa de transferência diminui. Considerou-se o caso especial de carga de trabalho aumentado para um valor significativo, aumentando os encadeamentos do gerador de carga de trabalho para 16. A Cenário 2 mostra essa situação. Quando aumenta-se os encadeamentos do gerador em 16 vezes, a latência aumenta em 28,2% enquanto a taxa de transferência é reduzida em 29%. Conforme apresentado no Cenário 3, reduzir o número de IDs de grupo únicos nos itens de dados de entrada torna as coisas piores (especialmente em termos de taxa de transferência). Isso ocorre porque a redução do número de IDs de grupos exclusivos resulta em menor quantidade de paralelismo a ser explorado pelo aplicativo.
Se dobrarmos o número de workers por nó, conforme mostrado no Cenário 4, será reduzida a quantidade de memória por worker pela metade. Isso reduz o rendimento em dois terços do caso 1. A latência aumenta em 9 vezes. Portanto, observamos que a quantidade de memória de cada worker desempenha um papel importante na determinação da latência do aplicativo. Às vezes, se a quantidade suficiente de memória não foi fornecida, os funcionários podem nem mesmo ser implantados.
No quinto cenário, aumentamos o número de nós de trabalho de Kubernetes para três com 12 workers, o que elimina o gargalo de desempenho. Isso adiciona outro nó (nó 7) à Figura 2, com cada nó tendo quatro workers de processador de stream. Cada worker solicita 1 GB de memória e cada nó pode suportar 4-5 workers com essa quantidade de memória. Portanto, mesmo que tenhamos uma configuração do sistema 3-6-6 (ou seja, três nós, seis workers e seis instâncias), ela funcionará com o desempenho de 2-6-6. No entanto, as características de desempenho das configurações 2-12-12 e 3-12-12 não serão as mesmas.
Adicionar mais recursos de hardware do que o necessário não ajuda. Pode-se observar isso a partir do cenário experimental 6. Embora tenhamos dobrado o número de nós em comparação com o cenário 5, obteve-se uma latência média semelhante ao cenário 5. A abordagem seguida pelo cenário 6 pode ser útil se os funcionários obtiverem aplicações Siddhi mais parciais adicionadas. Isso requer memória adicional, onde três nós não podem fornecer.
Tabela 2: Números de desempenho para diferentes configurações de cluster do Kubernetes
ID |
N° de Nós |
N° de Workers |
N° de Instâncias |
Taxa de dados do produtor (No of Threads) |
Memória Alocada do Worker (GB) |
Taxa de transferência (por instância) |
Latência Média Geral (ms) |
IDs de grupo exclusivos |
1 |
2 |
6 |
6 |
P |
1 |
900 |
390 |
12 |
2 |
2 |
6 |
6 |
16P |
1 |
640 |
500 |
12 |
3 |
2 |
6 |
6 |
16P |
1 |
310 |
420 |
6 |
4 |
2 |
12 |
12 |
16P |
0.5 |
300 |
3500 |
12 |
5 |
3 |
12 |
12 |
16P |
1 |
660 |
300 |
12 |
6 |
6 |
12 |
12 |
16P |
1 |
640 |
310 |
12 |
Como foi utilizado partições no aplicativo Siddhi, o aplicativo Siddhi é dividido em aplicativos Siddhi parciais, que receberão a carga de trabalho de acordo com a distribuição de valores exclusivos do atributo que estamos usando para a partição. A coluna mais à direita da Tabela 2 mostra o número de valores de atributos exclusivos usados para o campo groupID. Em nosso aplicativo Siddhi, estamos usando o groupID como o atributo de partição. Portanto, usar 6 valores exclusivos significa que apenas 6 aplicativos Siddhi parciais podem obter a carga de trabalho de acordo com o mapeamento baseado em hash de Siddhi. Portanto, estamos aumentando o paralelismo aumentando o número de IDs de grupos exclusivos para 12. Isso significa que a distribuição de stream será direcionada para 12 aplicativos parciais de Siddhi. Por esses motivos, o paralelismo está aumentando e estamos obtendo latências melhores no cenário 1, em relação ao cenário 3.
Resumo
A escalabilidade é um desafio significativo enfrentado pelos processadores de stream em ambientes de contêiner, porque a carga de trabalho do aplicativo é aumentada com o tempo. Este artigo apresentou a experiência adquirida de escalonar tal processador de stream distribuído em um ambiente Kubernetes. Para fazer isso, o processador de stream deve fornecer uma linguagem de programação / construções de consulta para manter o nível ideal de paralelismo, independentemente da escala inicial do aplicativo. A execução de trabalhadores do processador de stream com várias partições permite o escalonamento do processador de stream. Com as crescentes cargas de trabalho, uma quantidade suficiente de recursos de hardware precisa ser provisionada para que o sistema possa manter o nível de QoS acordado pelos operadores. Adicionando mais recursos incorre em custos adicionais. No entanto, adicionar mais recursos não garante melhorias de desempenho. Em vez disso, o processador de stream deve ser capaz de identificar o nível de requisitos de recursos e escalonar até o nível em que a relação entre desempenho e custo ideal pode ser mantida.
Sobre os autores
Sarangan Janakan é estagiário de engenharia de software na WSO2. Ele está atualmente no terceiro ano de graduação no Departamento de Ciência da Computação e Engenharia da Universidade de Moratuwa, Sri Lanka. Seus interesses de pesquisa incluem processamento de stream de dados e computação em nuvem.
Miyuru Dayarathna é um líder técnico sênior no WSO2. Ele é um cientista da computação com múltiplos interesses em pesquisa e contribuições sobre computação de stream, gerenciamento de bancos de dados em grafos e mineração de dados, computação em nuvem, engenharia de desempenho, segurança de informações, etc. Ele também é consultor do Departamento de Ciência da Computação e Engenharia da Universidade de Moratuwa. Sri Lanka. Seu recente foco de pesquisa na WSO2 foi o Stream Processor e o Identity Server. Ele publicou artigos técnicos em renomadas revistas internacionais e conferências.