Kafka Model and View to Read Data From Topic

Chapter 4. Kafka Consumers: Reading Data from Kafka

Applications that need to read data from Kafka use a KafkaConsumer to subscribe to Kafka topics and receive messages from these topics. Reading data from Kafka is a bit different than reading data from other messaging systems, and there are a few unique concepts and ideas involved. It is hard to understand how to use the consumer API without understanding these concepts first. We'll start by explaining some of the important concepts, and then we'll go through some examples that show the different ways consumer APIs can be used to implement applications with varying requirements.

Kafka Consumer Concepts

In order to understand how to read data from Kafka, you first need to understand its consumers and consumer groups. The following sections cover those concepts.

Consumers and Consumer Groups

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case your application will create a consumer object, subscribe to the appropriate topic, and start receiving messages, validating them, and writing the results. This may work well for a while, but what if the rate at which producers write messages to the topic exceeds the rate at which your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall farther and farther behind, unable to keep up with the rate of incoming messages. Obviously there is a need to scale consumption from topics. Just like multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, splitting the data between them.

Kafka consumers are typically part of a consumer group. When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic.

Let's take topic T1 with four partitions. Now suppose we created a new consumer, C1, which is the only consumer in group G1, and use it to subscribe to topic T1. Consumer C1 will get all messages from all four T1 partitions. See Figure 4-1.

Figure 4-1. One consumer group with four partitions

If we add another consumer, C2, to group G1, each consumer will only get messages from two partitions. Perhaps messages from partitions 0 and 2 go to C1 and messages from partitions 1 and 3 go to consumer C2. See Figure 4-2.

Figure 4-2. Four partitions split between two consumers in a group

If G1 has four consumers, then each will read messages from a single partition. See Figure 4-3.

Figure 4-3. Four consumers in a group with 1 partition each

If we add more consumers to a single group with a single topic than we have partitions, some of the consumers will be idle and get no messages at all. See Figure 4-4.

Figure 4-4. More consumers in a group than partitions means idle consumers

The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. It is common for Kafka consumers to do high-latency operations such as writing to a database or running a time-consuming computation on the data. In these cases, a single consumer can't possibly keep up with the rate at which data flows into a topic, and adding more consumers that share the load by having each consumer own just a subset of the partitions and messages is our main method of scaling. This is a good reason to create topics with a large number of partitions: it allows adding more consumers when the load increases. Keep in mind that there is no point in adding more consumers than you have partitions in a topic, because some of the consumers will just be idle. Chapter 2 includes some suggestions on how to choose the number of partitions in a topic.
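The effect of group size on partition ownership can be sketched without a broker. The class below is a toy model, not Kafka's assignment code: the class name, the `share` helper, and the simple modulo distribution are assumptions made for illustration. It only shows that a four-partition topic keeps at most four consumers busy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupScalingSketch {
    // Hypothetical helper: spread partitions 0..partitions-1 over consumers C1..Cn,
    // one partition at a time. This is an illustration, not Kafka's assignor.
    static Map<String, List<Integer>> share(int partitions, int consumers) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (int c = 1; c <= consumers; c++) {
            owned.put("C" + c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get("C" + (p % consumers + 1)).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Topic T1 has four partitions; try groups of one, two, four, and five consumers.
        for (int consumers : new int[] {1, 2, 4, 5}) {
            System.out.println(consumers + " consumer(s): " + share(4, consumers));
        }
    }
}
```

With five consumers the fifth entry is an empty list, which corresponds to the idle consumer in Figure 4-4.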

In addition to adding consumers in order to scale a single application, it is very common to have multiple applications that need to read data from the same topic. In fact, one of the main design goals in Kafka was to make the data produced to Kafka topics available for many use cases throughout the organization. In those cases, we want each application to get all of the messages, rather than just a subset. To make sure an application gets all the messages in a topic, ensure the application has its own consumer group. Unlike many traditional messaging systems, Kafka scales to a large number of consumers and consumer groups without reducing performance.

In the previous example, if we add a new consumer group G2 with a single consumer, this consumer will get all the messages in topic T1 independent of what G1 is doing. G2 can have more than a single consumer, in which case they will each get a subset of partitions, just like we showed for G1, but G2 as a whole will still get all the messages regardless of other consumer groups. See Figure 4-5.

Figure 4-5. Adding a new consumer group, both groups receive all messages

To summarize, you create a new consumer group for each application that needs all the messages from one or more topics. You add consumers to an existing consumer group to scale the reading and processing of messages from the topics, so each additional consumer in a group will only get a subset of the messages.

Consumer Groups and Partition Rebalance

As we saw in the previous section, consumers in a consumer group share ownership of the partitions in the topics they subscribe to. When we add a new consumer to the group, it starts consuming messages from partitions previously consumed by another consumer. The same thing happens when a consumer shuts down or crashes; it leaves the group, and the partitions it used to consume will be consumed by one of the remaining consumers. Reassignment of partitions to consumers also happens when the topics the consumer group is consuming are modified (e.g., if an administrator adds new partitions).

Moving partition ownership from one consumer to another is called a rebalance. Rebalances are important because they provide the consumer group with high availability and scalability (allowing us to easily and safely add and remove consumers), but in the normal course of events they are fairly undesirable. During a rebalance, consumers can't consume messages, so a rebalance is basically a short window of unavailability of the entire consumer group. In addition, when partitions are moved from one consumer to another, the consumer loses its current state; if it was caching any data, it will need to refresh its caches, slowing down the application until the consumer sets up its state again. Throughout this chapter we will discuss how to safely handle rebalances and how to avoid unnecessary ones.

The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups). As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive, well, and processing messages from its partitions. Heartbeats are sent when the consumer polls (i.e., retrieves records) and when it commits records it has consumed.

If the consumer stops sending heartbeats for long enough, its session will time out and the group coordinator will consider it dead and trigger a rebalance. If a consumer crashed and stopped processing messages, it will take the group coordinator a few seconds without heartbeats to decide it is dead and trigger the rebalance. During those seconds, no messages will be processed from the partitions owned by the dead consumer. When closing a consumer cleanly, the consumer will notify the group coordinator that it is leaving, and the group coordinator will trigger a rebalance immediately, reducing the gap in processing. Later in this chapter we will discuss configuration options that control heartbeat frequency and session timeouts and how to set those to match your requirements.

How Does the Process of Assigning Partitions to Consumers Work?

When a consumer wants to join a group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and which are therefore considered alive) and is responsible for assigning a subset of partitions to each consumer. It uses an implementation of PartitionAssignor to decide which partitions should be handled by which consumer.

Kafka has two built-in partition assignment policies, which we will discuss in more depth in the configuration section. After deciding on the partition assignment, the consumer group leader sends the list of assignments to the GroupCoordinator, which sends this information to all the consumers. Each consumer only sees its own assignment; the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.

Creating a Kafka Consumer

The first step to start consuming records is to create a KafkaConsumer instance. Creating a KafkaConsumer is very similar to creating a KafkaProducer: you create a Java Properties instance with the properties you want to pass to the consumer. We will discuss all the properties in depth later in the chapter. To start we just need to use the three mandatory properties: bootstrap.servers, key.deserializer, and value.deserializer.

The first property, bootstrap.servers, is the connection string to a Kafka cluster. It is used the exact same way as in KafkaProducer (you can refer to Chapter 3 for details on how this is defined). The other two properties, key.deserializer and value.deserializer, are similar to the serializers defined for the producer, but rather than specifying classes that turn Java objects to byte arrays, you need to specify classes that can take a byte array and turn it into a Java object.

There is a fourth property, which is not strictly mandatory, but for now we will pretend it is. The property is group.id and it specifies the consumer group the KafkaConsumer instance belongs to. While it is possible to create consumers that do not belong to any consumer group, this is uncommon, so for most of the chapter we will assume the consumer is part of a group.

The following code snippet shows how to create a KafkaConsumer:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer =
    new KafkaConsumer<String, String>(props);

Most of what you see here should be familiar if you've read Chapter 3 on creating producers. We assume that the records we consume will have String objects as both the key and the value of the record. The only new property here is group.id, which is the name of the consumer group this consumer belongs to.

Subscribing to Topics

Once we create a consumer, the next step is to subscribe to one or more topics. The subscribe() method takes a list of topics as a parameter, so it's pretty simple to use:

consumer.subscribe(Collections.singletonList("customerCountries")); // (1)
1

Here we simply create a list with a single element: the topic name customerCountries.

It is also possible to call subscribe with a regular expression. The expression can match multiple topic names, and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. Subscribing to multiple topics using a regular expression is most commonly used in applications that replicate data between Kafka and another system.

To subscribe to all test topics, we can call:

consumer.subscribe(Pattern.compile("test.*"));
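It can help to check what a subscription pattern would match before using it. The sketch below uses only java.util.regex and assumes topic names are compared against the whole pattern (Matcher.matches() semantics); the topic names and the `matching` helper are made up for illustration. It shows that the unescaped dot in "test.*" matches any character, so a topic such as "testing" is included as well.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {
    // Hypothetical helper: return the topic names the pattern matches in full.
    static List<String> matching(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up topic names, used only to exercise the patterns.
        List<String> topics =
                List.of("test", "test.orders", "testing", "prod.test", "customerCountries");

        // "." matches any character, so this also picks up "test" and "testing".
        System.out.println(matching(Pattern.compile("test.*"), topics));

        // Escaping the dot restricts the match to names that start with "test.".
        System.out.println(matching(Pattern.compile("test\\..*"), topics));
    }
}
```

If only topics under a "test." prefix are wanted, the escaped form is the safer choice.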

The Poll Loop

At the heart of the consumer API is a simple loop for polling the server for more data. Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats, and data fetching, leaving the developer with a clean API that simply returns available data from the assigned partitions. The main body of a consumer will look as follows:

try {
    while (true) { // (1)
        // poll(long) takes a timeout in milliseconds; newer clients use poll(Duration).
        ConsumerRecords<String, String> records = consumer.poll(100); // (2)
        for (ConsumerRecord<String, String> record : records) { // (3)
            log.debug(String.format(
                "topic = %s, partition = %d, offset = %d, customer = %s, country = %s",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value()));

            // Keep a running count of customers per country.
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4)); // (4)
        }
    }
} finally {
    consumer.close(); // (5)
}
1

This is indeed an infinite loop. Consumers are usually long-running applications that continuously poll Kafka for more data. We will show later in the chapter how to cleanly exit the loop and close the consumer.

2

This is the most important line in the chapter. The same way that sharks must keep moving or they die, consumers must keep polling Kafka or they will be considered dead and the partitions they are consuming will be handed to another consumer in the group to continue consuming. The parameter we pass to poll() is a timeout interval and controls how long poll() will block if data is not available in the consumer buffer. If this is set to 0, poll() will return immediately; otherwise, it will wait for the specified number of milliseconds for data to arrive from the broker.

3

poll() returns a list of records. Each record contains the topic and partition the record came from, the offset of the record within the partition, and of course the key and the value of the record. Typically we want to iterate over the list and process the records individually.

4

Processing usually ends in writing a result in a data store or updating a stored record. Here, the goal is to keep a running count of customers from each country, so we update a hashtable and print the result as JSON. A more realistic example would store the updated result in a data store.

5

Always close() the consumer before exiting. This will close the network connections and sockets. It will also trigger a rebalance immediately rather than wait for the group coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time in which consumers can't consume messages from a subset of the partitions.

The poll loop does a lot more than just get data. The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group, and receiving a partition assignment. If a rebalance is triggered, it will be handled inside the poll loop as well. And of course the heartbeats that keep consumers alive are sent from within the poll loop. For this reason, we try to make sure that whatever processing we do between iterations is fast and efficient.

Thread Safety

You can't have multiple consumers that belong to the same group in one thread and you can't have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object and then use Java's ExecutorService to start multiple threads each with its own consumer. The Confluent blog has a tutorial that shows how to do just that.

Configuring Consumers

So far we have focused on learning the consumer API, but we've only looked at a few of the configuration properties: just the mandatory bootstrap.servers, group.id, key.deserializer, and value.deserializer. All the consumer configuration is documented in the Apache Kafka documentation. Most of the parameters have reasonable defaults and do not require modification, but some have implications on the performance and availability of the consumers. Let's take a look at some of the more important properties.

fetch.min.bytes

This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer. This reduces the load on both the consumer and the broker as they have to handle fewer back-and-forth messages in cases where the topics don't have much new activity (or for lower activity hours of the day). You will want to set this parameter higher than the default if the consumer is using too much CPU when there isn't much data available, or to reduce load on the brokers when you have a large number of consumers.

fetch.max.wait.ms

By setting fetch.min.bytes, you tell Kafka to wait until it has enough data to send before responding to the consumer. fetch.max.wait.ms lets you control how long to wait. By default, Kafka will wait up to 500 ms. This results in up to 500 ms of extra latency in case there is not enough data flowing to the Kafka topic to satisfy the minimum amount of data to return. If you want to limit the potential latency (usually due to SLAs controlling the maximum latency of the application), you can set fetch.max.wait.ms to a lower value. If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.
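The 1 MB / 100 ms example could be expressed as consumer properties roughly as follows. This is a minimal sketch that only builds a java.util.Properties object; the class and method names are illustrative, and the values are the ones from the paragraph above rather than recommendations.

```java
import java.util.Properties;

public class FetchTuningSketch {
    // Illustrative helper: the two fetch settings from the example in the text.
    static Properties fetchProps() {
        Properties props = new Properties();
        // Wait until at least 1 MB of new data is available...
        props.put("fetch.min.bytes", String.valueOf(1024 * 1024));
        // ...but never hold a fetch request longer than 100 ms.
        props.put("fetch.max.wait.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fetchProps());
    }
}
```

These entries would be merged into the same Properties instance that carries bootstrap.servers, group.id, and the deserializers.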

max.partition.fetch.bytes

This property controls the maximum number of bytes the server will return per partition. The default is 1 MB, which means that when KafkaConsumer.poll() returns ConsumerRecords, the record object will use at most max.partition.fetch.bytes per partition assigned to the consumer. So if a topic has 20 partitions, and you have 5 consumers, each consumer will need to have 4 MB of memory available for ConsumerRecords. In practice, you will want to allocate more memory as each consumer will need to handle more partitions if other consumers in the group fail. max.partition.fetch.bytes must be larger than the largest message a broker will accept (determined by the message.max.bytes property in the broker configuration), or the broker may accept messages that the consumer will be unable to consume, in which case the consumer will hang trying to read them. Another important consideration when setting max.partition.fetch.bytes is the amount of time it takes the consumer to process data. As you recall, the consumer must call poll() frequently enough to avoid session timeout and subsequent rebalance. If the amount of data a single poll() returns is very large, it may take the consumer longer to process, which means it will not get to the next iteration of the poll loop in time to avoid a session timeout. If this occurs, the two options are either to lower max.partition.fetch.bytes or to increase the session timeout.
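The memory estimate above is simple arithmetic, and it is worth repeating for the case where some consumers have failed. The sketch below is a back-of-the-envelope model under the assumption that partitions are spread as evenly as possible; the class and method names are illustrative.

```java
public class FetchMemorySketch {
    // Worst-case bytes of fetched records one consumer may hold:
    // partitions it owns (rounded up) times the per-partition cap.
    static long worstCaseBytes(int partitions, int liveConsumers, long maxPartitionFetchBytes) {
        int owned = (partitions + liveConsumers - 1) / liveConsumers; // ceiling division
        return owned * maxPartitionFetchBytes;
    }

    public static void main(String[] args) {
        long oneMb = 1024L * 1024L; // the default max.partition.fetch.bytes cited in the text
        for (int live : new int[] {5, 4, 1}) {
            long mb = worstCaseBytes(20, live, oneMb) / oneMb;
            System.out.println(live + " live consumer(s): up to " + mb + " MB each");
        }
    }
}
```

With all five consumers alive the estimate is 4 MB each; if one fails it rises to 5 MB, and a single surviving consumer would need room for 20 MB.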

session.timeout.ms

The amount of time a consumer can be out of contact with the brokers while still considered alive defaults to 10 seconds. If more than session.timeout.ms passes without the consumer sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, whereas session.timeout.ms controls how long a consumer can go without sending a heartbeat. Therefore, those two properties are typically modified together: heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to one-third of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than the default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as a result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalance, but also means it will take longer to detect a real failure.
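One way to keep the two settings in step is to derive the heartbeat interval from the session timeout. The helper below is a sketch of the one-third rule of thumb described above; the class and method names are illustrative, and it only builds a Properties object.

```java
import java.util.Properties;

public class SessionTimeoutSketch {
    // Illustrative helper: set heartbeat.interval.ms to one-third of session.timeout.ms.
    static Properties timeoutProps(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        // The example from the text: a 3-second timeout with a 1-second heartbeat.
        System.out.println(timeoutProps(3000));
    }
}
```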

auto.offset.reset

This property controls the behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset or if the committed offset it has is invalid (usually because the consumer was down for so long that the record with that offset was already aged out of the broker). The default is "latest," which means that lacking a valid offset, the consumer will start reading from the newest records (records that were written after the consumer started running). The alternative is "earliest," which means that lacking a valid offset, the consumer will read all the data in the partition, starting from the very beginning. Setting auto.offset.reset to none will cause an exception to be thrown when attempting to consume from an invalid offset.

enable.auto.commit

We'll discuss the different options for committing offsets later in this chapter. This parameter controls whether the consumer will commit offsets automatically, and defaults to true. Set it to false if you prefer to control when offsets are committed, which is necessary to minimize duplicates and avoid missing data. If you set enable.auto.commit to true, then you might also want to control how frequently offsets will be committed using auto.commit.interval.ms.

partition.assignment.strategy

We learned that partitions are assigned to consumers in a consumer group. A PartitionAssignor is a class that, given consumers and topics they subscribed to, decides which partitions will be assigned to which consumer. By default, Kafka has two assignment strategies:

Range

Assigns to each consumer a consecutive subset of partitions from each topic it subscribes to. So if consumers C1 and C2 are subscribed to two topics, T1 and T2, and each of the topics has three partitions, then C1 will be assigned partitions 0 and 1 from topics T1 and T2, while C2 will be assigned partition 2 from those topics. Because each topic has an uneven number of partitions and the assignment is done for each topic independently, the first consumer ends up with more partitions than the second. This happens whenever Range assignment is used and the number of consumers does not divide the number of partitions in each topic neatly.

RoundRobin

Takes all the partitions from all subscribed topics and assigns them to consumers sequentially, one by one. If C1 and C2 described previously used RoundRobin assignment, C1 would have partitions 0 and 2 from topic T1 and partition 1 from topic T2. C2 would have partition 1 from topic T1 and partitions 0 and 2 from topic T2. In general, if all consumers are subscribed to the same topics (a very common scenario), RoundRobin assignment will end up with all consumers having the same number of partitions (or at most one partition difference).

The partition.assignment.strategy property allows you to choose a partition-assignment strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor, which implements the Range strategy described above. You can replace it with org.apache.kafka.clients.consumer.RoundRobinAssignor. A more advanced option is to implement your own assignment strategy, in which case partition.assignment.strategy should point to the name of your class.
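The difference between the two strategies is easy to reproduce in a few lines. The class below is a simplified re-implementation written for this example, not the code of RangeAssignor or RoundRobinAssignor; it assumes every consumer subscribes to every topic and that all topics have the same number of partitions. Partitions are written as "topic-partition" strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    // Range: handle each topic independently and give each consumer a consecutive
    // block; the first (partitions % consumers) consumers get one extra partition.
    static Map<String, List<String>> range(List<String> consumers, List<String> topics,
                                           int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int n = consumers.size();
        for (String topic : topics) {
            int base = partitionsPerTopic / n;
            int extra = partitionsPerTopic % n;
            int next = 0;
            for (int i = 0; i < n; i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int k = 0; k < count; k++) {
                    result.get(consumers.get(i)).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    // RoundRobin: line up all partitions of all topics and deal them out one by one.
    static Map<String, List<String>> roundRobin(List<String> consumers, List<String> topics,
                                                int partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int slot = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(slot++ % consumers.size())).add(topic + "-" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("C1", "C2");
        List<String> topics = List.of("T1", "T2");
        System.out.println("Range:      " + range(consumers, topics, 3));
        System.out.println("RoundRobin: " + roundRobin(consumers, topics, 3));
    }
}
```

For the C1/C2, T1/T2 scenario, Range gives C1 four partitions and C2 two, while RoundRobin gives each consumer three, matching the description above.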

client.id

This can be any string, and will be used by the brokers to identify messages sent from the client. It is used in logging and metrics, and for quotas.

max.poll.records

This controls the maximum number of records that a single call to poll() will return. This is useful to help control the amount of data your application will need to process in the polling loop.

receive.buffer.bytes and send.buffer.bytes

These are the sizes of the TCP send and receive buffers used by the sockets when writing and reading data. If these are set to -1, the OS defaults will be used. It can be a good idea to increase those when producers or consumers communicate with brokers in a different datacenter, because those network links typically have higher latency and lower bandwidth.

Commits and Offsets

Whenever we call poll(), it returns records written to Kafka that consumers in our group have not read yet. This means that we have a way of tracking which records were read by a consumer of the group. As discussed before, one of Kafka's unique characteristics is that it does not track acknowledgments from consumers the way many JMS queues do. Instead, it allows consumers to use Kafka to track their position (offset) in each partition.

We call the action of updating the current position in the partition a commit.

How does a consumer commit an offset? It produces a message to Kafka, to a special __consumer_offsets topic, with the committed offset for each partition. As long as all your consumers are up, running, and churning away, this will have no impact. However, if a consumer crashes or a new consumer joins the consumer group, this will trigger a rebalance. After a rebalance, each consumer may be assigned a different set of partitions than the one it processed before. In order to know where to pick up the work, the consumer will read the latest committed offset of each partition and continue from there.

If the committed offset is smaller than the offset of the last message the client processed, the messages between the last processed offset and the committed offset will be processed twice. See Figure 4-6.

Figure 4-6. Re-processed messages

If the committed offset is larger than the offset of the last message the client actually processed, all messages between the last processed offset and the committed offset will be missed by the consumer group. See Figure 4-7.

Figure 4-7. Missed messages between offsets
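The two cases can be put into numbers with a small model. This sketch treats the committed offset and the last processed offset as plain positions, as the text does, and ignores the detail that a real committed offset names the next record to read. The class, method names, and sample offsets are all made up for illustration.

```java
public class OffsetGapSketch {
    // Records that will be read again after a rebalance (committed is behind processing).
    static long reprocessed(long committedOffset, long lastProcessedOffset) {
        return Math.max(0, lastProcessedOffset - committedOffset);
    }

    // Records that will never be processed (committed is ahead of processing).
    static long missed(long committedOffset, long lastProcessedOffset) {
        return Math.max(0, committedOffset - lastProcessedOffset);
    }

    public static void main(String[] args) {
        // Hypothetical offsets: committed 5, processed through 10.
        System.out.println("reprocessed: " + reprocessed(5, 10));
        // Hypothetical offsets: committed 11, processed through 4.
        System.out.println("missed: " + missed(11, 4));
    }
}
```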

Clearly, managing offsets has a big impact on the client application. The KafkaConsumer API provides multiple ways of committing offsets.

Automatic Commit

The easiest way to commit offsets is to allow the consumer to do it for you. If you configure enable.auto.commit=true, then every five seconds the consumer will commit the largest offset your client received from poll(). The five-second interval is the default and is controlled by setting auto.commit.interval.ms. Just like everything else in the consumer, the automatic commits are driven by the poll loop. Whenever you poll, the consumer checks if it is time to commit, and if it is, it will commit the offsets it returned in the last poll.

Before using this convenient option, however, it is important to understand the consequences.

Consider that, by default, automatic commits occur every five seconds. Suppose that we are three seconds after the most recent commit and a rebalance is triggered. After the rebalancing, all consumers will start consuming from the last offset committed. In this case, the offset is three seconds old, so all the events that arrived in those three seconds will be processed twice. It is possible to configure the commit interval to commit more frequently and reduce the window in which records will be duplicated, but it is impossible to completely eliminate them.

With autocommit enabled, a call to poll will always commit the last offset returned by the previous poll. It doesn't know which events were actually processed, so it is critical to always process all the events returned by poll() before calling poll() again. (Just like poll(), close() also commits offsets automatically.) This is usually not an issue, but pay attention when you handle exceptions or exit the poll loop prematurely.

Automatic commits are convenient, but they don't give developers enough control to avoid duplicate messages.

Commit Current Offset

Most developers exercise more control over the time at which offsets are committed, both to eliminate the possibility of missing messages and to reduce the number of messages duplicated during rebalancing. The consumer API has the option of committing the current offset at a point that makes sense to the application developer rather than based on a timer.

By setting enable.auto.commit=false, offsets will only be committed when the application explicitly chooses to do so. The simplest and most reliable of the commit APIs is commitSync(). This API will commit the latest offset returned by poll() and return once the offset is committed, throwing an exception if the commit fails for some reason.

It is important to remember that commitSync() will commit the latest offset returned by poll(), so make sure you call commitSync() after you are done processing all the records in the collection, or you risk missing messages as described previously. When a rebalance is triggered, all the messages from the beginning of the most recent batch until the time of the rebalance will be processed twice.

Here is how we would use commitSync to commit offsets after we finished processing the latest batch of messages:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf(
            "topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value()); // (1)
    }
    try {
        consumer.commitSync(); // (2)
    } catch (CommitFailedException e) {
        log.error("commit failed", e); // (3)
    }
}
1

Let's assume that by printing the contents of a record, we are done processing it. Your application will likely do a lot more with the records: modify them, enrich them, aggregate them, display them on a dashboard, or notify users of important events. You should determine when you are "done" with a record according to your use case.

2

Once we are done "processing" all the records in the current batch, we call commitSync to commit the last offset in the batch, before polling for additional messages.

3

commitSync retries committing as long as there is no error that can't be recovered. If an unrecoverable error does occur, there is not much we can do except log it.

Asynchronous Commit

One drawback of manual commit is that the application is blocked until the broker responds to the commit request. This will limit the throughput of the application. Throughput can be improved by committing less frequently, but then we are increasing the number of potential duplicates that a rebalance will create.

Another option is the asynchronous commit API. Instead of waiting for the broker to respond to a commit, we just send the request and continue on:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, " +
            "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync(); // (1)
}
1

Commit the last offset and carry on.

The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a nonretriable failure, commitAsync() will not retry. The reason it does not retry is that by the time commitAsync() receives a response from the server, there may have been a later commit that was already successful. Imagine that we sent a request to commit offset 2000. There is a temporary communication problem, so the broker never gets the request and therefore never responds. Meanwhile, we processed another batch and successfully committed offset 3000. If commitAsync() now retries the previously failed commit, it might succeed in committing offset 2000 after offset 3000 was already processed and committed. In the case of a rebalance, this will cause more duplicates.

We mention this complication and the importance of correct order of commits, because commitAsync() also gives you an option to pass in a callback that will be triggered when the broker responds. It is common to use the callback to log commit errors or to count them in a metric, but if you want to use the callback for retries, you need to be aware of the problem with commit order:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, " +
            "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
                OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    }); // (1)
}
1

We send the commit and carry on, but if the commit fails, the failure and the offsets will be logged.

Retrying Async Commits

A simple pattern to get commit order right for asynchronous retries is to use a monotonically increasing sequence number. Increase the sequence number every time you commit and add the sequence number at the time of the commit to the commitAsync callback. When you're getting ready to send a retry, check if the commit sequence number the callback got is equal to the instance variable; if it is, there was no newer commit and it is safe to retry. If the instance sequence number is higher, don't retry because a newer commit was already sent.
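The sequence-number check can be sketched without the Kafka client at all. The class below is a hypothetical helper (CommitSequence, next(), and safeToRetry() are names invented for this illustration, not part of the consumer API); it only shows the comparison a callback would make before retrying.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the Kafka client API.
class CommitSequence {
    // Incremented every time a commit is sent.
    private final AtomicLong latest = new AtomicLong(0);

    // Call right before sending a commit and capture the result in the callback.
    long next() {
        return latest.incrementAndGet();
    }

    // A failed commit may be retried only if no newer commit was sent since.
    boolean safeToRetry(long sequenceOfFailedCommit) {
        return sequenceOfFailedCommit == latest.get();
    }

    public static void main(String[] args) {
        CommitSequence seq = new CommitSequence();
        long first = seq.next();   // commit for offset 2000 is sent
        long second = seq.next();  // commit for offset 3000 is sent
        System.out.println(seq.safeToRetry(first));   // false: a newer commit exists
        System.out.println(seq.safeToRetry(second));  // true: this is the latest commit
    }
}
```

In a real consumer, the value returned by next() would be captured in a local variable just before calling commitAsync(), and the callback would consult safeToRetry() only when it receives a non-null exception.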

Combining Synchronous and Asynchronous Commits

Normally, occasional failures to commit without retrying are not a huge problem because if the problem is temporary, the following commit will be successful. But if we know that this is the last commit before we close the consumer, or before a rebalance, we want to make extra sure that the commit succeeds.

Therefore, a common pattern is to combine commitAsync() with commitSync() just before shutdown. Here is how it works (we will discuss how to commit just before rebalance when we get to the section about rebalance listeners):

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value());
        }
        consumer.commitAsync(); // (1)
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(); // (2)
    } finally {
        consumer.close();
    }
}
1

While everything is fine, we use commitAsync. It is faster, and if one commit fails, the next commit will serve as a retry.

2

But if we are closing, there is no "next commit." We call commitSync(), because it will retry until it succeeds or suffers unrecoverable failure.

Commit Specified Offset

Committing the latest offset only allows you to commit as often as you finish processing batches. But what if you want to commit more frequently than that? What if poll() returns a huge batch and you want to commit offsets in the middle of the batch to avoid having to process all those rows again if a rebalance occurs? You can't just call commitSync() or commitAsync(), because this will commit the last offset returned, which you didn't get to process yet.

Fortunately, the consumer API allows you to call commitSync() and commitAsync() and pass a map of partitions and offsets that you wish to commit. If you are in the middle of processing a batch of records, and the last message you got from partition 3 in topic "customers" has offset 5000, you can call commitSync() to commit offset 5001 for partition 3 in topic "customers." Since your consumer may be consuming more than a single partition, you will need to track offsets on all of them, which adds complexity to your code.

Here is what a commit of specific offsets looks like:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>(); // (1)
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, " +
            "customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value()); // (2)
        currentOffsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset()+1, "no metadata")); // (3)
        if (count % 1000 == 0) // (4)
            consumer.commitAsync(currentOffsets, null); // (5)
        count++;
    }
}
1

This is the map we will use to manually track offsets.

2

Remember, printf is a stand-in for whatever processing you do for the records you consume.

3

After reading each record, we update the offsets map with the offset of the next message we expect to process. The committed offset should always be the offset of the next message that your application will read. This is where we'll start reading next time we start.

4

Here, we decide to commit current offsets every 1,000 records. In your application, you can commit based on time or perhaps content of the records.

5

I chose to call commitAsync(), but commitSync() is also completely valid here. Of course, when committing specific offsets you still need to perform all the error handling we've seen in previous sections.
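The "offset of the next message" rule is easy to get wrong by one, so here is a small sketch of just the bookkeeping, with no Kafka dependency. OffsetTracker and its methods are hypothetical names, and plain strings and longs stand in for TopicPartition and OffsetAndMetadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the currentOffsets map; plain strings and longs
// replace TopicPartition and OffsetAndMetadata so this runs without Kafka.
class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();

    // Record that the message at `offset` has been fully processed.
    void markProcessed(String topic, int partition, long offset) {
        // The offset to commit is the offset of the NEXT message to read.
        nextOffsets.put(topic + "-" + partition, offset + 1);
    }

    // Snapshot of what would be passed to commitSync() or commitAsync().
    Map<String, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsets);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markProcessed("customers", 3, 4999);
        tracker.markProcessed("customers", 3, 5000);
        tracker.markProcessed("customers", 0, 120);
        System.out.println(tracker.offsetsToCommit().get("customers-3")); // 5001
        System.out.println(tracker.offsetsToCommit().get("customers-0")); // 121
    }
}
```

Each partition keeps only its most recent entry, which is why the map can be committed at any point in the middle of a batch.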

Rebalance Listeners

As we mentioned in the previous section about committing offsets, a consumer will want to do some cleanup work before exiting and also before partition rebalancing.

If you know your consumer is about to lose ownership of a partition, you will want to commit offsets of the last event you've processed. Maybe you also need to close file handles, database connections, and such.

The consumer API allows you to run your own code when partitions are added or removed from the consumer. You do this by passing a ConsumerRebalanceListener when calling the subscribe() method we discussed previously. ConsumerRebalanceListener has two methods you can implement:

public void onPartitionsRevoked(Collection<TopicPartition> partitions)

Called before the rebalancing starts and after the consumer stopped consuming messages. This is where you want to commit offsets, so whoever gets this partition next will know where to start.

public void onPartitionsAssigned(Collection<TopicPartition> partitions)

Called after partitions have been reassigned to the consumer, but before the consumer starts consuming messages.

This example will show how to use onPartitionsRevoked() to commit offsets before losing ownership of a partition. In the next section we will show a more involved example that also demonstrates the use of onPartitionsAssigned():

private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener { // (1)
    public void onPartitionsAssigned(Collection<TopicPartition>
            partitions) { // (2)
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets); // (3)
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance()); // (4)

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
            currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset()+1, null));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}
1

We start by implementing a ConsumerRebalanceListener.

2

In this example we don't need to do anything when we get a new partition; we'll just start consuming messages.

3

However, when we are about to lose a partition due to rebalancing, we need to commit offsets. Note that we are committing the latest offsets we've processed, not the latest offsets in the batch we are still processing. This is because a partition could get revoked while we are still in the middle of a batch. We are committing offsets for all partitions, not just the partitions we are about to lose; because the offsets are for events that were already processed, there is no harm in that. And we are using commitSync() to make sure the offsets are committed before the rebalance proceeds.

4

The most important part: pass the ConsumerRebalanceListener to the subscribe() method so it will get invoked by the consumer.

Consuming Records with Specific Offsets

So far we've seen how to use poll() to start consuming messages from the last committed offset in each partition and to proceed in processing all messages in sequence. However, sometimes you want to start reading at a different offset.

If you want to start reading all messages from the beginning of the partition, or you want to skip all the way to the end of the partition and start consuming only new messages, there are APIs specifically for that: seekToBeginning(Collection<TopicPartition> tp) and seekToEnd(Collection<TopicPartition> tp).

However, the Kafka API also lets you seek a specific offset. This ability can be used in a variety of ways; for example, to go back a few messages or skip ahead a few messages (perhaps a time-sensitive application that is falling behind will want to skip ahead to more relevant messages). The most exciting use case for this ability is when offsets are stored in a system other than Kafka.

Think about this common scenario: Your application is reading events from Kafka (perhaps a clickstream of users in a website), processes the data (perhaps removing records that indicate clicks from automated programs rather than users), and then stores the results in a database, NoSQL store, or Hadoop. Suppose that we really don't want to lose any data, nor do we want to store the same results in the database twice.

In these cases, the consumer loop may look a bit like this:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Track the offset of the next record to read for this partition
        currentOffsets.put(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets, null);
    }
}

In this example, we are very paranoid, so we commit offsets after processing each record. However, there is still a chance that our application will crash after the record was stored in the database but before we committed offsets, causing the record to be processed again and the database to contain duplicates.

This could be avoided if there was a way to store both the record and the offset in one atomic action. Either both the record and the offset are committed, or neither of them is committed. As long as the records are written to a database and the offsets to Kafka, this is impossible.

But what if we wrote both the record and the offset to the database, in one transaction? Then we'll know that either we are done with the record and the offset is committed or we are not and the record will be reprocessed.

Now the only problem is if the offset is stored in a database and not in Kafka, how will our consumer know where to start reading when it is assigned a partition? This is exactly what seek() can be used for. When the consumer starts or when new partitions are assigned, it can look up the offset in the database and seek() to that location.

Here is a skeleton example of how this may work. We use ConsumerRebalanceListener and seek() to make sure we start processing at the offsets stored in the database:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        commitDBTransaction(); // (1)
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions)
            consumer.seek(partition, getOffsetFromDB(partition)); // (2)
    }
}

consumer.subscribe(topics, new SaveOffsetsOnRebalance());
consumer.poll(0);

for (TopicPartition partition : consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition)); // (3)

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        storeRecordInDB(record);
        storeOffsetInDB(record.topic(), record.partition(),
            record.offset()); // (4)
    }
    commitDBTransaction();
}
1

We use an imaginary method here to commit the transaction in the database. The idea here is that the database records and offsets will be inserted to the database as we process the records, and we just need to commit the transactions when we are about to lose the partition to make sure this information is persisted.

2

We also have an imaginary method to fetch the offsets from the database, and then we seek() to those records when we get ownership of new partitions.

3

When the consumer first starts, after we subscribe to topics, we call poll() once to make sure we join a consumer group and get assigned partitions, and then we immediately seek() to the correct offset in the partitions we are assigned to. Keep in mind that seek() only updates the position we are consuming from, so the next poll() will fetch the right messages. If there was an error in seek() (e.g., the offset does not exist), the exception will be thrown by poll().

4

Another imaginary method: this time we update a table storing the offsets in our database. Here we assume that updating records is fast, so we do an update on every record, but commits are slow, so we only commit at the end of the batch. However, this can be optimized in different ways.

There are many different ways to implement exactly-once semantics by storing offsets and data in an external store, but all of them will need to use the ConsumerRebalanceListener and seek() to make sure offsets are stored in time and that the consumer starts reading messages from the correct location.
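To make the "both or neither" idea concrete, here is a toy in-memory stand-in for the database. Everything in it is hypothetical (TransactionalStore, store(), crash(), offsetToSeek()), and it assumes we store the offset of the next record to read rather than the offset of the record just processed. It is a sketch of the reasoning, not a real transactional store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a transactional database.
class TransactionalStore {
    private final List<String> committedRecords = new ArrayList<>();
    private final Map<Integer, Long> committedOffsets = new HashMap<>();
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<Integer, Long> pendingOffsets = new HashMap<>();

    // Stage a processed record together with the next offset to read.
    void store(int partition, long offset, String value) {
        pendingRecords.add(value);
        pendingOffsets.put(partition, offset + 1);
    }

    // Publish staged records and offsets together.
    void commit() {
        committedRecords.addAll(pendingRecords);
        committedOffsets.putAll(pendingOffsets);
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    // Simulate a crash: everything staged since the last commit is lost.
    void crash() {
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    // Where a restarted consumer would seek() to; 0 if nothing was committed.
    long offsetToSeek(int partition) {
        return committedOffsets.getOrDefault(partition, 0L);
    }

    List<String> records() {
        return new ArrayList<>(committedRecords);
    }

    public static void main(String[] args) {
        TransactionalStore db = new TransactionalStore();
        db.store(0, 0, "a");
        db.store(0, 1, "b");
        db.commit();
        db.store(0, 2, "c");   // staged but never committed
        db.crash();
        System.out.println(db.records());        // [a, b]
        System.out.println(db.offsetToSeek(0));  // 2
    }
}
```

After the simulated crash, the stored offset points at the record that was lost, so a consumer that seeks there reprocesses "c" exactly once and never sees "a" or "b" again.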

But How Do We Exit?

Earlier in this chapter, when we discussed the poll loop, I told you not to worry about the fact that the consumer polls in an infinite loop and that we would discuss how to exit the loop cleanly. So, let's discuss how to exit cleanly.

When you decide to exit the poll loop, you will need another thread to call consumer.wakeup(). If you are running the consumer loop in the main thread, this can be done from ShutdownHook. Note that consumer.wakeup() is the only consumer method that is safe to call from a different thread. Calling wakeup will cause poll() to exit with WakeupException, or if consumer.wakeup() was called while the thread was not waiting on poll, the exception will be thrown on the next iteration when poll() is called. The WakeupException doesn't need to be handled, but before exiting the thread, you must call consumer.close(). Closing the consumer will commit offsets if needed and will send the group coordinator a message that the consumer is leaving the group. The group coordinator will trigger rebalancing immediately and you won't need to wait for the session to time out before partitions from the consumer you are closing will be assigned to another consumer in the group.

Here is what the exit code will look like if the consumer is running in the main application thread. This example is a bit truncated, but you can view the full example at http://bit.ly/2u47e9A.

Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup(); // (1)
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

...

try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records =
            movingAvg.consumer.poll(1000);
        System.out.println(System.currentTimeMillis() +
            "--  waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" +
                consumer.position(tp));
        movingAvg.consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown (2)
} finally {
    consumer.close(); // (3)
    System.out.println("Closed consumer and we are done");
}
1

ShutdownHook runs in a separate thread, so the only safe action we can take is to call wakeup to break out of the poll loop.

2

Another thread calling wakeup will cause poll to throw a WakeupException. You'll want to catch the exception to make sure your application doesn't exit unexpectedly, but there is no need to do anything with it.

3

Before exiting the consumer, make sure you close it cleanly.
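The control flow of this shutdown pattern can be tried out without a broker. The sketch below uses a hypothetical FakeConsumer that mimics only the behavior described above: wakeup() sets a flag, the next poll() throws, and the loop always closes the consumer on the way out. It is not the Kafka client, and WakeupSignal is an invented stand-in for WakeupException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in that mimics only the wakeup behavior described above.
class FakeConsumer {
    static class WakeupSignal extends RuntimeException {}

    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean closed = false;
    int polls = 0;

    // Safe to call from another thread: it only flips a flag.
    void wakeup() {
        wakeupRequested.set(true);
    }

    // Throws on the first poll after wakeup() was requested.
    void poll() {
        if (wakeupRequested.getAndSet(false)) {
            throw new WakeupSignal();
        }
        polls++;
    }

    void close() { closed = true; }
    boolean isClosed() { return closed; }

    // Poll until woken up, then always close.
    static void runLoop(FakeConsumer consumer) {
        try {
            while (true) {
                consumer.poll();
            }
        } catch (WakeupSignal e) {
            // expected during shutdown, nothing to do
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FakeConsumer consumer = new FakeConsumer();
        Thread stopper = new Thread(consumer::wakeup); // plays the shutdown hook
        stopper.start();
        runLoop(consumer);
        stopper.join();
        System.out.println(consumer.isClosed()); // true
    }
}
```

Because the flag is checked at the start of poll(), a wakeup() issued between two polls still takes effect on the next iteration, which matches the behavior the text describes for the real consumer.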

Deserializers

As discussed in the previous chapter, Kafka producers require serializers to convert objects into byte arrays that are then sent to Kafka. Similarly, Kafka consumers require deserializers to convert byte arrays received from Kafka into Java objects. In previous examples, we just assumed that both the key and the value of each message are strings and we used the default StringDeserializer in the consumer configuration.

In Chapter 3 about the Kafka producer, we saw how to serialize custom types and how to use Avro and AvroSerializers to generate Avro objects from schema definitions and then serialize them when producing messages to Kafka. We will now look at how to create custom deserializers for your own objects and how to use Avro and its deserializers.

It should be obvious that the serializer used to produce events to Kafka must match the deserializer that will be used when consuming events. Serializing with IntSerializer and then deserializing with StringDeserializer will not end well. This means that as a developer you need to keep track of which serializers were used to write into each topic, and make sure each topic only contains data that the deserializers you use can interpret. This is one of the benefits of using Avro and the Schema Repository for serializing and deserializing: the AvroSerializer can make sure that all the data written to a specific topic is compatible with the schema of the topic, which means it can be deserialized with the matching deserializer and schema. Any errors in compatibility, on the producer or the consumer side, will be caught easily with an appropriate error message, which means you will not need to try to debug byte arrays for serialization errors.
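A tiny experiment shows why mismatched serializers "will not end well." The sketch below does not use the Kafka classes; serializeInt() and deserializeString() are hypothetical helpers that assume an integer serializer writes four big-endian bytes and a string deserializer reads bytes as UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers; the encodings are assumptions for illustration.
class MismatchDemo {
    // Assumed encoding of an integer serializer: 4 bytes, big-endian.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Assumed decoding of a string deserializer: bytes read as UTF-8 text.
    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String decoded = deserializeString(serializeInt(65));
        System.out.println(decoded.equals("65"));  // false
        System.out.println(decoded.length());      // 4
        System.out.println(decoded.charAt(3));     // A
    }
}
```

The consumer gets three NUL characters followed by "A" instead of the number 65, and nothing in the byte array itself signals that anything went wrong.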

We will start by quickly showing how to write a custom deserializer, even though this is the less common method, and then we will move on to an example of how to use Avro to deserialize message keys and values.

Custom deserializers

Let's take the same custom object we serialized in Chapter 3, and write a deserializer for it:

public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

The custom deserializer will look as follows:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> { // (1)

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;

        try {
            if (data == null)
                return null;
            // The ID and the name length take 4 bytes each
            if (data.length < 8)
                throw new SerializationException("Size of data received " +
                    "by deserializer is shorter than expected");

            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();

            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, "UTF-8");

            return new Customer(id, name); // (2)

        } catch (Exception e) {
            throw new SerializationException("Error when deserializing " +
                "byte[] to Customer " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
1

The consumer also needs the implementation of the Customer class, and both the class and the serializer need to match on the producing and consuming applications. In a large organization with many consumers and producers sharing access to the data, this can become challenging.

2

We are just reversing the logic of the serializer here: we get the customer ID and name out of the byte array and use them to construct the object we need.
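The byte layout can be checked with a round trip that needs nothing but the JDK. CustomerBytes and its methods are hypothetical helpers; the layout they assume (a 4-byte ID, a 4-byte name length, then the UTF-8 name bytes) is the one the deserializer reads.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the assumed wire layout:
// 4-byte customer ID, 4-byte name length, then the UTF-8 name bytes.
class CustomerBytes {
    static byte[] encode(int id, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameBytes.length);
        buffer.putInt(id);
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        return buffer.array();
    }

    static int decodeId(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    static String decodeName(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getInt();                              // skip the ID
        byte[] nameBytes = new byte[buffer.getInt()]; // read the name length
        buffer.get(nameBytes);
        return new String(nameBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = encode(42, "Ana");
        System.out.println(data.length);       // 11
        System.out.println(decodeId(data));    // 42
        System.out.println(decodeName(data));  // Ana
    }
}
```

The smallest valid payload under this layout is the 8-byte header with an empty name, which is what a minimum-length check in a deserializer has to allow for.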

The consumer code that uses this deserializer will look similar to this example:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    CustomerDeserializer.class.getName());

KafkaConsumer<String, Customer> consumer =
    new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("customerCountries"));

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("current customer Id: " +
            record.value().getID() + " and " +
            "current customer name: " + record.value().getName());
    }
    consumer.commitSync();
}

Again, it is important to note that implementing a custom serializer and deserializer is not recommended. It tightly couples producers and consumers and is fragile and error-prone. A better solution would be to use a standard message format such as JSON, Thrift, Protobuf, or Avro. We'll now see how to use Avro deserializers with the Kafka consumer. For background on Apache Avro, its schemas, and schema-compatibility capabilities, refer back to Chapter 3.

Using Avro deserialization with Kafka consumer

Let's assume we are using the implementation of the Customer class in Avro that was shown in Chapter 3. In order to consume those objects from Kafka, you want to implement a consuming application similar to this:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "io.confluent.kafka.serializers.KafkaAvroDeserializer"); // (1)
props.put("specific.avro.reader", "true");
props.put("schema.registry.url", schemaUrl); // (2)
String topic = "customerContacts";

KafkaConsumer<String, Customer> consumer =
    new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    ConsumerRecords<String, Customer> records = consumer.poll(1000); // (3)

    for (ConsumerRecord<String, Customer> record: records) {
        System.out.println("Current customer name is: " +
            record.value().getName()); // (4)
    }
    consumer.commitSync();
}
1

We utilise KafkaAvroDeserializer to deserialize the Avro messages.

2

schema.registry.url is a new parameter. This but points to where we shop the schemas. This way the consumer tin apply the schema that was registered by the producer to deserialize the message.

3

Nosotros specify the generated class, Client, every bit the type for the record value.

4

record.value() is a Customer instance and we can use it accordingly.

Standalone Consumer: Why and How to Use a Consumer Without a Group

So far, we have discussed consumer groups, which are where partitions are assigned automatically to consumers and are rebalanced automatically when consumers are added or removed from the group. Typically, this behavior is just what you want, but in some cases you want something much simpler. Sometimes you know you have a single consumer that always needs to read data from all the partitions in a topic, or from a specific partition in a topic. In this case, there is no reason for groups or rebalances: just assign the consumer-specific topic and/or partitions, consume messages, and commit offsets on occasion.

When you know exactly which partitions the consumer should read, you don't subscribe to a topic; instead, you assign yourself a few partitions. A consumer can either subscribe to topics (and be part of a consumer group), or assign itself partitions, but not both at the same time.

Here is an example of how a consumer can assign itself all partitions of a specific topic and consume from them:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic"); // (1)

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(),
            partition.partition()));
    consumer.assign(partitions); // (2)

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
        consumer.commitSync();
    }
}
1

We start by asking the cluster for the partitions available in the topic. If you only plan on consuming a specific partition, you can skip this part.

2

Once we know which partitions we want, we call assign() with the list.

Other than the lack of rebalances and the need to manually find the partitions, everything else is business as usual. Keep in mind that if someone adds new partitions to the topic, the consumer will not be notified. You will need to handle this by checking consumer.partitionsFor() periodically or simply by bouncing the application whenever partitions are added.
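The periodic check boils down to a set difference between the partitions we already assigned and the partitions the cluster currently reports. The sketch below shows only that comparison; PartitionDiff and newPartitions() are hypothetical names, and plain partition numbers stand in for the PartitionInfo objects that partitionsFor() returns.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares partition numbers currently assigned with
// the numbers reported by a later metadata lookup.
class PartitionDiff {
    static Set<Integer> newPartitions(Set<Integer> assigned, Set<Integer> reported) {
        Set<Integer> added = new TreeSet<>(reported);
        added.removeAll(assigned);
        return added;
    }

    public static void main(String[] args) {
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(0, 1, 2));
        Set<Integer> reported = new TreeSet<>(Arrays.asList(0, 1, 2, 3, 4));
        System.out.println(newPartitions(assigned, reported)); // [3, 4]
    }
}
```

If the result is not empty, the application would build a new, complete partition list and call assign() again, since assign() replaces the previous assignment rather than adding to it.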

Older Consumer APIs

In this chapter we discussed the Java KafkaConsumer client that is part of the org.apache.kafka.clients package. At the time of writing, Apache Kafka still has two older clients written in Scala that are part of the kafka.consumer package, which is part of the core Kafka module. One of them is called SimpleConsumer (which is not very simple). SimpleConsumer is a thin wrapper around the Kafka APIs that allows you to consume from specific partitions and offsets. The other old API is called high-level consumer or ZookeeperConsumerConnector. The high-level consumer is somewhat similar to the current consumer in that it has consumer groups and it rebalances partitions, but it uses Zookeeper to manage consumer groups and does not give you the same control over commits and rebalances as we have now.

Because the current consumer supports both behaviors and provides much more reliability and control to the developer, we will not discuss the older APIs. If you are interested in using them, please think twice and then refer to Apache Kafka documentation to learn more.

Summary

We started this chapter with an in-depth explanation of Kafka's consumer groups and the way they allow multiple consumers to share the work of reading events from topics. We followed the theoretical discussion with a practical example of a consumer subscribing to a topic and continuously reading events. We then looked into the most important consumer configuration parameters and how they affect consumer behavior. We dedicated a large part of the chapter to discussing offsets and how consumers keep track of them. Understanding how consumers commit offsets is critical when writing reliable consumers, so we took time to explain the different ways this can be done. We then discussed additional parts of the consumer APIs, handling rebalances and closing the consumer.

We concluded by discussing the deserializers used by consumers to turn bytes stored in Kafka into Java objects that the applications can process. We discussed Avro deserializers in some detail, even though they are just one type of deserializer you can use, because these are most commonly used with Kafka.

Now that you know how to produce and consume events with Kafka, the next chapter explains some of the internals of a Kafka implementation.


Source: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
