Winse Blog

走走停停都是风景, 熙熙攘攘都向最好, 忙忙碌碌都为明朝, 何畏之.

[读码] Hadoop2 Balancer磁盘空间平衡(下)

前面讲到了节点的初始化,根据节点使用率与集群dfs使用率比较分为 overUtilizedDatanodesaboveAvgUtilizedDatanodesbelowAvgUtilizedDatanodesunderUtilizedDatanodes,同时进行了节点数据量从Source到Target的配对。

接下来就是最后的数据移动部分了。

5.3 移动数据

1
2
3
4
5
6
7
8
  private ReturnStatus run(int iteration, Formatter formatter,
      Configuration conf) {
      ...
      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
        return ReturnStatus.NO_MOVE_PROGRESS;
      }
      ...
  }    

针对一个namenode如果连续5次没有移动数据,就会退出平衡操作,是在NameNodeConnector#shouldContinue(long)中处理的。

由于这里需要进行大量计算,以及耗时的文件传输等操作,这里使用了executorservice,分别为moverExecutor和dispatcherExecutor,有两个配置dfs.balancer.moverThreads(1000)和dfs.balancer.dispatcherThreads(200)来设置线程池的大小。

1
2
3
4
5
6
7
8
9
  Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
      ...
    this.moverExecutor = Executors.newFixedThreadPool(
            conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
                        DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT));
    this.dispatcherExecutor = Executors.newFixedThreadPool(
            conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
                        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
  }

其中dispatchBlockMoves()包装了数据移动的操作,把source的块移动到target节点中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
  private long dispatchBlockMoves() throws InterruptedException {
    long bytesLastMoved = bytesMoved.get();
    Future<?>[] futures = new Future<?>[sources.size()];
    int i=0;
    for (Source source : sources) {
       // / 新线程来执行块的分发
      futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
    }
    
    // wait for all dispatcher threads to finish
    // / 等待分发操作完成
    for (Future<?> future : futures) { 
        future.get(); 
    }
    
    // wait for all block moving to be done
    // / 等待块的数据移动完成,相当于等待moverExecutor的Future完成
    waitForMoveCompletion(); 
    
    return bytesMoved.get()-bytesLastMoved;
  }
  private void waitForMoveCompletion() {
    boolean shouldWait;
    do {
      shouldWait = false;
      for (BalancerDatanode target : targets) {
          // / 块从source移动到target完成后,会从Pending的列表中移除 @see PendingBlockMove#dispatch()
        if (!target.isPendingQEmpty()) { 
          shouldWait = true;
        }
      }
      if (shouldWait) {
        try {
          Thread.sleep(blockMoveWaitTime);
        } catch (InterruptedException ignored) {
        }
      }
    } while (shouldWait);
  }

上面是分发功能主程序执行的代码,调用分发线程和等待执行结果的代码。主要业务逻辑在线程中调用执行。

分发线程dispatcher先获取Source上指定大小的block块,对应到getBlockList()方法。除了用于块同步的globalBlockList变量、以及记录当前Source获取的srcBlockList、最重要的当属用于判断获取的块是否符合条件的方法isGoodBlockCandidate(block)。在移动块的选择也会用到该方法,单独拿出来在后面讲。

然后选择Source下哪些块将移动到Targets目标节点。在chooseNodes步骤中把移动和接收数据的流向确定了,相关信息存储在Source的nodeTasks列表对象中。这里PendingBlockMove.chooseBlockAndProxy()把Sources需要移动的确定下来,把从Source获取到的srcBlockList分配给Target。然后交给moverExecutor去执行。

其中通过isGoodBlockCandidatechooseProxySource(选择从那个目标获取block的真实数据,不一定是Source节点哦!)方法筛选合适的符合条件的块,并加入到movedBlocks对象。

调用的dispatchBlocks方法第一次循环是不会有数据移动的,此时Source对象中srcBlockList可移动块为空,从Source中获取块后再进行块的移动操作chooseNextBlockToMove()

