code
执行hadoop-2.2.0/bin/hadoop balancer -h
查看可以设置的参数(和sbin/start-balancer.sh一样)。
1
2
3
Usage: java Balancer
[-policy <policy>] the balancing policy: datanode or blockpool
[-threshold <threshold>] Percentage of disk capacity
main方法入口,可以接受threshold(大于等于1小于等于100, 默认值10)和policy(可取datanode[dfsused]/blockpool[
blockpoolused], 默认值datanode),具体的含义可以查看(上)篇中的javadoc的描述。
获取初始化参数
然后通过ToolRunner解析参数,并运行Cli工具类来执行HDFS的平衡。
1 设置检查
WIN_WIDTH
(默认1.5h) 已移动的数据会记录movedBlocks(list)变量中,在移动成功的数据CUR_WIN
的值经过该时间后会被移动到OLD_WIN
—现在感觉作用不大,为了减少map的大小?
checkReplicationPolicyCompatibility()
检查配置dfs.block.replicator.classname
是否为BlockPlacementPolicyDefault子类,即是否满足3份备份的策略(1st本地,2nd另一个rack,3rd和第二份拷贝不同rack的节点)?
2 获取nameserviceuris
通过DFSUtil#getNsServiceRpcUris()
来获取namenodes,调用getNameServiceUris()
来得到一个URI的结果集:
1
2
3
4
5
6
+ nsId <- dfs.nameservices
? ha <- dfs.namenode.rpc-address + [dfs.nameservices] + [dfs.ha.namenodes]
Y+ => hdfs://nsId
N+ => hdfs://[dfs.namenode.servicerpc-address.[nsId]] 或 hdfs://[dfs.namenode.rpc-address.[nsId]] 第二个满足条件的加入到nonPreferredUris
+ hdfs://[dfs.namenode.servicerpc-address] 或 hdfs://[dfs.namenode.rpc-address] 第二个满足条件的加入到nonPreferredUris
? [fs.defaultFs] 以hfds开头,且不在nonPreferredUris集合中是加入结果集
HA情况下的地址相关配置项可以查看官网的文档
1
2
3
dfs.nameservices
dfs.ha.namenodes.[nameservice ID]
dfs.namenode.rpc-address.[nameservice ID].[name node ID]
3 解析threshold和policy参数
默认值: BalancingPolicy.Node.INSTANCE, 10.0 。运行打印的日志如下,INFO日志中包括了初始化的参数信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
2014-09-05 10:55:12,183 INFO Balancer: Using a threshold of 1.0
2014-09-05 10:55:12,186 INFO Balancer: namenodes = [hdfs://umcc97-44:9000]
2014-09-05 10:55:12,186 INFO Balancer: p = Balancer.Parameters[BalancingPolicy.Node, threshold=1.0]
2014-09-05 10:55:13,744 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-09-05 10:55:18,154 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.142:50010
2014-09-05 10:55:18,249 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.144:50010
2014-09-05 10:55:18,311 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.143:50010
2014-09-05 10:55:18,319 INFO Balancer: 2 over-utilized: [Source[10.18.97.144:50010, utilization=8.288283273062705], Source[10.18.97.143:50010, utilization=8.302032354001554]]
2014-09-05 10:55:18,320 INFO Balancer: 1 underutilized: [BalancerDatanode[10.18.97.142:50010, utilization=4.716543864576553]]
2014-09-05 10:55:33,918 INFO Balancer: Need to move 3.86 GB to make the cluster balanced.
2014-09-05 11:21:16,875 INFO Balancer: Decided to move 2.43 GB bytes from 10.18.97.144:50010 to 10.18.97.142:50010
2014-09-05 11:24:16,712 INFO Balancer: Decided to move 1.84 GB bytes from 10.18.97.143:50010 to 10.18.97.142:50010
2014-09-05 11:25:55,726 INFO Balancer: Will move 4.27 GB in this iteration
执行Balancer
4 调用Balancer#run执行
1
2
3
# 调试命令
export HADOOP_OPTS=" -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8087 "
sbin/start-balancer.sh
Balancer的静态方法run,循环处理所有namenodes。在实例化namenode的NameNodeConnector对象时,会把当前运行balancer程序的hostname写入到HDFS的/system/balancer.id
文件中,可以用来控制同时只有一个balancer运行。
在循环处理的时刻使用Collections.shuffle(connectors)
打乱了namenodes的顺序。
Balancer的静态方法run中是一个双层循环,实例化Balancer并调用实例方法run来处理每个namenode的平衡。运行后要么出错 要么就是平衡顺利完成 才算结束。平衡的返回状态值及其含义可以查看javadoc(上)篇。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static int run(Collection<URI> namenodes, final Parameters p,
Configuration conf) throws IOException, InterruptedException {
...
for (URI uri : namenodes) {
connectors.add(new NameNodeConnector(uri, conf));
}
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
final ReturnStatus r = b.run(iteration, formatter, conf);
// clean all lists
b.resetData(conf);
if (r == ReturnStatus.IN_PROGRESS) {
done = false;
} else if (r != ReturnStatus.SUCCESS) {
//must be an error statue, return.
return r.code;
}
}
if (!done) {
Thread.sleep(sleeptime);
}
}
...
}
5 针对每个namenode的平衡处理
针对每个namenode的每次迭代,又可以分出初始化节点、选择移动节点、移动数据三个部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private ReturnStatus run(int iteration, Formatter formatter, Configuration conf) {
...
final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS;
}
final long bytesToMove = chooseNodes();
if (bytesToMove == 0) {
System.out.println("No block can be moved. Exiting...");
return ReturnStatus.NO_MOVE_BLOCK;
}
if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
return ReturnStatus.NO_MOVE_PROGRESS;
}
return ReturnStatus.IN_PROGRESS;
...
}
获取集群Live Datanode节点的信息,和通过50070查看的信息差不多,然后调用initNode()方法。
5.1 初始化节点
initNodes()
中获取每个Datanode的capacity和dfsUsed数据,计算整个集群dfs的平均使用率avgUtilization。
然后根据每个节点的使用率与集群使用率,以及阀值进行比较划分为4种情况:
overUtilizedDatanodes
,aboveAvgUtilizedDatanodes
,belowAvgUtilizedDatanodes
,underUtilizedDatanodes
。
同时取超出平均+阀值 和低于平均-阀值 的字节数最大值,即集群达到平衡需要移动的字节数。
为了测试,如果集群已经平衡,可以搞点数据让集群不平衡,方便查看调试。
1
2
3
bin/hadoop fs -D dfs.replication=1 -put XXXXX /abc
sbin/start-balancer.sh -threshold 1
5.2 选择节点
初始化节点后,计算出了需要移动的数据量。接下来就是选择移动数据的节点chooseNodes
,以及接收对应数据的节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private long chooseNodes() {
// First, match nodes on the same node group if cluster is node group aware
if (cluster.isNodeGroupAware()) {
chooseNodes(SAME_NODE_GROUP);
}
chooseNodes(SAME_RACK);
chooseNodes(ANY_OTHER);
long bytesToMove = 0L;
for (Source src : sources) {
bytesToMove += src.scheduledSize;
}
return bytesToMove;
}
private void chooseNodes(final Matcher matcher) {
chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
}
private <D extends BalancerDatanode, C extends BalancerDatanode> void
chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
Matcher matcher) {
for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
final D datanode = i.next();
for(; chooseForOneDatanode(datanode, candidates, matcher); );
if (!datanode.hasSpaceForScheduling()) {
i.remove(); // “超出”部分全部有去处了
}
}
}
private <C extends BalancerDatanode> boolean chooseForOneDatanode(
BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
final Iterator<C> i = candidates.iterator();
final C chosen = chooseCandidate(dn, i, matcher);
if (chosen == null) {
return false;
}
if (dn instanceof Source) {
matchSourceWithTargetToMove((Source)dn, chosen);
} else {
matchSourceWithTargetToMove((Source)chosen, dn);
}
if (!chosen.hasSpaceForScheduling()) {
i.remove(); // 可用的空间已经全部分配出去了
}
return true;
}
private <D extends BalancerDatanode, C extends BalancerDatanode>
C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
if (dn.hasSpaceForScheduling()) {
for(; candidates.hasNext(); ) {
final C c = candidates.next();
if (!c.hasSpaceForScheduling()) {
candidates.remove();
} else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
return c;
}
}
}
return null;
}
选择到接收节点 后,接下来计算可以移动的数据量(取双方的available的最大值),然后把接收节点 和数据量 的信息NodeTask存储到Source的NodeTasks对象中。
1
2
3
4
5
6
7
8
9
private void matchSourceWithTargetToMove(
Source source, BalancerDatanode target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
NodeTask nodeTask = new NodeTask(target, size);
source.addNodeTask(nodeTask);
target.incScheduledSize(nodeTask.getSize());
sources.add(source);
targets.add(target);
}
5.3 移动数据
(待)
–END