BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Tales of Kafka at Cloudflare: Lessons Learnt on the Way to 1 Trillion Messages

Tales of Kafka at Cloudflare: Lessons Learnt on the Way to 1 Trillion Messages

This item in japanese

Key Takeaways

  • Kafka clusters are used at Cloudflare to process large amounts of data, with a general-purpose message bus cluster developed to decouple teams, scale effectively, and process trillions of messages.
  • To address the issue of unstructured communication for event-driven systems, a strong contract should be in place: cross-platform data format Protobuf helped Cloudflare achieve that.
  • Investing in metrics on development tooling is critical to allow problems to be easily surfaced: Cloudflare enriched the SDK with OpenTracing and Prometheus metrics to understand how the system behaves and make better decisions, especially during incidents.
  • To enable consistency in the adoption and use of SDKs and promote best practices, it is important to prioritize clear documentation on patterns.
  • Cloudflare aims to achieve a balance between flexibility and simplicity: while a configurable setup may offer more flexibility, a simpler one allows standardization across different pipelines.

 

Cloudflare has generated over 1 trillion messages to Kafka in less than six years just for inter-service communication. As the company and application services team grew, they had to adapt their tooling to continue delivering fast.

We will discuss the early days of working in distributed domain-based teams and how abstractions were built on top of Kafka to reach the 1 trillion message mark.

We will also cover real incidents faced in recent years due to scalability limitations and the steps and patterns applied to deal with increasing demand.

What Is Cloudflare?

Cloudflare provides a global network to its customers and allows them to secure their websites, APIs, and internet traffic.

This network also protects corporate networks and enables customers to run and deploy entire applications on the edge.

Cloudflare offers a range of products, including CDN, Zero Trust, and Cloudflare Workers, to achieve these goals, identifying and blocking malicious activity and allowing customers to focus on their work.

[Click on the image to view full-size]

Figure 1: Cloudflare’s Global Network

Looking at the Cloudflare network from an engineering perspective, there are two primary components: the Global Edge network and the Cloudflare control plane.

A significant portion of the network is built using Cloudflare's products, with Workers deployed and used on the edge network. The control plane, on the other hand, is a collection of data centers where the company runs Kubernetes, Kafka, and databases on bare metal. All Kafka producers and consumers are usually deployed into Kubernetes, but the specific deployment location depends on the workload and desired outcomes.

In this article, we will focus on the Cloudflare control plane and explore how inter-service communication and enablement tools are scaled to support operations.

Kafka

Apache Kafka is built around the concept of clusters, which consist of multiple brokers, with each cluster having a designated leader broker responsible for coordination. In the diagram below, broker 2 serves as the leader.

[Click on the image to view full-size]


 
Figure 2: Kafka Cluster

Messages are categorized into topics, such as user events, for example, user creation, or user information updates. Topics are then divided into partitions, an approach that allows Kafka to scale horizontally. In the diagram, there are partitions for topic A on both brokers, with each partition having a designated leader to determine its "source of truth".  To ensure resilience, partitions are replicated according to a predetermined replication factor, with three being the usual minimum. The services that send messages to Kafka are called producers, while those that read messages are called consumers.

Cloudflare Engineering Culture

In the past, Cloudflare operated as a monolithic PHP application, but as the company grew and diversified, this approach proved to be limiting and risky.
Rather than mandating specific tools or programming languages, teams are now empowered to build and maintain their services, with the company encouraging experimentation and advocating for effective tools and practices. The Application Services team is a relatively new addition to the engineering organization, to make it easier for other teams to succeed by providing pre-packaged tooling that incorporates best practices. This allows development teams to focus on delivering value.

Tight Coupling

With the product offerings growing, there was a need to find better ways of enabling teams to work at their own pace and decouple from their peers and the engineering team also needed to have more control over backoff requests and work completion guarantees.

As we were already running Kafka clusters to process large amounts of data, we decided to invest time in creating a general-purpose message bus cluster: onboarding is straightforward, requiring a pull request into a repository, which sets up everything needed for a new topic, including the replication strategy, retention period, and ACLs. The diagram illustrates how the Messagebus cluster can help decouple different teams.

[Click on the image to view full-size]


 
Figure 3: The general-purpose message bus cluster

