In this lesson we will:
- Learn more about the process of consuming data from the Kafka broker;
- Use the kafka-console-consumer script.
Kafka Consumers
Kafka Brokers have two types of clients - producers and consumers.
Kafka Consumers are the processes that subscribe to and receive data from Kafka topics.
Ordinarily, these consumers are embedded within application code such as a service or Microservice, perhaps written in a language such as Java, Node.js or C#. Developers of these services embed the Kafka client library into their code to implement and manage the connection to the Kafka broker.
This code may look something like the following:
// Configure the connection to the broker.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Configure deserialization of the message key and JSON value.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaJsonDeserializer");
props.put(KafkaJsonDeserializerConfig.JSON_VALUE_TYPE, DataRecord.class);
// Identify the consumer group and where to start if no offset has been committed.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-1");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Create the consumer and subscribe to the topic.
final Consumer<String, DataRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
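Having subscribed, the application receives messages by polling the broker in a loop. A minimal sketch, assuming the DataRecord value type from the configuration above:
while (true) {
    ConsumerRecords<String, DataRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DataRecord> record : records) {
        // Process each message; here we simply print its offset and value.
        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
    }
}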
It is also common to connect databases directly to Kafka to ingest data as it is created. For instance, databases such as Elasticsearch, ClickHouse and Apache Druid are able to subscribe directly to Kafka topics to ingest data.
kafka-console-consumer script
The Kafka distribution includes a script for consuming messages from the console. This is useful for debugging and for demonstrating the concepts covered in this lesson.
As usual, the script lives in the bin folder of the Kafka installation:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_pizza_orders
When we execute this command, the process will listen for incoming messages. Let's test this by sending a message with the corresponding console producer script in a new terminal window.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic new_pizza_orders
Hello, World
The message should then appear in the consumer's output:
Hello, World
Consumer Offsets
When we want to consume from a topic, we have a choice of where to start consuming from:
- From Beginning - offset 0;
- From End - the highest offset + 1, i.e. consume only messages that arrive after we start consuming;
- From Offset - messages from a specific offset, e.g. offset 2, onwards;
- From Timestamp - messages produced at or after a given timestamp.
The option you specify depends on the semantics of how you process data.
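The kafka-console-consumer script exposes these starting points as flags. For example, to replay a topic from the beginning, or to start from a specific offset within a partition:
# Consume all messages from the start of the topic:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_pizza_orders --from-beginning
# Consume from offset 2 of partition 0 (--offset requires --partition):
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_pizza_orders --partition 0 --offset 2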
Event Ordering
When working with event streaming platforms, there will be situations where the ordering of data is critically important. For instance, it would make no sense to process an update to a customer record before the customer has been created, or to allow a withdrawal to be processed before the corresponding credit.
It is therefore important to design your data flows with knowledge of the ordering guarantees Kafka provides: Kafka guarantees ordering within a single partition, but not across the partitions of a topic.
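A common pattern, sketched here with a hypothetical customer_events topic and an assumed producer variable: giving every event for a customer the same message key routes them all to the same partition, so consumers see them in the order they were produced.
// All events for one customer share a key, so they land on the same partition
// and are delivered to consumers in order.
producer.send(new ProducerRecord<>("customer_events", customerId, "customer_created"));
producer.send(new ProducerRecord<>("customer_events", customerId, "customer_updated"));
// Events for a different customer may go to a different partition; ordering
// between the two customers' events is not guaranteed.
producer.send(new ProducerRecord<>("customer_events", otherCustomerId, "customer_created"));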
Exactly Once Processing
Much of the complexity of Kafka stems from its attempt to deliver exactly-once processing. If messages are lost, this could lead to critical issues for the business, such as failed payments or notifications. However, in many situations it is just as important that messages are not processed twice: for instance, we could take a payment twice or send a duplicate notification.
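A sketch of the configuration involved, not a complete exactly-once pipeline: an idempotent producer avoids duplicating a message when a send is retried, and a consumer reading with read_committed isolation skips messages from aborted transactions. Here, producerProps is an assumed, separate Properties object for a producer.
// Producer side: retried sends cannot duplicate a message on the broker.
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// Consumer side: only deliver messages from committed transactions.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");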
Message Offsets
Each message is allocated an offset: a numeric identifier that grows from 0 upwards within a partition.
Offset 0 - { "ordernumber" : 1, "order_category" : "Electronics" }
Offset 1 - { "ordernumber" : 2, "order_category" : "Electronics" }
Offset 2 - { "ordernumber" : 3, "order_category" : "Electronics" }
The first message is offset 0, the second is offset 1, the third is offset 2 and so on.
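Programmatically, a consumer can jump to a given offset with seek(). A sketch, assuming partition 0 of the topic above; note that manual assignment with assign() replaces the group subscription used earlier.
TopicPartition partition = new TopicPartition("new_pizza_orders", 0);
consumer.assign(Collections.singletonList(partition));
// Start reading from offset 2; the next poll() returns messages from there onwards.
consumer.seek(partition, 2);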
These offsets are used to introduce reliability into the consumption process. As we consume messages, the consumer can tell the broker that it has successfully processed messages up to a given offset. If the process were to crash and restart, it could then resume consuming from the last recorded offset rather than reprocessing the entire topic.
This process is referred to as committing the offset. By default it is handled automatically, but if we want fine-grained control, we can commit offsets manually.
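A minimal sketch of manual committing, assuming the consumer configured earlier and a hypothetical process() method for the application logic:
// Disable automatic commits so we control exactly when progress is recorded.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, DataRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DataRecord> record : records) {
        process(record); // hypothetical application logic
    }
    // Commit only after the batch is processed; if we crash before this line,
    // the messages are re-delivered on restart rather than lost.
    consumer.commitSync();
}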