我需要在一天内每小时收到卡夫卡的消息.每隔一小时我就会开始一份工作来消费1小时前制作的消息.例如,如果当前时间是20:12,我将在19:00:00和19:59:59之间消费该消息.这意味着我需要在时间19:00:00获得开始偏移,并在时间19:59:59之前结束偏移.我使用了SimpleConsumer.getOffsetsBefore,如" 0.8.0 SimpleConsumer Example "中所示.问题是返回的偏移量与作为参数给出的时间戳不匹配.例如,当时间戳为19:00:00时,我收到时间16:38:00产生的消息.
getOffsetsByTimes()
可以使用以下kafka consumer api方法,它可以从0.10.0或更高版本获得.请参阅JavaDoc.
/** * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. * * This is a blocking call. The consumer does not have to be assigned the partitions. * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null * will be returned for that partition. * * Notice that this method may block indefinitely if the partition does not exist. * * @param timestampsToSearch the mapping from partition to the timestamp to look up. * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. */ @Override public MapoffsetsForTimes(Map timestampsToSearch) { for (Map.Entry entry : timestampsToSearch.entrySet()) { // we explicitly exclude the earliest and latest offset here so the timestamp in the returned // OffsetAndTimestamp is always positive. if (entry.getValue() < 0) throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative."); } return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs); }
正如其他回复所指出的那样,旧版本的Kafka只有一种将时间映射到偏移的近似方法.但是,自从Kafka 0.10.0(2016年5月发布)以来,Kafka为每个主题保留了时间索引.这将使您有效地从时间到精确的偏移.您可以使用KafkaConsumer #offsetsForTimes方法来访问此信息.
有关如何在KIP-33设计讨论页面上实现基于时间的索引的更多详细信息.
在Kafka中,目前无法获得与特定时间戳相对应的偏移量 - 这是设计使然.如Jay Kreps的Log Article顶部所述,偏移数为日志提供了一种与挂钟时间分离的时间戳.将偏移量作为您的时间概念,您可以知道任何两个系统是否处于一致状态,只需购买知道他们读取的偏移量.对于不同服务器上的不同时钟时间,闰年,日光节省时间,时区等,从来没有任何混淆.它有点不错......
现在......所有这一切,如果你知道你的服务器在某个时间X下降,那么实际上,你真的想知道相应的偏移量.你可以近距离接触.kafka机器上的日志文件是根据它们开始编写的时间命名的,并且存在一个kafka工具(我现在找不到),让您知道哪些偏移与这些文件相关联.如果您想知道确切的时间戳,那么您必须对您发送给Kafka的消息中的时间戳进行编码.