For example, three teams can emit messages that the audit log system is interested in, without the need for any awareness of the specific services. With less coupling, the engineering team can work more efficiently and scale effectively.

Unstructured Communication

With an event-driven system, to avoid coupling, systems shouldn't be aware of each other. Initially, we had no enforced message format and producer teams were left to decide how to structure their messages. This can lead to unstructured communication and pose a challenge if the teams don't have a strong contract in place, with an increased number of unprocessable messages.

To avoid unstructured communication, the team searched for solutions within the Kafka ecosystem and found two viable options, Apache Avro and protobuf, with the latter being the final choice. We had previously been using JSON, but found it difficult to enforce compatibility and the JSON messages were larger compared to protobuf.

[Click on the image to view full-size]

Figure 4: A protobuf message

Protobuf provides strict message types and inherent forward and backward compatibility, with the ability to generate code in multiple languages also a major advantage. The team encourages detailed comments on their protobuf messages and uses Prototool, an open-source tool by Uber, for breaking change detection and enforcing stylistic rules.

[Click on the image to view full-size]
 
Figure 5: Switching to Protobuf
 
Protobuf alone was not enough: different teams could still emit messages to the same topic, and the consumer may not be able to process it due to the format not being what was expected. Additionally, configuring Kafka consumers and producers was not an easy task, requiring intricate knowledge of the workload. As most teams were using Go, we decided to build a "message bus client library" in Go, incorporating best practices and allowing teams to move faster.

To avoid teams emitting different messages to the same topic, we made the controversial decision to enforce (on the client side) one protobuf message type per topic. While this decision enabled easy adoption, it resulted in numerous topics being created, with multiple partitions replicated, with a replication factor of at least three.

Connectors

The team had made significant progress in simplifying the Kafka infrastructure by introducing tooling and abstractions, but we realized that there were further use cases and patterns that needed to be addressed to ensure best practices were followed: the team developed the connector framework.

[Click on the image to view full-size]

Figure 6: The connector framework

Based on Kafka connectors, the framework enables engineers to create services that read from one system and push it to another one, like Kafka or Quicksilver, Cloudflare's Edge database. To simplify the process, we use Cookiecutter to template the service creation, and engineers only need to enter a few parameters into the CLI.

The configuration process for the connector is simple and can be done through environment variables without any code changes.

In the example below, the reader is Kafka and the writer is Quicksilver. The connector is set to read from topic 1 and topic 2 and apply the function pf_edge. This is the complete configuration needed, which also includes metrics, alerts, and everything else required to move into production, allowing teams to easily follow best practices. Teams have the option to register custom transformations, which would be the only pieces of code they would need to write.

[Click on the image to view full-size]

Figure 7: A simple connector

For example, we utilize connectors in the communication preferences service: if a user wants to opt out of marketing information in the Cloudflare dashboard, they interact with this service to do so. The communication preference upgrade is stored in its database, and a message is emitted to Kafka. To ensure that the change is reflected in three different source systems, we use separate connectors that sync the change to a transactional email service, a customer management system, and a market email system. This approach makes the system eventually consistent and we leverage the guarantees provided by Kafka to ensure that the process happens smoothly.

[Click on the image to view full-size]
 
Figure 8: Connector and communication preferences

Visibility

As our customer base grew rapidly during the pandemic, so did the throughput, highlighting scalability issues in some of the abstractions that we had created.

One example is the audit logs, which we handle for our Kafka customers: we built a system to manage these logs, allowing producer teams to produce the events, while we listen for them, recording the data in our database.

[Click on the image to view full-size]


 
Figure 9: Adding the log push for Audit logs

We expose this information through an API and an integration called log push that enables us to push the audit log data into various data buckets, such as Cloudflare R2 or Amazon S3.

During the pandemic, we experienced the registration of many more audit logs and customers started using our APIs to get the latest data. As the approach was not scalable, we decided to develop a pipeline to address the issue, creating a small service that listens for audit log events and transforms them into the appropriate format for direct storage in a bucket, without overloading the APIs.

We encountered further issues as we accumulated logs and were unable to clear them out quickly enough, resulting in lags and breaches of our SLAs. We were uncertain about the cause of the lag as we lacked the tools and instrumentation in our SDK to diagnose the problem: was the bottleneck reading from Kafka, transformation, or saving data to the database?

[Click on the image to view full-size]

Figure 10: Where is the bottleneck?