先讲下Source类属性blocksToReceive,初始值为2*scheduledSize,有三个地方:dispatchBlocks初始化大小,getBlockList从Source节点获取block的量同时减去获取到的block的字节数,还有就是shouldFetchMoreBlocks用于判断是否还有数据需要获取或者移动dispatchBlocks。这个属性其实也就是设置一个阀,不管block是否为最终移动的block,获取到块的信息后就会从blocksToReceive减去相应的字节数。

前面获取Source block和分配到Target block都使用了isGoodBlockCandidate方法,这里涉及到怎么去评估获取和分配是否合理的问题。需同时满足下面三个条件:

  • 当前选中的移动的块,不在已移动块的名单中movedBlocks.contains
  • 移动的块在目的机器上没有备份
  • 移动的块不减少含有该数据的机架数量
    • 多机架的情况下cluster.isNodeGroupAware(),移动的块在目的机器的机架上没有备份
    • YES source和target在同一个机架上。
    • YES source和target不在同一机架上,且该块没有一个备份在target的机架上
    • YES source和target不在同一机架上,且该块有另一个备份和source在同一机架上

疑问

一个Datanode只能同时移动/接收5个Block(即MAX_NUM_CONCURRENT_MOVES值),结合chooseProxySource的代码的addTo调用,看的很是辛苦!如block-A所有块都在A机架上,在选择proxySource时,会把该块的两个datanode都加上一个pendingBlock,显然这不大合理!!

如果备用的proxySource节点恰好还是target的话,waitForMoveCompletion方法永远不能结束!!应该把没有找到同机架的源情况移到for循环外面进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private boolean chooseProxySource() {
  final DatanodeInfo targetDN = target.getDatanode();
  boolean find = false;
  for (BalancerDatanode loc : block.getLocations()) {
    // check if there is replica which is on the same rack with the target
    if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
      find = true;
      // if cluster is not nodegroup aware or the proxy is on the same 
      // nodegroup with target, then we already find the nearest proxy
      if (!cluster.isNodeGroupAware() 
          || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
        return true;
      }
    }
    
    if (!find) {
    // 这里的non-busy指的是,pendingBlock小于5份节点
      // find out a non-busy replica out of rack of target
      find = addTo(loc);
    }
  }
  
  return find;
}

不过无需庸人自扰,一般都在一个rack上,这种问题就不存在了!同时这个也不是能一步到位,加了很多限制(一次迭代一个datanode最多处理10G,获取一次srcBlockList仅2G还限制就一次迭代就5个block),会执行很多次。

总结

总体的代码大致就是这样子了。根据集群使用率和阀值,计算需要进行数据接收和移动的节点(初始化),然后进行配对(选择),再进行块的选取和接收节点进行配对(分发),最后就是数据的移动(理解为socket数据传递就好了,调用了HDFS的协议代码。表示看不明),并等待该轮操作结束。

举例

除了指定threshold为5,其他是默认参数。由于仅单namenode和单rack,所以直接分析第五部分的namenode平衡处理。

根据所给的数据,(initNodes)第一步计算使用率,得出需要移动的数据量,把datanodes对号入座到over/above/below/under四个分类中。

(chooseNodes)第二步进行Source到Target节点的计划移动数据量计算。

在初始化BalancerDatanode的时刻,就计算出了节点的maxSize2Move。从给出的数据,只有一个节点超过阀值,另外两个是都在阀值内,一个高于平均值一个低于平均值。

这里就是把A1超出部分的数据(小于10G)移到A2,计算Source和Target的scheduledSize的大小。

1
2
3
4
5
chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
chooseForOneDatanode(datanode, candidates, matcher)
chooseCandidate(dn, i, matcher)
// 把所有A1超出部分全部移到A2,并NodeTask(A2, 8428571.429)存储到Source:A1的nodeTaskList对象中
matchSourceWithTargetToMove((Source)dn, chosen);

(dispatchBlockMoves)第三步就是分发进行块的转移。

先设置blocksToReceive(2*scheduledSize=16857142.86)

1
2
3
4
5
6
7
8
9
chooseNextBlockToMove
chooseBlockAndProxy
markMovedIfGoodBlock
isGoodBlockCandidate
chooseProxySource

scheduleBlockMove

