Netflix engineers recently published a deep dive into their Distributed Counter Abstraction, a scalable service designed to track user interactions, feature usage, and business performance metrics with low latency globally. Built atop Netflix's TimeSeries Abstraction, the system balances performance, accuracy, and cost through configurable counting modes, resilient data aggregation, and a globally distributed architecture.
Counting things might sound like a simple problem. However, Netflix's engineers highlight the hidden complexity:
Distributed counting is a challenging problem in computer science. [...] At Netflix, our counting use cases include tracking millions of user interactions, monitoring how often specific features or experiences are shown to users, and counting multiple facets of data during A/B test experiments, among others.
At the time of writing, the service was processing close to 75K count requests/second globally across its API endpoints and datasets, while providing single-digit millisecond latencies for all endpoints.
Latency information for various distributed counter operations (source)
Some counting use cases demand "best-effort counting", prioritizing low latency and minimal infrastructure costs over absolute accuracy. Others require "eventually consistent counting", ensuring precise and durable counts, albeit with a slight delay and increased operational costs. High availability, fault tolerance, and support for idempotent retries necessitate a balance between accuracy, performance, and scalability.
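The post describes these modes conceptually rather than through code. As a rough illustration only, a per-namespace configuration that selects one of the two modes might look like the Python sketch below; the mode names and fields are hypothetical, not Netflix's actual API.

```python
from dataclasses import dataclass
from enum import Enum

class CountingMode(Enum):
    # Hypothetical mode names mirroring the trade-offs described above.
    BEST_EFFORT = "best_effort"                        # low latency, low cost, approximate
    EVENTUALLY_CONSISTENT = "eventually_consistent"    # durable and accurate, slightly delayed

@dataclass(frozen=True)
class CounterConfig:
    """Illustrative per-namespace counter configuration (not Netflix's real API)."""
    namespace: str
    mode: CountingMode
    retention_days: int = 30    # assumed knob for how long counter data is kept

impressions = CounterConfig("feature_impressions", CountingMode.BEST_EFFORT)
experiments = CounterConfig("ab_test_metrics", CountingMode.EVENTUALLY_CONSISTENT, retention_days=90)
print(impressions.mode.value, experiments.mode.value)
```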
While the best-effort counter gets by with a relatively simple EVCache-based implementation, Netflix implemented the eventually consistent counter using an event-driven architecture on top of the TimeSeries abstraction.
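The best-effort path essentially reduces to a fast, non-durable increment against a distributed cache. The sketch below uses a lock-guarded local dictionary as a stand-in for an EVCache-style client, purely to show the shape of the operation; in a cache-backed implementation, counts can be lost on eviction or node failure, which is the accepted trade-off for this mode.

```python
import threading
from collections import defaultdict

class BestEffortCounter:
    """Toy stand-in for a cache-backed best-effort counter.

    A real deployment would issue increments against an EVCache-style
    distributed cache; here a lock-guarded dict illustrates the idea.
    Increments are cheap and fast, but counts silently drop if cached
    entries are evicted or a node is lost.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts = defaultdict(int)

    def add(self, namespace: str, counter: str, delta: int = 1) -> None:
        with self._lock:
            self._counts[(namespace, counter)] += delta

    def get(self, namespace: str, counter: str) -> int:
        # Best effort: may undercount if cached entries were lost.
        return self._counts[(namespace, counter)]

counter = BestEffortCounter()
counter.add("user_interactions", "play_clicks")
print(counter.get("user_interactions", "play_clicks"))  # 1
```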
For the eventually consistent mode, each counting action is logged as an immutable event in the TimeSeries abstraction along with an idempotency key, ensuring durability and enabling idempotent retries. A background rollup process continuously aggregates these events over time-based windows, storing intermediate counts in a persistent store. This approach avoids data loss and supports consistent global aggregation through the data store's multi-region replication.
The rollup process for logged counter events (source)
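The blog post does not include implementation code, but the write path and rollup can be approximated as follows: every add appends an immutable, idempotency-keyed event, and a background pass later folds events from a time window into a persisted running count. All names and data structures below are illustrative stand-ins for the TimeSeries abstraction and the aggregate store.

```python
import time
from collections import defaultdict

EVENT_LOG = []                  # stand-in for the TimeSeries abstraction (append-only)
SEEN_KEYS = set()               # idempotency keys already applied
ROLLED_UP = defaultdict(int)    # stand-in for the persistent aggregate store
LAST_ROLLUP_TS = 0.0            # watermark: events up to this time are aggregated

def add_count(counter: str, delta: int, idempotency_key: str) -> None:
    """Log an immutable counting event; retries with the same key are no-ops."""
    if idempotency_key in SEEN_KEYS:
        return
    SEEN_KEYS.add(idempotency_key)
    EVENT_LOG.append({"counter": counter, "delta": delta, "ts": time.time()})

def rollup(now: float) -> None:
    """Background pass: fold events in (LAST_ROLLUP_TS, now] into persisted counts."""
    global LAST_ROLLUP_TS
    for event in EVENT_LOG:
        if LAST_ROLLUP_TS < event["ts"] <= now:
            ROLLED_UP[event["counter"]] += event["delta"]
    LAST_ROLLUP_TS = now

add_count("play_clicks", 1, idempotency_key="req-123")
add_count("play_clicks", 1, idempotency_key="req-123")  # duplicate retry, ignored
rollup(now=time.time())
print(ROLLED_UP["play_clicks"])  # 1
```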
Idempotency plays a crucial role in distributed systems. In a networked environment where failures, retries, and duplicate requests are common, idempotency ensures that repeated operations produce the same result as a single execution, allowing clients to safely retry requests. The authors highlight a second useful strategy for clients in such environments:
Hedging is when the client sends an identical competing request to the server, if the original request does not come back with a response in an expected amount of time. The client then responds with whichever request completes first. This is done to keep the tail latencies for an application relatively low. This can only be done safely if the mutations are idempotent.
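A minimal illustration of that hedging pattern, assuming the underlying mutation is idempotent (otherwise the duplicate request could double count), might look like this:

```python
import concurrent.futures
import random
import time

def send_request(payload: dict) -> str:
    """Stand-in for an idempotent counter mutation with variable latency."""
    time.sleep(random.uniform(0.01, 0.2))   # simulated network/server latency
    return f"ok:{payload['idempotency_key']}"

def hedged_call(payload: dict, hedge_after: float = 0.05) -> str:
    """Send the request; if no response within `hedge_after` seconds, send an
    identical competing request and return whichever result arrives first."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(send_request, payload)]
        done, _ = concurrent.futures.wait(futures, timeout=hedge_after)
        if not done:                         # original is slow: hedge it
            futures.append(pool.submit(send_request, payload))
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        # Note: this toy example still waits for the losing request on exit;
        # a real client would discard it asynchronously.
        return done.pop().result()

print(hedged_call({"counter": "play_clicks", "delta": 1, "idempotency_key": "req-42"}))
```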
The TimeSeries abstraction itself is a scalable, high-throughput data platform that stores and queries temporal event data with millisecond-level latencies. It organizes data into time-series records, partitioning events by time and bucketing them within defined intervals. Each event is immutable, timestamped, and uniquely identified, enabling precise record-keeping and supporting efficient time-range queries. It currently processes nearly 15 million events/second across all datasets globally at peak time.
The throughput of the TimeSeries abstraction at Netflix (source)
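A rough sketch of that data model: events are keyed by a time bucket derived from their timestamp, so a time-range read only scans the buckets that overlap the query window. The bucket width and field names below are illustrative, not the abstraction's actual schema.

```python
from collections import defaultdict
from uuid import uuid4

BUCKET_SECONDS = 60                       # illustrative bucket width
buckets = defaultdict(list)               # bucket_start -> list of immutable events

def bucket_of(ts: float) -> int:
    return int(ts // BUCKET_SECONDS) * BUCKET_SECONDS

def write_event(ts: float, payload: dict) -> None:
    """Store an immutable, uniquely identified, timestamped event."""
    buckets[bucket_of(ts)].append({"id": str(uuid4()), "ts": ts, **payload})

def query_range(start: float, end: float) -> list:
    """Time-range query: only scan buckets overlapping [start, end]."""
    results = []
    b = bucket_of(start)
    while b <= end:
        results.extend(e for e in buckets[b] if start <= e["ts"] <= end)
        b += BUCKET_SECONDS
    return results

write_event(120.5, {"counter": "play_clicks", "delta": 1})
write_event(185.0, {"counter": "play_clicks", "delta": 1})
print(len(query_range(100.0, 150.0)))     # 1
```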
Backed by storage solutions like Cassandra for durability and Elasticsearch for indexing, the abstraction ensures global availability and tunable consistency. Its architecture supports dynamic scaling, configurable retention policies, and adaptive query optimization, making it the foundation for data-intensive services like the Distributed Counter Abstraction.
Netflix engineers are now experimenting with an Accurate Global Counter. This counter type enhances real-time aggregation by combining pre-aggregated counts with a live delta of recent events. Instead of waiting for background rollups, it dynamically computes the latest count by scanning unprocessed events since the last aggregation. While this increases read complexity and resource usage, parallel queries and dynamic batching keep latencies low, enabling near-real-time accuracy for critical metrics.
High-level overview of the experimental Accurate Global Counter (source)
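Conceptually, a read in this mode combines the persisted rollup value with the not-yet-rolled-up tail of the event log. The sketch below captures that read path with illustrative data structures; the parallel queries and dynamic batching mentioned in the post are omitted for brevity.

```python
def accurate_count(counter: str, rolled_up: dict, event_log: list,
                   last_rollup_ts: float, now: float) -> int:
    """Pre-aggregated count plus a live delta of events logged after the
    last rollup watermark, scanned at read time."""
    delta = sum(e["delta"] for e in event_log
                if e["counter"] == counter and last_rollup_ts < e["ts"] <= now)
    return rolled_up.get(counter, 0) + delta

# Example: the rollup has seen one event; two more arrived since the watermark.
rolled_up = {"play_clicks": 1}
event_log = [{"counter": "play_clicks", "delta": 1, "ts": 11.0},
             {"counter": "play_clicks", "delta": 1, "ts": 12.5}]
print(accurate_count("play_clicks", rolled_up, event_log,
                     last_rollup_ts=10.0, now=13.0))   # 3
```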