We decided to address it by enhancing our SDK with Prometheus metrics, with histograms measuring the time each step takes in processing a message. This helped us identify slower steps, but we couldn't tell which specific component was taking longer for a specific message. To solve this, we explored OpenTelemetry, focusing on its tracing integrations: there were not many good integrations for OpenTracing on Kafka, and it was challenging to propagate traces across different services during a production incident.

With the team enriching the SDK with OpenTracing, we were able to identify that pushing data to the bucket and reading from Kafka were both bottlenecks, prioritizing the fixes for those issues.

[Click on the image to view full-size]

Figure 11: Identifying the bottlenecks

Adding metrics into the SDK, we were able to get a better overview of the health of the cluster and the services.

Noisy On-call

We encountered a challenge with the large number of metrics we had collected, leading to a noisy on-call experience, with many alerts related to unhealthy applications and lag issues.

[Click on the image to view full-size]


Figure 12: Alerting pipeline

The basic alerting pipeline consists of Prometheus and AlertManager, which would page to PagerDuty. As restarting or scaling up/down services was not ideal, we decided to explore how to leverage Kubernetes and implement health checks.

In Kubernetes, there are three types of health checks: liveness, readiness, and startup. For Kafka, implementing the readiness probe is not useful because usually, an HTTP server is not exposed. To address this, an alternative approach was implemented.

[Click on the image to view full-size]


Figure 13: Health checks and Kafka

When a request for the liveness check is received, we attempt a basic operation with a broker, such as listing the topics, and if the response is successful, the check passes. However, there are cases where the application is still healthy but unable to produce or consume messages, which led the team to implement smarter health checks for the consumers.

[Click on the image to view full-size]


 
Figure 14: Health checks implementation

The current offset for Kafka is the last available offset on the partition, while the committed offset is the last offset that the consumer successfully consumed.
By retrieving these offsets during a health check, we can determine whether the consumer is operating correctly: if we can't retrieve the offsets, there are likely underlying issues, and the consumer is reported as unhealthy. If the offsets are retrievable, we compare the last committed offset to the current one. If they are the same, no new messages have been appended, and the consumer is considered healthy. If the last committed offset is different, we check if it is the same as the previously recorded last committed offset to make sure that the consumer is not stuck and needs a restart. This process resulted in better on-call experiences and happier customers.

Inability to Keep Up

We had a system where teams could produce events from Kafka for their email system. These events contained a template, for example, an "under attack" template, that includes information about a website under attack and the identity of the attacker, along with metadata.

We would listen for the event, retrieve the template for the email from their registry, enrich it, and dispatch it to the customers. However, we started to experience load issues: the team started to see spikes in the production rate, causing a lag in consumption and impacting important OTP messages and the SLOs.

[Click on the image to view full-size]


Figure 15: The lag in consumption

Batching

We started exploring different solutions to address the problem, with an initial solution of scaling the number of partitions and consumers not providing significant improvement.

[Click on the image to view full-size]


 
Figure 16: The batching approach

We decided to implement a simpler but more effective approach, batch consuming, processing a certain number of messages at a time, applying a transformation, and dispatching them in batches. This proved effective and allowed the team to easily handle high production rates.

[Click on the image to view full-size]

Figure 17: No lag in consumption with batching

Documentation

Developing our SDK, we found that many developers were encountering issues while using it. Some were encountering bugs while others were unsure how to implement certain features or interpret specific errors. To address this, we created channels on our Google Chat where users could come and ask us questions. We had one person on call to respond and spent time documenting our findings and answers in our wiki. This helped to improve the overall user experience for the SDK.

Conclusions

There are four lessons to be learned:

  • Always find the right balance between flexibility and simplicity: while a configurable setup may offer more flexibility, a simpler one allows standardization across different pipelines.
  • Visibility: adding metrics to the SDK as soon as possible can help teams understand how the system is behaving and make better decisions, especially during incidents.
  • Contracts: enforcing one strong, stric contract, gives great visibility into what is happening inside a topic, allowing one to know who is writing and reading from it.
  • Document the good work that you do so that you don't have to spend time answering questions or helping people debug production issues. This can be achieved through channels like Google Chat and wikis.

By following these rules, we were able to improve our systems and make our customers happy, even in high-stress situations.

 

About the Authors

BT