getBlockList

从Source获取块时,可能在A2上已经有了,会通过isGoodBlockCandidate来进行过滤。然后就是把它交给moverExecutor执行数据块的移动,完成后修改处理的数据量byteMoved,把移动的块从target和proxySource的pendingBlockList中删除。

重复进行以上步骤,直到全部所有节点的使用率都在阀值内,顺利结束本次平衡处理。

–END

[读码] Hadoop2 Balancer磁盘空间平衡(中)

code

执行hadoop-2.2.0/bin/hadoop balancer -h查看可以设置的参数(和sbin/start-balancer.sh一样)。

1
2
3
Usage: java Balancer
  [-policy <policy>]    the balancing policy: datanode or blockpool
  [-threshold <threshold>]  Percentage of disk capacity

main方法入口,可以接受threshold(大于等于1小于等于100, 默认值10)和policy(可取datanode[dfsused]/blockpool[ blockpoolused], 默认值datanode),具体的含义可以查看(上)篇中的javadoc的描述。

获取初始化参数

然后通过ToolRunner解析参数,并运行Cli工具类来执行HDFS的平衡。

1 设置检查

WIN_WIDTH(默认1.5h) 已移动的数据会记录movedBlocks(list)变量中,在移动成功的数据CUR_WIN的值经过该时间后会被移动到OLD_WIN—现在感觉作用不大,为了减少map的大小?

checkReplicationPolicyCompatibility()检查配置dfs.block.replicator.classname是否为BlockPlacementPolicyDefault子类,即是否满足3份备份的策略(1st本地,2nd另一个rack,3rd和第二份拷贝不同rack的节点)?

2 获取nameserviceuris

通过DFSUtil#getNsServiceRpcUris()来获取namenodes,调用getNameServiceUris()来得到一个URI的结果集:

1
2
3
4
5
6
+ nsId <- dfs.nameservices
  ? ha  <- dfs.namenode.rpc-address + [dfs.nameservices] + [dfs.ha.namenodes]
    Y+ => hdfs://nsId
    N+ => hdfs://[dfs.namenode.servicerpc-address.[nsId]] 或 hdfs://[dfs.namenode.rpc-address.[nsId]] 第二个满足条件的加入到nonPreferredUris
+ hdfs://[dfs.namenode.servicerpc-address] 或 hdfs://[dfs.namenode.rpc-address]  第二个满足条件的加入到nonPreferredUris
? [fs.defaultFs] 以hfds开头,且不在nonPreferredUris集合中是加入结果集

HA情况下的地址相关配置项可以查看官网的文档

1
2
3
dfs.nameservices
dfs.ha.namenodes.[nameservice ID]
dfs.namenode.rpc-address.[nameservice ID].[name node ID] 

3 解析threshold和policy参数

