Key Takeaways
- One of the challenges that we have always faced in building applications, and systems as a whole, is how to exchange information between them efficiently whilst retaining the flexibility to modify the interfaces without undue impact elsewhere.
- Events offer a Goldilocks-style approach in which real-time APIs can be used as the foundation for applications that are flexible yet performant, loosely coupled yet efficient. If you think about the business domain in which you work, you can probably think of many examples of events. They can be human-generated interactions, and they can be machine-generated.
- Apache Kafka offers a scalable event streaming platform with which you can build applications around the powerful concept of events. Kafka includes stream processing capabilities through the Kafka Streams API.
- ksqlDB is an event streaming database purpose-built for stream processing applications. It provides a SQL-based API for querying and processing data in Kafka.
- ksqlDB’s many features include filtering, transforming, and joining data from streams and tables in real-time, creating materialised views by aggregating events, and much more.
One of the challenges that we have always faced in building applications, and systems as a whole, is how to exchange information between them efficiently whilst retaining the flexibility to modify the interfaces without undue impact elsewhere. The more specific and streamlined an interface, the greater the likelihood that it is so bespoke that changing it would require a complete rewrite. The inverse also holds: generic integration patterns may be adaptable and widely supported, but at the cost of performance.
Events offer a Goldilocks-style approach in which real-time APIs can be used as the foundation for applications that are flexible yet performant, loosely coupled yet efficient.
Events can be considered as the building blocks of most other data structures. Generally speaking, they record the fact that something has happened and the point in time at which it occurred. An event can capture this information at various levels of detail: from a simple notification to a rich event describing the full state of what has happened.
From events, we can aggregate up to create state—the kind of state that we know and love from its place in RDBMS and NoSQL stores. As well as being the basis for state, events can also be used to asynchronously trigger actions elsewhere when something happens, which is the whole basis for event-driven architectures. In this way, we can build consumers to match our requirements, whether stateless, or stateful with fault tolerance. Producers can opt to maintain state, but are not required to, since consumers can rebuild it themselves from the events that they receive.
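To make this concrete, here is a minimal sketch of how a consumer might fold a stream of events into state, in this case a running count of occupied spaces per car park. The types and sample data are hypothetical and purely for illustration:

package main

import (
    "fmt"
    "time"
)

// CarParkEvent records the fact that a space was occupied or
// released, and when it happened.
type CarParkEvent struct {
    CarPark   string
    Space     string
    Occupied  bool
    Timestamp time.Time
}

func main() {
    // A stream of events...
    events := []CarParkEvent{
        {"Sheffield NCP", "A42", true, time.Now()},
        {"Sheffield NCP", "B01", true, time.Now()},
        {"Sheffield NCP", "A42", false, time.Now()},
    }

    // ...aggregated up into state: occupied spaces per car park
    occupied := map[string]int{}
    for _, e := range events {
        if e.Occupied {
            occupied[e.CarPark]++
        } else {
            occupied[e.CarPark]--
        }
    }

    fmt.Println(occupied) // map[Sheffield NCP:1]
}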
If you think about the business domain in which you work, you can probably think of many examples of events. They can be human-generated interactions, and they can be machine-generated. They may contain a rich payload, or they may be in essence a notification alone. For example:
- Event: userLogin. Payload: zbeeblebrox logged in at 2020-08-17 16:26:39 BST
- Event: CarParked. Payload: Car registration A42 XYZ parked at 2020-08-17 16:36:27 in space X42
- Event: orderPlaced. Payload: Robin ordered four tins of baked beans costing a total of £2.25 at 2020-08-17 16:35:41 BST
These events can be used to directly trigger actions elsewhere (such as a service that processes orders in response to them being placed), and they can also be used in aggregate to provide information (such as the current number of occupied spaces in a car park and thus which car parks have availability).
So, if events are the bedrock on which we are going to build our applications and services, we need a technology that supports us in the best way to do this—and this is where Apache Kafka® comes in. Kafka is a scalable event streaming platform that provides:
- Pub/sub: to publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
- Storage: to store streams of events durably and reliably for as long as you want.
- Stateful stream processing: to process streams of events as they occur or retrospectively.
Kafka is built around the concept of the log. By taking this simple but powerful concept of a distributed, immutable, append-only log, we can capture and store the events that occur in our businesses and systems in a scalable and efficient way. These events can be made available to multiple users on a subscription basis, as well as processed and aggregated further, either for direct use or for storage in other systems such as RDBMS, data lakes, and NoSQL stores.
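As an illustration of the concept only (not of Kafka’s internal implementation), here is a toy sketch of an append-only log: records are only ever appended, each one is identified by its offset, and independent readers track their own position in the log:

package main

import "fmt"

// Log is a toy append-only log: records are only ever appended,
// and each record is identified by its offset.
type Log struct {
    records []string
}

// Append adds a record to the end of the log and returns its offset.
func (l *Log) Append(record string) int {
    l.records = append(l.records, record)
    return len(l.records) - 1
}

// Read returns the record stored at a given offset.
func (l *Log) Read(offset int) string {
    return l.records[offset]
}

func main() {
    log := &Log{}
    fmt.Println(log.Append("CarParked: A42")) // offset 0
    fmt.Println(log.Append("CarLeft: B17"))   // offset 1

    // Independent readers each track their own position in the log,
    // so one reader's progress does not affect another's.
    billingOffset, analyticsOffset := 1, 0
    fmt.Println("billing reads:", log.Read(billingOffset))
    fmt.Println("analytics reads:", log.Read(analyticsOffset))
}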
In the remainder of this article, I will explore the APIs available in Apache Kafka and demonstrate how it can be used in the systems and applications that you are building.
The Producer and Consumer APIs
The great thing about a system like Kafka is that producers and consumers are decoupled, meaning, amongst other things, that we can produce data without needing a consumer in place first (and because of the decoupling we can do so at scale). An event happens, we send it to Kafka—simple as that. All we need to know is the details of the Kafka cluster, and the topic (a way of organising data in Kafka, kind of like tables in an RDBMS) to which we want to send the event.
There are clients available for Kafka in many different languages. Here’s an example of producing an event to Kafka using Go:
package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    topic := "test_topic"

    // Connect to the Kafka cluster
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"})
    if err != nil {
        fmt.Printf("Failed to create producer: %v\n", err)
        return
    }
    defer p.Close()

    // Send the event to the topic. Produce is asynchronous, so
    // flush any outstanding messages before the program exits.
    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic,
            Partition: 0},
        Value: []byte("Hello world")}, nil)
    p.Flush(5000)
}
Because Kafka stores events durably, they are available as and when we want to consume them, until we age them out (the retention period is configurable per topic).
Having written the event to the Kafka topic, it’s now available for one or more consumers to read. Consumers can behave in a traditional pub/sub manner and receive new events as they arrive, as well as opt to arbitrarily re-consume events from a previous point in time as required by the application. This replay functionality of Kafka, thanks to its durable and scalable storage layer, is a huge advantage for many important use cases in practice such as machine learning and A/B testing, where both historical and live data are needed. It’s also a hard requirement in regulated industries, where data must be retained for many years to meet legal compliance. Traditional messaging systems such as RabbitMQ and ActiveMQ cannot support such requirements. Here’s an example of consuming the event from Kafka using the Go client:
package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    topic := "test_topic"

    cm := kafka.ConfigMap{
        "bootstrap.servers":        "localhost:9092",
        "go.events.channel.enable": true,
        "group.id":                 "rmoff_01"}

    // Connect to the Kafka cluster as part of the rmoff_01 consumer group
    c, err := kafka.NewConsumer(&cm)
    if err != nil {
        fmt.Printf("Failed to create consumer: %v\n", err)
        return
    }
    defer c.Close()

    // Subscribe to the topic and read events from the channel as they arrive
    c.Subscribe(topic, nil)

    for {
        select {
        case ev := <-c.Events():
            switch km := ev.(type) {
            case *kafka.Message:
                fmt.Printf("✅ Message '%v' received from topic '%v'\n",
                    string(km.Value), string(*km.TopicPartition.Topic))
            }
        }
    }
}
When a consumer connects to Kafka, it provides a Consumer Group identifier. The consumer group concept enables two pieces of functionality. The first is that Kafka keeps track of the point in the topic to which the consumer has read events, so that when the consumer reconnects it can continue reading from the point that it got to before. The second is that the consuming application may want to scale its reads across multiple instances of itself, forming a Consumer Group that allows for processing of your data in parallel. Kafka will then allocate events to each consumer within the group based on the topic partitions available, and it will actively manage the group should members subsequently leave or join (for example, if a consumer instance crashes).
This means that multiple services can use the same data, without any interdependency between them. The same data can also be routed to datastores elsewhere using the Kafka Connect API which is discussed below.
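To illustrate the replay capability described above, a consumer can opt to read a topic from the beginning rather than only receiving new events. The sketch below uses the same Go client as before, with a hypothetical new group.id (which has no committed offsets) and auto.offset.reset set to earliest:

package main

import (
    "fmt"
    "time"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
    topic := "test_topic"

    // A new consumer group with no committed offsets starts from the
    // earliest retained event because of auto.offset.reset
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "rmoff_replay_01", // hypothetical new group
        "auto.offset.reset": "earliest"})
    if err != nil {
        fmt.Printf("Failed to create consumer: %v\n", err)
        return
    }
    defer c.Close()

    c.Subscribe(topic, nil)

    // Re-read the retained history of the topic, then keep
    // receiving new events as they arrive
    for {
        msg, err := c.ReadMessage(5 * time.Second)
        if err != nil {
            continue // e.g. a timeout while waiting for new events
        }
        fmt.Printf("Replayed: %s (offset %v)\n", string(msg.Value),
            msg.TopicPartition.Offset)
    }
}

An application that needs to replay from an arbitrary point rather than the beginning can instead assign itself specific partitions and offsets.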
The Producer and Consumer APIs are available in libraries for Java, C/C++, Go, Python, Node.js, and many more. But what if your application wants to use HTTP instead of the native Kafka protocol? For this, there is a REST Proxy.
Using a REST API with Apache Kafka
Let’s say we’re writing an application for a device for a smart car park. A payload for the event recording the fact that a car has just occupied a space might look like this:
{
"name": "NCP Sheffield",
"space": "A42",
"occupied": true
}
We could put this event on a Kafka topic, which would also record the time of the event as part of the event’s metadata. Producing data to Kafka using the Confluent REST Proxy is a straightforward REST call:
curl -X POST \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}' \
"http://localhost:8082/topics/carpark"
Any application can consume from this topic, using the native Consumer API that we saw above, or by using a REST call. Just like with the native Consumer API, consumers using the REST API are also members of a Consumer Group. With the REST API you first declare your consumer instance and then its subscription to one or more topics:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/rmoff_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' \
http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
Having done this you can then read the events:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
{
"topic": "carpark",
"key": null,
"value": {
"name": "Sheffield NCP",
"space": "A42",
"occupied": true
},
"partition": 0,
"offset": 0
}
]
If there are multiple events to receive, then you’ll get them as a batch within a single call, and if your client wants to check for new events, it will need to make the REST call again.
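A client might wrap this polling in a simple loop. Here is a minimal Go sketch of that pattern, using only the consumer instance and endpoint created above; the polling interval and error handling are arbitrary choices for illustration:

package main

import (
    "fmt"
    "io"
    "net/http"
    "time"
)

func main() {
    url := "http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records"

    for {
        // Each call returns a (possibly empty) batch of new events
        req, _ := http.NewRequest("GET", url, nil)
        req.Header.Set("Accept", "application/vnd.kafka.json.v2+json")

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            fmt.Println("request failed:", err)
            time.Sleep(5 * time.Second)
            continue
        }

        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        fmt.Println(string(body))

        // Poll again after a pause to check for new events
        time.Sleep(5 * time.Second)
    }
}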
We’ve seen how we can get data in and out of Kafka topics. But a lot of the time we want to do more than just straight-forward pub/sub. We want to take a stream of events and look at the bigger picture—of all the cars coming and going, how many spaces are there free right now? Or perhaps we’d like to be able to subscribe to a stream of updates for a particular car park only?
Conditional Notifications, Stream Processing, and Materialised Views
To think of Apache Kafka as pub/sub alone is to think of an iPhone as just a device for making and receiving phone calls. I mean, it’s not wrong to describe that as one of its capabilities…but it does so much more than just that. Apache Kafka includes stream processing capabilities through the Kafka Streams API. This is a feature-rich Java client library for doing stateful stream processing on data in Kafka at scale and across multiple machines. Widely used at companies such as Walmart, Ticketmaster, and Bloomberg, Kafka Streams also provides the foundations for ksqlDB.
ksqlDB is an event streaming database purpose-built for stream processing applications. It provides a SQL-based API for querying and processing data in Kafka. ksqlDB’s many features include filtering, transforming, and joining data from streams and tables in real-time, creating materialised views by aggregating events, and much more.
To work with the data in ksqlDB we first need to declare a schema:
CREATE STREAM CARPARK_EVENTS (NAME VARCHAR,
SPACE VARCHAR,
OCCUPIED BOOLEAN)
WITH (KAFKA_TOPIC='carpark',
VALUE_FORMAT='JSON');
ksqlDB is deployed as a clustered application, and this initial declaration work can be done at startup, or directly by the client, as required. With this done, any client can now subscribe to a stream of changes from the original topic but with a filter applied. For example, to get a notification when a space is released at a particular car park they could run:
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
SPACE
FROM CARPARK_EVENTS
WHERE NAME='Sheffield NCP'
AND OCCUPIED=false
EMIT CHANGES;
Unlike the SQL queries that you may be used to, this query is a continuous query (denoted by the EMIT CHANGES clause). Continuous queries, known as push queries, will continue to return any new matches to the predicate as the events occur, now and in the future, until they are terminated. ksqlDB also supports pull queries (which we explore below), and these behave just like a query against a regular RDBMS, returning values for a lookup at a point in time. ksqlDB thus supports both the worlds of streaming and static state, which in practice most applications will also need to do based on the actions being performed.
ksqlDB includes a comprehensive REST API, the call against which for the above SQL would look like this using curl:
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'
This call results in a streaming response from the server: a header is sent first, and then, as matching events from the source topic are received, they are sent to the client:
{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
…
["2020-08-05 16:02:33","A42"]
…
…
…
["2020-08-05 16:07:31","D72"]
…
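An application can consume this streaming endpoint directly. Below is a minimal Go sketch, assuming ksqlDB is listening on localhost:8088 without TLS, which means the client has to speak cleartext HTTP/2 (h2c) using golang.org/x/net/http2; content-type handling may vary between ksqlDB versions:

package main

import (
    "bufio"
    "crypto/tls"
    "fmt"
    "net"
    "net/http"
    "strings"

    "golang.org/x/net/http2"
)

func main() {
    // The /query-stream endpoint requires HTTP/2; since the local
    // server is not using TLS, configure the transport for h2c.
    client := &http.Client{
        Transport: &http2.Transport{
            AllowHTTP: true,
            DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
                return net.Dial(network, addr)
            },
        },
    }

    query := `{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='Sheffield NCP' AND OCCUPIED=false EMIT CHANGES;"}`

    resp, err := client.Post("http://localhost:8088/query-stream",
        "application/json", strings.NewReader(query))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // The first line is the header (query id, column names and types);
    // each subsequent line is a JSON array holding one matching row.
    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
}

The same pattern works for the pull queries shown later in this article; the only difference is that the response ends once the lookup has returned.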
We can use ksqlDB to define and populate new streams of data too. By prepending a SELECT statement with CREATE STREAM streamname AS, we can route the output of the continuous query to a Kafka topic. Thus we can use ksqlDB to perform transformations, joins, filtering, and more on the events that we send to Kafka. ksqlDB supports the concept of a table as a first-class object type, and we could use this to enrich the car park events that we’re receiving with information about the car park itself:
CREATE STREAM CARPARKS AS
SELECT E.NAME AS NAME, E.SPACE,
R.LOCATION, R.CAPACITY,
E.OCCUPIED,
CASE
WHEN OCCUPIED=TRUE THEN 1
ELSE -1
END AS OCCUPIED_IND
FROM CARPARK_EVENTS E
INNER JOIN
CARPARK_REFERENCE R
ON E.NAME = R.NAME;
You’ll notice we’ve also used a CASE statement to apply logic to the data, enabling us to create a running count of available spaces. The above CREATE STREAM populates a Kafka topic that looks like this:
+----------------+-------+----------+----------------------------+----------+--------------+
|NAME |SPACE |OCCUPIED |LOCATION |CAPACITY |OCCUPIED_IND |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP |E48 |true |{LAT=53.4265964, LON=-1.8426|1000 |1 |
| | | |386} | | |
Finally, let’s see how we can create a stateful aggregation in ksqlDB and query it from a client. To create the materialised view, you run SQL that includes aggregate functions:
CREATE TABLE CARPARK_SPACES AS
SELECT NAME,
SUM(OCCUPIED_IND) AS OCCUPIED_SPACES
FROM CARPARKS
GROUP BY NAME;
This state is maintained across the distributed ksqlDB nodes and can be queried directly using the REST API:
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
Unlike the streaming response that we saw above, queries against the state (known as "pull queries", as opposed to "push queries") return immediately and then exit:
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]
If the application wants to get the latest figure, it can reissue the query, and the value may or may not have changed:
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]
There is also a Java client for ksqlDB, and community-authored Python and Go clients.
Integration with other systems
One of the benefits of using Apache Kafka as a highly scalable and persistent broker for your asynchronous messaging is that the same data you exchange between your applications can also drive stream processing (as we saw above) and be fed directly into dependent systems.
Continuing from the example of an application that sends an event every time a car parks or leaves a space, it’s likely that we’ll want to use this information elsewhere, such as:
- analytics to look at parking behaviours and trends
- machine learning to predict capacity requirements
- data feeds to third-party vendors
Using Apache Kafka’s Connect API you can define streaming integrations with systems both in and out of Kafka. For example, to stream the data from Kafka to S3 in real-time you could run:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
-d ' {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "carpark",
"s3.bucket.name": "rmoff-carparks",
"s3.region": "us-west-2",
"flush.size": "1024",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}'
Now the same data that is driving the notifications to your application, and building the state that your application can query directly, is also streaming to S3. Each use is decoupled from the other. If we subsequently want to stream the same data to another target such as Snowflake, we just add another Kafka Connect configuration; the other consumers are entirely unaffected. Kafka Connect can also stream data into Kafka. For example, the CARPARK_REFERENCE table that we use in ksqlDB above could be streamed using change data capture (CDC) from a database that acts as the system of record for this data.
Conclusion
Apache Kafka offers a scalable event streaming platform with which you can build applications around the powerful concept of events. By using events as the basis for connecting your applications and services, you benefit in many ways, including loose coupling, service autonomy, elasticity, flexible evolvability, and resilience.
You can use the APIs of Kafka and its surrounding ecosystem, including ksqlDB, for both subscription-based consumption as well as key/value lookups against materialised views, without the need for additional data stores. The APIs are available as native clients as well as over REST.
To learn more about Apache Kafka visit developer.confluent.io. Confluent Platform is a distribution of Apache Kafka that includes all the components discussed in this article. It’s available on-premises or as a managed service called Confluent Cloud. You can find the code samples for this article and a Docker Compose to run it yourself on GitHub. If you would like to learn more about building event-driven systems around Kafka, then be sure to read Ben Stopford’s excellent book Designing Event-Driven Systems.
About the Author
Robin Moffatt is a Senior Developer Advocate at Confluent, the company founded by the original creators of Apache Kafka, as well as an Oracle ACE Director (Alumnus). He has been speaking at conferences since 2009 including QCon, Devoxx, Strata, Kafka Summit, and Øredev. You can find his talks online, subscribe to his YouTube channel (thanks lockdown!), and read his blog. Outside of work, Robin enjoys running, drinking good beer, and eating fried breakfasts—although generally not at the same time.