前面讲到了节点的初始化,根据节点使用率与集群dfs使用率比较分为
overUtilizedDatanodes
,aboveAvgUtilizedDatanodes
,belowAvgUtilizedDatanodes
,underUtilizedDatanodes
,同时进行了节点数据量从Source到Target的配对。
接下来就是最后的数据移动部分了。
5.3 移动数据
1 2 3 4 5 6 7 8 |
|
针对一个namenode如果连续5次没有移动数据,就会退出平衡操作,是在NameNodeConnector#shouldContinue(long)
中处理的。
由于这里需要进行大量计算,以及耗时的文件传输等操作,这里使用了executorservice,分别为moverExecutor和dispatcherExecutor,有两个配置dfs.balancer.moverThreads
(1000)和dfs.balancer.dispatcherThreads
(200)来设置线程池的大小。
1 2 3 4 5 6 7 8 9 |
|
其中dispatchBlockMoves()
包装了数据移动的操作,把source的块移动到target节点中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
|
上面是分发功能主程序执行的代码,调用分发线程和等待执行结果的代码。主要业务逻辑在线程中调用执行。
分发线程dispatcher先获取Source上指定大小的block块,对应到getBlockList()
方法。除了用于块同步的globalBlockList变量、以及记录当前Source获取的srcBlockList、最重要的当属用于判断获取的块是否符合条件的方法isGoodBlockCandidate(block)
。在移动块的选择也会用到该方法,单独拿出来在后面讲。
然后选择Source下哪些块将移动到Targets目标节点。在chooseNodes
步骤中把移动和接收数据的流向确定了,相关信息存储在Source的nodeTasks列表对象中。这里PendingBlockMove.chooseBlockAndProxy()
把Sources需要移动的块确定下来,把从Source获取到的srcBlockList分配给Target。然后交给moverExecutor去执行。
其中通过isGoodBlockCandidate
和chooseProxySource
(选择从那个目标获取block的真实数据,不一定是Source节点哦!)方法筛选合适的符合条件的块,并加入到movedBlocks对象。
调用的dispatchBlocks方法第一次循环是不会有数据移动的,此时Source对象中srcBlockList可移动块为空,从Source中获取块后再进行块的移动操作chooseNextBlockToMove()
。
先讲下Source类属性blocksToReceive,初始值为2*scheduledSize,有三个地方:dispatchBlocks初始化大小,getBlockList从Source节点获取block的量同时减去获取到的block的字节数,还有就是shouldFetchMoreBlocks用于判断是否还有数据需要获取或者移动dispatchBlocks。这个属性其实也就是设置一个阀,不管block是否为最终移动的block,获取到块的信息后就会从blocksToReceive减去相应的字节数。
前面获取Source block和分配到Target block都使用了isGoodBlockCandidate方法,这里涉及到怎么去评估块获取和分配是否合理的问题。需同时满足下面三个条件:
- 当前选中的移动的块,不在已移动块的名单中
movedBlocks.contains
- 移动的块在目的机器上没有备份
- 移动的块不减少含有该数据的机架数量
- 多机架的情况下
cluster.isNodeGroupAware()
,移动的块在目的机器的机架上没有备份 - YES source和target在同一个机架上。
- YES source和target不在同一机架上,且该块没有一个备份在target的机架上
- YES source和target不在同一机架上,且该块有另一个备份和source在同一机架上
- 多机架的情况下
疑问
一个Datanode只能同时移动/接收5个Block(即MAX_NUM_CONCURRENT_MOVES值),结合chooseProxySource
的代码的addTo调用,看的很是辛苦!如block-A所有块都在A机架上,在选择proxySource时,会把该块的两个datanode都加上一个pendingBlock,显然这不大合理!!
如果备用的proxySource节点恰好还是target的话,waitForMoveCompletion方法永远不能结束!!应该把没有找到同机架的源情况移到for循环外面进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
不过无需庸人自扰,一般都在一个rack上,这种问题就不存在了!同时这个也不是能一步到位,加了很多限制(一次迭代一个datanode最多处理10G,获取一次srcBlockList仅2G还限制就一次迭代就5个block),会执行很多次。
总结
总体的代码大致就是这样子了。根据集群使用率和阀值,计算需要进行数据接收和移动的节点(初始化),然后进行配对(选择),再进行块的选取和接收节点进行配对(分发),最后就是数据的移动(理解为socket数据传递就好了,调用了HDFS的协议代码。表示看不明),并等待该轮操作结束。
举例
除了指定threshold为5,其他是默认参数。由于仅单namenode和单rack,所以直接分析第五部分的namenode平衡处理。
根据所给的数据,(initNodes)第一步计算使用率,得出需要移动的数据量,把datanodes对号入座到over/above/below/under四个分类中。
(chooseNodes)第二步进行Source到Target节点的计划移动数据量计算。
在初始化BalancerDatanode的时刻,就计算出了节点的maxSize2Move。从给出的数据,只有一个节点超过阀值,另外两个是都在阀值内,一个高于平均值一个低于平均值。
这里就是把A1超出部分的数据(小于10G)移到A2,计算Source和Target的scheduledSize的大小。
1 2 3 4 5 |
|
(dispatchBlockMoves)第三步就是分发进行块的转移。
先设置blocksToReceive(2*scheduledSize=16857142.86)
1 2 3 4 5 6 7 8 9 |
|
从Source获取块时,可能在A2上已经有了,会通过isGoodBlockCandidate来进行过滤。然后就是把它交给moverExecutor执行数据块的移动,完成后修改处理的数据量byteMoved,把移动的块从target和proxySource的pendingBlockList中删除。
重复进行以上步骤,直到全部所有节点的使用率都在阀值内,顺利结束本次平衡处理。
–END