默认值: BalancingPolicy.Node.INSTANCE, 10.0。运行打印的日志如下,INFO日志中包括了初始化的参数信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
2014-09-05 10:55:12,183 INFO Balancer: Using a threshold of 1.0
2014-09-05 10:55:12,186 INFO Balancer: namenodes = [hdfs://umcc97-44:9000]
2014-09-05 10:55:12,186 INFO Balancer: p         = Balancer.Parameters[BalancingPolicy.Node, threshold=1.0]
2014-09-05 10:55:13,744 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-09-05 10:55:18,154 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.142:50010
2014-09-05 10:55:18,249 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.144:50010
2014-09-05 10:55:18,311 INFO net.NetworkTopology: Adding a new node: /default-rack/10.18.97.143:50010
2014-09-05 10:55:18,319 INFO Balancer: 2 over-utilized: [Source[10.18.97.144:50010, utilization=8.288283273062705], Source[10.18.97.143:50010, utilization=8.302032354001554]]
2014-09-05 10:55:18,320 INFO Balancer: 1 underutilized: [BalancerDatanode[10.18.97.142:50010, utilization=4.716543864576553]]
2014-09-05 10:55:33,918 INFO Balancer: Need to move 3.86 GB to make the cluster balanced.
2014-09-05 11:21:16,875 INFO Balancer: Decided to move 2.43 GB bytes from 10.18.97.144:50010 to 10.18.97.142:50010
2014-09-05 11:24:16,712 INFO Balancer: Decided to move 1.84 GB bytes from 10.18.97.143:50010 to 10.18.97.142:50010
2014-09-05 11:25:55,726 INFO Balancer: Will move 4.27 GB in this iteration

执行Balancer

4 调用Balancer#run执行

1
2
3
 # 调试命令
 export HADOOP_OPTS=" -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8087 "
 sbin/start-balancer.sh 

Balancer的静态方法run,循环处理所有namenodes。在实例化namenode的NameNodeConnector对象时,会把当前运行balancer程序的hostname写入到HDFS的/system/balancer.id文件中,可以用来控制同时只有一个balancer运行。

在循环处理的时刻使用Collections.shuffle(connectors)打乱了namenodes的顺序。

Balancer的静态方法run中是一个双层循环,实例化Balancer并调用实例方法run来处理每个namenode的平衡。运行后要么出错要么就是平衡顺利完成才算结束。平衡的返回状态值及其含义可以查看javadoc(上)篇。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
  static int run(Collection<URI> namenodes, final Parameters p,
      Configuration conf) throws IOException, InterruptedException {
    ...
      for (URI uri : namenodes) {
        connectors.add(new NameNodeConnector(uri, conf));
      }
    
      boolean done = false;
      for(int iteration = 0; !done; iteration++) {
        done = true;
        Collections.shuffle(connectors);
        for(NameNodeConnector nnc : connectors) {
          final Balancer b = new Balancer(nnc, p, conf);
          final ReturnStatus r = b.run(iteration, formatter, conf);
          // clean all lists
          b.resetData(conf);
          if (r == ReturnStatus.IN_PROGRESS) {
            done = false;
          } else if (r != ReturnStatus.SUCCESS) {
            //must be an error statue, return.
            return r.code;
          }
        }

        if (!done) {
          Thread.sleep(sleeptime);
        }
      }
    ...
  }

5 针对每个namenode的平衡处理

针对每个namenode的每次迭代,又可以分出初始化节点、选择移动节点、移动数据三个部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  private ReturnStatus run(int iteration, Formatter formatter, Configuration conf) {
      ...
      final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE));
      if (bytesLeftToMove == 0) {
        System.out.println("The cluster is balanced. Exiting...");
        return ReturnStatus.SUCCESS;
      }

      final long bytesToMove = chooseNodes();
      if (bytesToMove == 0) {
        System.out.println("No block can be moved. Exiting...");
        return ReturnStatus.NO_MOVE_BLOCK;
      }

      if (!this.nnc.shouldContinue(dispatchBlockMoves())) {
        return ReturnStatus.NO_MOVE_PROGRESS;
      }

      return ReturnStatus.IN_PROGRESS;
      ...
  }

获取集群Live Datanode节点的信息,和通过50070查看的信息差不多,然后调用initNode()方法。

5.1 初始化节点

initNodes()中获取每个Datanode的capacity和dfsUsed数据,计算整个集群dfs的平均使用率avgUtilization。 然后根据每个节点的使用率与集群使用率,以及阀值进行比较划分为4种情况: overUtilizedDatanodesaboveAvgUtilizedDatanodesbelowAvgUtilizedDatanodesunderUtilizedDatanodes

同时取超出平均+阀值低于平均-阀值的字节数最大值,即集群达到平衡需要移动的字节数。

为了测试,如果集群已经平衡,可以搞点数据让集群不平衡,方便查看调试。

1
2
3
bin/hadoop fs -D dfs.replication=1 -put XXXXX /abc

sbin/start-balancer.sh -threshold 1

5.2 选择节点

