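spark2.0 + kafka0.10.1: subscribed to multiple partitions, but only one is read
A colleague using Spark-Kafka-Streaming ran into a case where only one partition could be read, and he eventually tracked down the cause. I'm recording it here. Put plainly: Spark-2.0.0 uses Kafka-0.10.0.1 by default, and swapping in a different client version yourself is risky!
# Key point of the problem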
- Kafka-0.10.1.0
org/apache/kafka/clients/consumer/KafkaConsumer.java
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);
    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy
        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();
        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
- Kafka-0.10.0.1
org.apache.kafka.clients.consumer.KafkaConsumer#updateFetchPositions
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // refresh commits for all assigned partitions
    coordinator.refreshCommittedOffsetsIfNeeded();
    // then do any offset lookups in case some positions are not known
    fetcher.updateFetchPositions(partitions);
}
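# Problem description and explanation
When several partitions of the same topic are subscribed, Spark Streaming fetches, for every batch, the offsets that batch should process.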
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#latestOffsets
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  c.poll(0)
  val parts = c.assignment().asScala
  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  val rdd = new KafkaRDD[K, V](
    context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
  ...
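To make the effect of a wrong until-offset concrete, here is a minimal Java sketch of the range arithmetic that compute performs. `BatchRanges`, `Range`, and `build` are hypothetical names used only for illustration (they are not Spark classes), and partitions are keyed by plain integers. Each batch covers the half-open range [fromOffset, untilOffset) of a partition, so a partition whose reported latest offset equals its current offset contributes no records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative stand-in for the per-batch range computation; not a Spark class. */
class BatchRanges {
    /** Half-open range [fromOffset, untilOffset) for one partition. */
    static final class Range {
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of records the batch would read from this partition. */
        long count() {
            return untilOffset - fromOffset;
        }
    }

    /** Pairs each partition's current offset with the offset reported as "latest". */
    static List<Range> build(Map<Integer, Long> currentOffsets, Map<Integer, Long> untilOffsets) {
        List<Range> ranges = new ArrayList<>();
        // TreeMap is used only to get a stable partition order in the result.
        for (Map.Entry<Integer, Long> e : new TreeMap<>(untilOffsets).entrySet()) {
            long from = currentOffsets.get(e.getKey());
            ranges.add(new Range(e.getKey(), from, e.getValue()));
        }
        return ranges;
    }
}
```

With current offsets of 10 for two partitions and reported latest offsets of 10 and 100, the first range is empty and the second holds 90 records, which is what "only one partition is read" looks like from the Spark side.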
With kafka-0.10.1.0, seekToEnd resets the position held in the client's per-partition state to null.
org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd
public void seekToEnd(Collection<TopicPartition> partitions) {
    acquire();
    try {
        Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
        for (TopicPartition tp : parts) {
            log.debug("Seeking to end of partition {}", tp);
            subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
        }
    } finally {
        release();
    }
}
org.apache.kafka.clients.consumer.internals.SubscriptionState#needOffsetReset(TopicPartition, OffsetResetStrategy)
public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
    assignedState(partition).awaitReset(offsetResetStrategy);
}
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#awaitReset
private void awaitReset(OffsetResetStrategy strategy) {
    this.resetStrategy = strategy;
    this.position = null;
}
position is then called partition by partition to obtain the latest offset of each one.
org.apache.kafka.clients.consumer.KafkaConsumer#position
public long position(TopicPartition partition) {
    acquire();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            updateFetchPositions(Collections.singleton(partition));
            offset = this.subscriptions.position(partition);
        }
        return offset;
    } finally {
        release();
    }
}
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);
    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy
        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();
        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
org.apache.kafka.clients.consumer.internals.Fetcher#resetOffsetsIfNeeded
public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        // TODO: If there are several offsets to reset, we could submit offset requests in parallel
        if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
            resetOffset(tp);
    }
}
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#seek
private void seek(long offset) {
    this.position = offset;
    this.resetStrategy = null;
}
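The new KafkaConsumer updates the position first (resetOffsetsIfNeeded), which ends up calling seek to set position and clear resetStrategy.
But then comes an extra check! hasAllFetchPositions() looks at every assigned partition, not just the one passed in, and as soon as any of them still lacks a position, the position is fetched again. That is definitely a problem! updateFetchPositions does get called again afterwards, but by then the state has changed: seek has already cleared resetStrategy. The result is that with multiple partitions only one partition's data gets read.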
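The sequence can be replayed with a small stand-alone Java model. This is a sketch under stated assumptions, not Kafka code: `PositionModel`, `State`, and `simulate` are made-up names, every partition is assumed to be paused (as latestOffsets does for new partitions) and to have a committed offset, and the fallback in `fetcherUpdate` (use the committed offset when no reset is pending) is an assumption about `Fetcher#updateFetchPositions`, which is not quoted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the consumer state discussed above; these are NOT the Kafka classes. */
class PositionModel {
    /** Per-partition state, loosely mirroring the fields of TopicPartitionState. */
    static final class State {
        Long position;          // null means "unknown"
        boolean awaitingReset;  // stands in for resetStrategy != null
        final long committed;   // last committed offset (assumed to exist)
        final long endOffset;   // what a reset to LATEST would resolve to

        State(long committed, long endOffset) {
            this.committed = committed;
            this.endOffset = endOffset;
        }
    }

    private final Map<Integer, State> parts = new LinkedHashMap<>();
    private final boolean newLogic;

    PositionModel(boolean newLogic, int partitions, long committed, long endOffset) {
        this.newLogic = newLogic;
        for (int p = 0; p < partitions; p++) {
            parts.put(p, new State(committed, endOffset));
        }
    }

    /** Like seekToEnd: only marks every partition; nothing is resolved yet. */
    void seekToEnd() {
        for (State s : parts.values()) {
            s.position = null;
            s.awaitingReset = true;
        }
    }

    /** Like resetOffset ending in seek: set the position and clear the marker. */
    private void resetOffset(State s) {
        s.position = s.endOffset;
        s.awaitingReset = false;
    }

    /** ASSUMPTION: with no reset pending, a paused partition falls back to its committed offset. */
    private void fetcherUpdate(State s) {
        if (s.awaitingReset) {
            resetOffset(s);
        } else {
            s.position = s.committed;
        }
    }

    /** Checks ALL partitions, not only the one being queried. */
    private boolean hasAllFetchPositions() {
        for (State s : parts.values()) {
            if (s.position == null) return false;
        }
        return true;
    }

    long position(int partition) {
        State s = parts.get(partition);
        if (s.position == null) {
            if (newLogic) {
                // 0.10.1.0 shape: reset first, then the global check, then a second update
                if (s.awaitingReset) resetOffset(s);
                if (!hasAllFetchPositions()) fetcherUpdate(s);
            } else {
                // 0.10.0.1 shape: a single update
                fetcherUpdate(s);
            }
        }
        return s.position;
    }

    /** seekToEnd on all partitions, then position() one partition at a time. */
    static long[] simulate(boolean newLogic, int partitions, long committed, long endOffset) {
        PositionModel m = new PositionModel(newLogic, partitions, committed, endOffset);
        m.seekToEnd();
        long[] out = new long[partitions];
        for (int p = 0; p < partitions; p++) {
            out[p] = m.position(p);
        }
        return out;
    }
}
```

In this model, `PositionModel.simulate(true, 3, 10L, 100L)` yields 10, 10, 100: only the last partition queried passes the global check and keeps its end offset. `PositionModel.simulate(false, 3, 10L, 100L)` yields 100 for all three.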
With the cause found, the fix is simply to go back to the old client version.
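One way to keep the client from drifting again is a fail-fast check at job start-up. The sketch below is only an illustration: `KafkaClientVersionGuard` and its methods are hypothetical, and the version string is passed in as a plain argument. In a real job it would presumably come from the kafka-clients library itself (for example `AppInfoParser.getVersion()`), which is an assumption to verify against the client actually on the classpath.

```java
/** Hypothetical start-up guard; class and method names are illustrative only. */
class KafkaClientVersionGuard {
    /** The client version Spark-2.0.0 uses by default, as noted at the top of this post. */
    static final String EXPECTED_VERSION = "0.10.0.1";

    /** True only for the exact client version the job was tested with. */
    static boolean isExpected(String clientVersion) {
        return EXPECTED_VERSION.equals(clientVersion);
    }

    /** Throws if a different kafka-clients version has slipped onto the classpath. */
    static void check(String clientVersion) {
        if (!isExpected(clientVersion)) {
            throw new IllegalStateException(
                "kafka-clients " + clientVersion + " found, expected " + EXPECTED_VERSION);
        }
    }
}
```

Calling `KafkaClientVersionGuard.check("0.10.1.0")` throws an IllegalStateException, while `check("0.10.0.1")` returns normally.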
–END