org/apache/kafka/clients/consumer/KafkaConsumer.java
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);

    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy

        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();

        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
Kafka-0.10.0.1
org.apache.kafka.clients.consumer.KafkaConsumer#updateFetchPositions
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // refresh commits for all assigned partitions
    coordinator.refreshCommittedOffsetsIfNeeded();

    // then do any offset lookups in case some positions are not known
    fetcher.updateFetchPositions(partitions);
}
Problem description and explanation
When subscribing to multiple partitions of the same topic, Spark Streaming obtains, on every batch, the offsets it will process.
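For context, here is a minimal sketch of the kind of subscribe-based direct stream that drives this code path; the broker address, group id, topic name and batch interval are placeholders rather than values from the original setup.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    // master, app name and batch interval are placeholders
    val conf = new SparkConf().setMaster("local[2]").setAppName("direct-stream-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // placeholder connection settings
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "example-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("example-topic"), kafkaParams))

    // On every batch the driver calls DirectKafkaInputDStream.compute(), which in turn
    // calls latestOffsets() (shown below) to pick the untilOffset of each partition.
    stream.foreachRDD { rdd =>
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        .foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}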
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#latestOffsets
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  c.poll(0)
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  val rdd = new KafkaRDD[K, V](
    context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
  ...
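Each range built here pairs the fromOffset recorded in currentOffsets with the untilOffset obtained from clamp(latestOffsets()). A small, hedged illustration with placeholder topic and offset values:

import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetRangeSketch extends App {
  // fromOffset would come from currentOffsets, untilOffset from clamp(latestOffsets());
  // the topic name and the numbers are placeholders.
  val range = OffsetRange("example-topic", 0, 100L, 250L)
  println(s"${range.topic}-${range.partition}: ${range.count()} records in this batch")
}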
org.apache.kafka.clients.consumer.KafkaConsumer#position
public long position(TopicPartition partition) {
    acquire();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            updateFetchPositions(Collections.singleton(partition));
            offset = this.subscriptions.position(partition);
        }
        return offset;
    } finally {
        release();
    }
}
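As the snippet above shows, position() resolves the offset lazily: when nothing is cached for the partition it runs updateFetchPositions() for just that partition, and that is the moment a pending seekToEnd() is actually turned into a concrete offset. A minimal sketch of that caller-side sequence against the plain consumer API; broker address, topic and group id are placeholders.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object PositionAfterSeekToEnd {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")   // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")          // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("example-topic", 0)                     // placeholder
    consumer.assign(Collections.singleton(tp))

    // seekToEnd() only marks the partition as needing a reset; no offset is fetched yet.
    consumer.seekToEnd(Collections.singleton(tp))

    // position() finds no cached offset for tp, so it goes through
    // updateFetchPositions(Collections.singleton(tp)) before returning.
    val end = consumer.position(tp)
    println(s"$tp position after seekToEnd: $end")

    consumer.close()
  }
}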
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);

    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy

        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();

        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
org.apache.kafka.clients.consumer.internals.Fetcher#resetOffsetsIfNeeded
public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        // TODO: If there are several offsets to reset, we could submit offset requests in parallel
        if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
            resetOffset(tp);
    }
}
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#seek
private void seek(long offset) {
    // record the resolved position and clear the pending reset strategy,
    // so later updateFetchPositions() calls will not touch this partition again
    this.position = offset;
    this.resetStrategy = null;
}