初始化节点后,计算出了需要移动的数据量。接下来就是选择移动数据的节点chooseNodes,以及接收对应数据的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  private long chooseNodes() {
    // First, match nodes on the same node group if cluster is node group aware
    if (cluster.isNodeGroupAware()) {
      chooseNodes(SAME_NODE_GROUP);
    }
    
    chooseNodes(SAME_RACK);
    chooseNodes(ANY_OTHER);

    long bytesToMove = 0L;
    for (Source src : sources) {
      bytesToMove += src.scheduledSize;
    }
    return bytesToMove;
  }
  private void chooseNodes(final Matcher matcher) {
    chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
    chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
    chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
  }

  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
      chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
          Matcher matcher) {
    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
      final D datanode = i.next();
      for(; chooseForOneDatanode(datanode, candidates, matcher); );
      if (!datanode.hasSpaceForScheduling()) {
        i.remove(); // “超出”部分全部有去处了
      }
    }
  }

  private <C extends BalancerDatanode> boolean chooseForOneDatanode(
      BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
    final Iterator<C> i = candidates.iterator();
    final C chosen = chooseCandidate(dn, i, matcher);

    if (chosen == null) {
      return false;
    }
    if (dn instanceof Source) {
      matchSourceWithTargetToMove((Source)dn, chosen);
    } else {
      matchSourceWithTargetToMove((Source)chosen, dn);
    }
    if (!chosen.hasSpaceForScheduling()) {
      i.remove(); // 可用的空间已经全部分配出去了
    }
    return true;
  }

  private <D extends BalancerDatanode, C extends BalancerDatanode>
      C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
    if (dn.hasSpaceForScheduling()) {
      for(; candidates.hasNext(); ) {
        final C c = candidates.next();
        if (!c.hasSpaceForScheduling()) {
          candidates.remove();
        } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
          return c;
        }
      }
    }
    return null;
  }  

选择到接收节点后,接下来计算可以移动的数据量(取双方的available的最大值),然后把接收节点数据量的信息NodeTask存储到Source的NodeTasks对象中。

1
2
3
4
5
6
7
8
9
  private void matchSourceWithTargetToMove(
      Source source, BalancerDatanode target) {
    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
    NodeTask nodeTask = new NodeTask(target, size);
    source.addNodeTask(nodeTask);
    target.incScheduledSize(nodeTask.getSize());
    sources.add(source);
    targets.add(target);
  }

5.3 移动数据

(待)

–END

计算出从1到100之间所有奇数的平方之和

计算出从1到100之间所有奇数的平方之和,代码50字符内(QQ群的验证框长度限制为50)

如题,题目没啥难度,这50字符的条件莫名的增添压迫感。其实java写也不用50个字符就能搞定的 !

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// (1 to 50) foreach {x => print("0")}
00000000000000000000000000000000000000000000000000

// java
int sum=0;for(int i=0;i<100;i+=2)sum+=i*i;

// scala
(1 to 100).map(a=>if(a%2==1)a*a else 0).foldLeft(0)(_+_)
(0 to 100).foldLeft(0)(_+((a:Int)=>if(a%2==1)a*a else 0)(_))
var sum=0;for(i<- 1 to 100)if(i%2==1)sum+=i*i
var sum=0;for(i<- 1 to 100; if i%2==1)sum+=i*i

(1 to 100 by 2).foldLeft(0)(_+((a:Int)=>a*a)(_))
(1 to 100 by 2).map(a=>a*a).foldLeft(0)(_+_)
var sum=0;for(i<- 1 to 100 by 2)sum+=i*i
(1 to 100 by 2).map(a=>a*a).reduce(_+_)

(1 to 100 by 2).map(a=>a*a).reduce(_+_)是里面最短的应该也是最好的了,既没有定义变量同时意义清晰一看就懂。

–END

Scala Shell #! 惊叹号井号

工作中主要是写java代码,shell也只是用于交互性操作,写脚本的次数比较少。对于#!井号叹号仅仅是教条式的添加在脚本开头,并且基本上都是#!/bin/sh

今天在看scala官方的入门教程尽然发现!#的写法,很是困惑,Google查询也不知道怎么描述关键字,一般搜索引擎都把这些操作符过滤掉了的。

1
2
3
4
5
6
7
#!/bin/sh
exec scala "$0" "$@"
!#
object HelloWorld extends App {
  println("Hello, world!")
}
HelloWorld.main(args)

