如何手动更新Kafka中某个Topic的偏移量

如题所述

第1个回答  2016-06-29
我不生产答案,我只是当一回Stackoverflow的搬运工。今天刚好在Stackoverflow查Kafka的一个问题,顺带看到的。 For finding the start offset to read in Kafka 0.8 Simple Consumer example they say Kafka includes two constants to help, kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages. You can also find the example code there for managing the offset at your consumer end. public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) ); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; }

kafka查询和修改topic的offset
手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:在不输入参数的情况下,我们可以得...

如何手动更新Kafka中某个Topic的偏移量
手动更新Kafka中某个Topic的偏移量的方法 若没有分区,一个topic对应的消息集在分布式集群服务组中,就会分布不均匀,即可能导致某台服务器A记录当前topic的消息集很多,若此topic的消息压力很大的情况下,服务器A就可能导致压力很大,吞吐也容易导致瓶颈。有了分区后,假设一个topic可能分为10个分区,kafk...

如何获取kafka某一topic中最新的offset
如果你在0.9版本以上,可以用最新的Consumer client 客户端,有consumer.seekToEnd() \/ consumer.position() 可以用于得到当前最新的offset:{log.dirs}\/replication-offset-checkpoint

华为kafka安全版重置group中的topic offset不生效问题
方式二使用的是消费者消费数据时提交的offset信息,一般来说只要topic可以正常被消费,方式二都能生效。

如何获取kafka某一topic中最新的offset
选中某一行,var record = grid.getSelectionModel().getSelection();一行的所有数据都在record里面 具体某一列:record.get("列名-dataIndex")

【kafka-开发】修改topic分区及副本数
去掉log_dirs字段并修改replicas字段([0,1,2]分别为broker_id),表示副本存储的broker位置,如下:重新分配及查看分配进度

如何重新读取kafka集群topic
Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的 轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录 Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行rela...

【kafka原理】 消费者偏移量__consumer_offsets_相关解析
consume_group`确定分区数,例如对于`szz1-group`,通过哈希求模运算确定消费组的偏移量信息存于哪个分区。可以通过命令查询,其结构为键和值,键由消费组+Topic+分区数确定,值包含了消费组的偏移量信息。日常运维与问题排查方面,滴滴开源的LogiKM一站式Kafka监控与管控平台能够提供高效的支持。

Kafka的Topic配置详解
(A)创建topic时配置参数 (B)修改topic时配置参数 覆盖已经有topic参数,下面例子修改"my-topic"的max message属性 (C)删除topic级别配置参数 注:配置的kafka集群的根目录为\/config\/mobile\/mq\/mafka02,因此所有节点信息都在此目录下。cleanup.policy delete.retention.ms delete.retention.ms flush....

如何为一个kafka集群选择topics\/partitions的数量
如何决定kafka集群中topic,partition的数量,这是许多kafka用户经常遇到的问题。本文列举阐述几个重要的决定因素,以提供一些参考。分区多吞吐量更高 一个话题topic的各个分区partiton之间是并行的。在producer和broker方面,写不同的分区是完全并行的。因此一些昂贵的操作比如压缩,可以获得更多的资源,因为有...

相似回答