kafka怎么获得最后一条消息的offset

如题所述

第1个回答  推荐于2016-10-25
我不生产答案,我只是当一回Stackoverflow的搬运工。今天刚好在Stackoverflow查Kafka的一个问题,顺带看到的。
For finding the start offset to read in Kafka 0.8 Simple Consumer example they say
Kafka includes two constants to help,
kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data
in the logs and starts streaming from there,
kafka.api.OffsetRequest.LatestTime() will only stream new messages.
You can also find the example code there for managing the offset at your consumer end.
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}本回答被提问者和网友采纳
相似回答