首先了解下#!作用:如果#!在脚本的最开始,脚本程序会把第一行的剩余部分当做解析器指令;使用当前的解析器来执行程序,同时把当前脚本的路径作为参数传递给解析器。

In computing, a shebang is the character sequence consisting of the characters number sign and exclamation mark (that is, “#!”) at the beginning of a script.

Under Unix-like operating systems, when a script with a shebang is run as a program, the program loader parses the rest of the script’s initial line as an interpreter directive; the specified interpreter program is run instead, passing to it as an argument the path that was initially used when attempting to run the script.

如果把!#去掉,再执行上面的脚本则会报错:error: script file does not close its header with !# or ::!#,查寻一番后,这原来是Scala的脚本功能的内部处理。通过SourceFile.scala关键字搜索到了该文列出了具体的位置,还有A Scala shell script example和我有同样疑问。

可以在《Programing in Scala – A comprehensive step-by-step guide》一书的附录A中 Scala scripts on Unix and Windows 查找到相应的描述:把#!!#之间的内容忽略掉了。

语法糖的疑惑解决了,针对上面的脚本还有个问题:exec执行完了,下面的内容不执行了?在exec命令的前面打上调试语句,也只输出了sh start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
winse@Lenovo-PC ~
$ cat script.scala
#!/bin/sh
echo 'sh start'
exec scala "$0" "$@"
echo 'sh end'
!#
object HelloWorld extends App {
    print("hello world")
}

HelloWorld.main(args)

winse@Lenovo-PC ~
$ sh script.scala
sh start
hello world

exec 使用 exec 方式运行script时, 它和 source 一样, 也是让 script 在当前process内执行, 但是 process 内的原代码剩下部分将被终止. 同样, process 内的环境随script 改变而改变.

所以,整个脚本流程就是:执行shell,调用exec来调用scala的解释器执行整个脚本内容,而解释器会过滤掉#!!#之间内容,执行完后,exec退出脚本,实现scala脚本执行的功能。这样折中的使用方式,应该是为了处理参数传递*的问题!

参考

–END

Hadoop2 Mapreduce输入输出压缩

当数据达到一定量时,自然就想到了对数据进行压缩来降低存储压力。在Hadoop的任务中提供了5个参数来控制输入输出的数据的压缩格式。添加map输出数据压缩可以降低集群间的网络传输,最终reduce输出压缩可以减低hdfs的集群存储空间。

如果是使用hive等工具的话,效果会更加明显。因为hive的查询结果是临时存储在hdfs中,然后再通过一个Fetch Operator来获取数据,最后清理掉,压缩存储临时的数据可以减少磁盘的读写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<property>
  <name>mapreduce.output.fileoutputformat.compress</name>
  <value>false</value>
  <description>Should the job outputs be compressed?
  </description>
</property>

<property>
  <name>mapreduce.output.fileoutputformat.compress.type</name>
  <value>RECORD</value>
  <description>If the job outputs are to compressed as SequenceFiles, how should
               they be compressed? Should be one of NONE, RECORD or BLOCK.
  </description>
</property>

<property>
  <name>mapreduce.output.fileoutputformat.compress.codec</name>
  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
  <description>If the job outputs are compressed, how should they be compressed?
  </description>
</property>

<property>
  <name>mapreduce.map.output.compress</name>
  <value>false</value>
  <description>Should the outputs of the maps be compressed before being
               sent across the network. Uses SequenceFile compression.
  </description>
</property>

<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.DefaultCodec</value>
  <description>If the map outputs are compressed, how should they be 
               compressed?
  </description>
</property>

上面5个属性弄好,在core-sitem.xml加下io.compression.codecs基本就完成配置了。

这里主要探究下mapreduce(下面全部简称MR)过程中自动解压缩。刚刚接触Hadoop一般都不会去了解什么压缩不压缩的,先把hdfs-api,MR-api弄一遭。配置的TextInputFormat竟然能正确的读取tar.gz文件的内容,觉得不可思议,TextInputFormat不是直接读取txt行记录的输入嘛?难道还能读取压缩文件,先解压再…??

先说下OutputFormat,在MR中调用context.write写入数据的方法时,最终使用OutputFormat创建的RecordWriter进行持久化。在TextOutputFormat创建RecordWriter时,如果使用压缩会在结果文件名上加对应压缩库的后缀,如gzip压缩对应的后缀gz、snappy压缩对应后缀snappy等。对应下面代码的getDefaultWorkFile

同样对应的TextInputFormat的RecordReader也进行类似的处理:根据文件的后缀来判定该文件是否使用压缩,并使用对应的输入流InputStream来解码。

此处的关键代码为CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);,根据分块(split)的文件名来判断使用的压缩算法。 初始化Codec实现、以及根据文件名来获取压缩算法的实现还是挺有意思的:通过反转字符串然后最近匹配(headMap)来获取对应的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  private void addCodec(CompressionCodec codec) {
    String suffix = codec.getDefaultExtension();
    codecs.put(new StringBuilder(suffix).reverse().toString(), codec);
    codecsByClassName.put(codec.getClass().getCanonicalName(), codec);

    String codecName = codec.getClass().getSimpleName();
    codecsByName.put(codecName.toLowerCase(), codec);
    if (codecName.endsWith("Codec")) {
      codecName = codecName.substring(0, codecName.length() - "Codec".length());
      codecsByName.put(codecName.toLowerCase(), codec);
    }
  }

  public CompressionCodec getCodec(Path file) {
    CompressionCodec result = null;
    if (codecs != null) {
      String filename = file.getName();
      String reversedFilename = new StringBuilder(filename).reverse().toString();
      SortedMap<String, CompressionCodec> subMap = 
        codecs.headMap(reversedFilename);
      if (!subMap.isEmpty()) {
        String potentialSuffix = subMap.lastKey();
        if (reversedFilename.startsWith(potentialSuffix)) {
          result = codecs.get(potentialSuffix);
        }
      }
    }
    return result;
  }

了解了这些,MR(TextInputFormat)的输入文件可以比较随意些:各种压缩文件、原始文件都可以,只要文件有对应压缩算法的后缀即可。hive的解压缩功能也很容易了,如果使用hive存储text形式的文件,进行压缩无需进行额外的程序代码修改,仅仅修改MR的配置即可,注意下文件后缀!!

如,在MR中生成了snappy压缩的文件,此时不能在文件的后面添加东西。否则在hive查询时,根据后缀进行解压会导致结果乱码/不正确。

1
2
3
4
5
6
7
8
9
10
11
<property>
  <name>hive.exec.compress.output</name>
  <value>false</value>
  <description> This controls whether the final outputs of a query (to a local/hdfs file or a hive table) is compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>

<property>
  <name>hive.exec.compress.intermediate</name>
  <value>false</value>
  <description> This controls whether intermediate files produced by hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
</property>

hive也弄了两个参数来控制它自己的MR的输出输入压缩控制属性。其他的配置使用mapred-site.xml的配置即可。

网上一些资料有hive.intermediate.compression.codechive.intermediate.compression.type两个参数能调整中间过程的压缩算法。其实和mapreduce的参数功能是一样的。

附上解压缩的全部配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
$#core-site.xml
  <property>
      <name>io.compression.codecs</name>
      <value>
  org.apache.hadoop.io.compress.GzipCodec,
  org.apache.hadoop.io.compress.DefaultCodec,
  org.apache.hadoop.io.compress.BZip2Codec,
  org.apache.hadoop.io.compress.SnappyCodec
      </value>
  </property>

$#mapred-site.xml
  <property>
      <name>mapreduce.map.output.compress</name> 
      <value>true</value>
  </property>
  <property>
      <name>mapreduce.map.output.compress.codec</name>
      <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>

  <property>
      <name>mapreduce.output.fileoutputformat.compress</name>
      <value>true</value>
  </property>

  <property>
      <name>mapreduce.output.fileoutputformat.compress.codec</name>
      <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>
  <property>
      <name>mapred.output.compression.codec</name>
      <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  </property>

$#hive-site.xml
  <property>
      <name>hive.exec.compress.output</name>
      <value>true</value>
  </property>

运行hive后,临时存储在HDFS的结果数据,注意文件的后缀。

参考

–END