Winse Blog

走走停停都是风景, 熙熙攘攘都向最好, 忙忙碌碌都为明朝, 何畏之.

Logstash Elasticsearch Kibana日志采集查询系统搭建

软件版本

1
2
3
4
5
6
7
[root@master opt]# ll
total 20
drwxr-xr-x 7 root root 4096 Aug 21 01:23 elasticsearch-1.7.1
drwxr-xr-x 8 uucp  143 4096 Mar 18  2014 jdk1.8.0_05
drwxrwxr-x 7 1000 1000 4096 Aug 21 01:09 kibana-4.1.1-linux-x64
drwxr-xr-x 5 root root 4096 Aug 21 05:58 logstash-1.5.3
drwxrwxr-x 6 root root 4096 Aug 21 06:44 redis-3.0.3

安装运行脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# java
vi /etc/profile
source /etc/profile

cd /opt/elasticsearch-1.7.1
bin/elasticsearch -p elasticsearch.pid -d

curl localhost:9200/_cluster/nodes/172.17.0.4

cd /opt/kibana-4.1.1-linux-x64/
bin/kibana 
# http://master:5601

cd /opt/redis-3.0.3
yum install gcc
yum install bzip2
make MALLOC=jemalloc

# 也可以修改配置的daemon属性
nohup src/redis-server & 

cd /opt/logstash-1.5.3/
bin/logstash -e 'input { stdin { } } output { stdout {} }'

vi index.conf
vi agent.conf

# agent可不加
bin/logstash agent -f agent.conf &
bin/logstash agent -f index.conf &

logstash配置

由于程序都运行在一台机器(localhost),redis、elasticsearch和kibana都使用默认配置。下面贴的是logstash的采集和过滤的配置:

(kibaba的配置config/kibana.yml, elasticsearch的配置config/elasticsearch.yml)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@master logstash-1.5.3]# cat agent.conf 
input {
  file {
    path => "/var/log/yum.log"
    start_position => beginning
  }
}

output {
  redis {
    key => "logstash.redis"
    data_type => list
  }
  
  # 便于查看调试
  stdout { }
}

[root@master logstash-1.5.3]# cat index.conf 
input {
  redis {
    data_type => list
    key => "logstash.redis"
  }
}

output {
  elasticsearch {
    host => "localhost"
  }
}

注意要改动下被采集的原始文件!!然后启动相应的程序,打开浏览器http://master:5601配置一下索引项,就可以查看了。

至于input/output/filter(map,reduce)怎么配置,查看官方文档filter-plugins

filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[hadoop@cu1 logstash-1.5.3]$ bin/logstash -e "
input {
stdin {}
}

filter {
grok { 
match => {\"message\" => \"%{WORD:content}\"}
add_field => { \"foo_%{content}\" => \"helloworld\" }
}
}

output {
stdout { codec => json }
}
"

abc
{"message":"abc","@version":"1","@timestamp":"2015-09-10T08:02:52.024Z","host":"cu1","content":"abc","foo_abc":"helloworld"}

grok-pattern文件的位置:

1
2
3
4
5
6
7
[hadoop@cu2 logstash-1.5.3]$ less ./vendor/bundle/jruby/1.9/gems/logstash-patterns-core-0.1.10/patterns/grok-patterns 

2015-09-06 15:23:53,027 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: No KeyProvider found.
%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:loglevel} %{GREEDYDATA:content}

[2015-09-10 08:00:46,539][INFO ][cluster.metadata         ] [Jumbo Carnation] [logstash-2015.09.10] update_mapping [hbase-logs] (dynamic)
\[%{TIMESTAMP_ISO8601:time}\]\[%{LOGLEVEL:loglevel}%{SPACE}\]%{GREEDYDATA:content}

学习

过滤DEBUG/INFO日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[hadoop@cu1 logstash-1.5.3]$ bin/logstash -e "
 input {
 stdin {}
 }
 
 filter {
 grok {
 match => { \"message\" => \"%{TIMESTAMP_ISO8601:time} %{LOGLEVEL:loglevel} %{GREEDYDATA:content}\" }
 }
 
 if [loglevel] == \"INFO\" { drop {} }
 }
 
 output {
 stdout {}
 }
 
 "

用shell先预处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
    stdin {
        type => "nginx"
        format => "json_event"
    }
} 
output {
    amqp {
        type => "nginx"
        host => "10.10.10.10"
        key  => "cdn"
        name => "logstash"
        exchange_type => "direct"
    }
}

#!/bin/sh
      tail -F /data/nginx/logs/access.json \
    | sed 's/upstreamtime":-/upstreamtime":0/' \
    | /usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/agent.conf &

参考

–END

Hadoop不同版本yarn和hdfs混搭,spark-yarn环境配置

hadoop分为存储和计算两个主要的功能,hdfs步入hadoop2后不论稳定性还是HA等等功能都比hadoop1要更吸引人。hadoop-2.2.0的hdfs已经比较稳定,但是yarn高版本有更加丰富的功能。本文主要关注spark-yarn下日志的查看,以及spark-yarn-dynamic的配置。

hadoop-2.2.0的hdfs原本已经在使用的环境,在这基础上搭建运行yarn-2.6.0,以及spark-1.3.0-bin-2.2.0。

  • 编译

我是在虚拟机里面编译,共享了host主机的maven库。参考【VMware共享目录】,【VMware-Centos6 Build hadoop-2.6】注意cmake_symlink_library的异常,由于共享的windows目录下不能创建linux的软链接

1
2
3
4
5
6
7
8
9
10
11
12
tar zxvf ~/hadoop-2.6.0-src.tar.gz 
cd hadoop-2.6.0-src/
mvn package -Pdist,native -DskipTests -Dtar -Dmaven.javadoc.skip=true

# 由于hadoop-hdfs还是2.2的,这里编译spark需要用2.2版本!
# 如果用2.6会遇到[UnsatisfiedLinkError:org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray ](http://blog.csdn.net/zeng_84_long/article/details/44340441)
cd spark-1.3.0
export MAVEN_OPTS="-Xmx3g -XX:MaxPermSize=1g -XX:ReservedCodeCacheSize=512m"
mvn clean package -Phadoop-2.2 -Pyarn -Phive -Phive-thriftserver -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -DskipTests

vi make-distribution.sh #注释掉BUILD_COMMAND那一行,不重复执行package!
./make-distribution.sh  --mvn `which mvn` --tgz  --skip-java-test -Phadoop-2.6 -Pyarn -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -DskipTests
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ cat conf/spark-defaults.conf 
# spark.master                     spark://bigdatamgr1:7077,bigdata8:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
# spark.executor.extraJavaOptions       -Xmx16g -Xms16g -Xmn256m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:ParallelGCThreads=10
spark.driver.memory              48g
spark.executor.memory            48g
spark.sql.shuffle.partitions     200

#spark.scheduler.mode FAIR
spark.serializer  org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize 8g
#spark.kryoserializer.buffer.max.mb 2048

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 4
spark.shuffle.service.enabled true

[esw@bigdatamgr1 conf]$ cat spark-env.sh 
#!/usr/bin/env bash

JAVA_HOME=/home/esw/jdk1.7.0_60

# log4j

__add_to_classpath() {

  root=$1

  if [ -d "$root" ] ; then
    for f in `ls $root/*.jar | grep -v -E '/hive.*.jar'`  ; do
      if [ -n "$SPARK_DIST_CLASSPATH" ] ; then
        export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:$f
      else
        export SPARK_DIST_CLASSPATH=$f
      fi
    done
  fi

}
# this add tail of SPARK_CLASSPATH
__add_to_classpath "/home/esw/apache-hive-0.13.1/lib"

#export HADOOP_CONF_DIR=/data/opt/ibm/biginsights/hadoop-2.2.0/etc/hadoop
export HADOOP_CONF_DIR=/home/esw/hadoop-2.6.0/etc/hadoop
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/esw/spark-1.3.0-bin-2.2.0/conf:$HADOOP_CONF_DIR

# HA
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bi-00-01.bi.domain.com:2181 -Dspark.deploy.zookeeper.dir=/spark" 

SPARK_PID_DIR=${SPARK_HOME}/pids
  • 同步
1
for h in `cat slaves ` ; do rsync -vaz hadoop-2.6.0 $h:~/ --delete --exclude=work --exclude=logs --exclude=metastore_db --exclude=data --exclude=pids ; done
  • 启动spark-hive-thrift

./sbin/start-thriftserver.sh –executor-memory 29g –master yarn-client

对于多任务的集群来说,配置自动动态分配(类似资源池)更有利于资源的使用。可以通过【All Applications】-【ApplicationMaster】-【Executors】来观察执行进程的变化。

–END

Tachyon剖析

要了解一个框架,一般都是从框架提供/开放的接口入手。先从最直接的方式下手,可以通过tachyon tfs和浏览器19999端口获取集群以及文件的相关信息。

  • 要了解tachyon首先就是其文件系统,可以从两个功能开始:命令行tachyon.command.TFsShell和web-servlet。
  • 然后会深入tachyon.client包,了解TachyonFS和TachyonFile处理io的方式。以及tachyon.hadoop的包。
  • io处理:
    • 写:BlockOutStream(#getLocalBlockTemporaryPath; MappedByteBuffer)、FileOutStream
    • 读:RemoteBlockInStream、LocalBlockInStream
  • 了解thrift:MasterClient、MasterServiceHandler、WorkerClient、WorkerServiceHandler、ClientBlockInfo、ClientFileInfo。
  • 看tachyon.example,巩固

注:MappedByteBuffer在windows存在资源占用的bug!参见http://www.th7.cn/Program/java/2012/01/31/57401.shtml

把整个io流理清之后,然后需要了解tachyon是怎么维护这些信息的:

  • 配置:WorkerConf、MasterConf、UserConf
  • 了解thrift:MasterClient、MasterServiceHandler、ClientWorkerInfo、MasterInfo
  • TachyonMaster和TachyonWorker的启动
  • Checkpoint、Image、Journal
  • 内存淘汰策略
  • DataServer在哪里用到(nio/netty):TachyonFile#readRemoteByteBuffer、RemoteBlockInStream#read(byte[], int, int)
  • HA
  • Dependency(不知道怎么用)

远程调试Worker以及tfs:

1
2
3
4
5
[hadoop@hadoop-master2 tachyon-0.6.1]$ cat conf/tachyon-env.sh 
export TACHYON_WORKER_JAVA_OPTS="$TACHYON_JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8070"

[hadoop@hadoop-master2 tachyon-0.6.1]$ export TACHYON_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8070"
[hadoop@hadoop-master2 tachyon-0.6.1]$ bin/tachyon tfs lsr /

IO/client

  • TachyonFS是client与Master/Worker的纽带,请求文件系统和文件的元数据CRUD操作。其中的WorkerClient仅用于写(保存)文件。
  • TachyonFile是文件的抽象处理集合,可以获取文件的基本属性(元数据),同时提供了IO的接口用于文件内容的读写。
  • InStream读、获取文件内容
    • EmptyBlockInStream(文件包括的块为0)
    • BlockInStream(文件包括的块为1)
      • LocalBlockInStream 仅当是localworker且该块在本机时,通过MappedByteBuffer获取数据(数据在ramdisk也就是内存盘上哦)。
      • RemoteBlockInStream (通过nio从远程的worker#DataServer机器获取数据#retrieveByteBufferFromRemoteMachine,如果readtype设置为cache同时把数据缓冲到本地worker)
    • FileInStream(文件包括的块为1+,可以理解为BlocksInStream。通过mCurrentPosition / mBlockCapacity来获取当前的blockindex最终还是调用BlockInStream)
  • FileOutStream写,写数据入口就是只有FileOutStream
    • BlockOutStream(WriteType设置了需要缓冲,会写到本地localworker。由于需要进行分块,会复杂些#appendCurrentBuffer
    • UnderFileSystem(如果WriteType设置了Through,则把数据写一份到underfs文件系统)

Master

TachyonMaster是master的启动类,所有的服务都是在这个类里面初始化启动的。

  • HA:LeaderSelectorClient
  • EditLog:EditLogProcessor、Journal。
    • 如果是HA模式,leader调用setup方法把EditLogProcessor停掉。也就是说在HA模式下,standby才会运行EditLogProcessor实时处理editlog。
    • leader和非HA master则仅在启动时通过调用MasterInfo#init处理editlog一次。
  • Thrift: TServer、MasterServiceHandler;与MasterClient对应的服务端。
  • Web: UIWebServer,使用jetty的内嵌服务器。
  • MasterInfo

Worker

Thrift

HA

当配置tachyon.usezookeeper设置为true时,启动master时会初始化LeaderSelectorClient对象。使用curator连接到zookeeper服务器进行leader的选举。

LeaderSelectorClient实现了LeaderSelectorListener接口,创建LeaderSelector并传入当前实例作为监听实例,当选举完成后,被选leader触发takeLeadership事件。

public void takeLeadership(CuratorFramework client) throws Exception Called when your instance has been granted leadership. This method should not return until you wish to release leadership

takeLeadership方法中把mIsLeader设置为true(master自己判断)、创建mLeaderFolder + mName路径(客户端获取master leader);然后隔5s的死循环(运行在LeaderSelector单独的线程池)。

Checkpoint

Journal


问题

  • 程序没有返回内容,没有响应(v1.1.0已经没有这个问题了)

tfs 默认是CACHE_THROUGH,会缓冲同时写ufs。如果改成must则只写cache,然后清理内存,再获取数据,一直没有内容返回,不知道为什么?

1
2
3
4
5
6
[esw@bigdata1 tachyon-0.6.1]$ export TACHYON_JAVA_OPTS="-Dtachyon.user.file.writetype.default=MUST_CACHE "
[esw@bigdata1 tachyon-0.6.1]$ bin/tachyon tfs copyFromLocal LICENSE /LICENSE2
Copied LICENSE to /LICENSE2
[esw@bigdata1 tachyon-0.6.1]$ bin/tachyon tfs free /LICENSE2
/LICENSE2 was successfully freed from memory.
[esw@bigdata1 tachyon-0.6.1]$ bin/tachyon tfs cat /LICENSE2

v1.1

1
2
3
4
5
6
[hadoop@hadoop-master2 alluxio-1.1.0-SNAPSHOT]$ bin/alluxio fs ls /README.txt                                            
1366.00B  04-16-2016 07:56:24:499  In Memory      /README.txt
[hadoop@hadoop-master2 alluxio-1.1.0-SNAPSHOT]$ bin/alluxio fs free /README.txt
/README.txt was successfully freed from memory.
[hadoop@hadoop-master2 alluxio-1.1.0-SNAPSHOT]$ bin/alluxio fs cat /README.txt 
Block 402653184 is not available in Alluxio

–END

Tachyon入门指南

tachyon程序是在HDFS与程序之间缓冲,相当于CPU与磁盘设备之间内存的功能。tachyon提供了TachyonFS、TachyonFile等API使操作起来更像一个文件系统;同时实现了HDFS的FileSystem接口,方便原有程序的迁移,只要把url的模式(schema)hdfs改成tachyon。

tachyon和HDFS一样也是master-slaver(worker)结构:master保存元数据,worker节点使用内存盘缓冲数据。

部署集群

下载tachyon的编译文件后,按下面的步骤部署:

  • 解压
  • 修改conf/tachyon-env.sh(JAVA_HOME,TACHYON_UNDERFS_ADDRESS,TACHYON_MASTER_ADDRESS)
  • 修改conf/worker
  • 同步代码到workers子节点
  • 格式化tachyon(建立master和worker所需的各种目录)
  • 挂载内存盘
  • 启动集群
  • 通过19999端口访问

如果hadoop集群的版本不是最新的2.6.0,需要手工编译源码:

1
$ mvn clean package assembly:single -Dhadoop.version=2.2.0 -DskipTests -Dmaven.javadoc.skip=true

同步程序的脚本如下:

1
[esw@bigdatamgr1 ~]$ for h in `cat slaves ` ; do  rsync -vaz tachyon-0.6.1 $h:~/ --exclude=logs --exclude=underfs --exclude=journal ; done

用tachyon用户格式化:

1
bin/tachyon format

使用root挂载内存盘:

1
2
bin/tachyon-mount.sh Mount workers
for h in `cat slaves ` ; do  ssh $h "chmod 777 /mnt/ramdisk; chmod 777 /mnt/tachyon_default_home"  ; done

确认下worker节点是否有underfs/tmp/tachyon/data,如果没有手动创建下。

1
[esw@bigdatamgr1 ~]$ for h in `cat slaves ` ; do ssh $h mkdir -p ~/tachyon-0.6.1/underfs/tmp/tachyon/data ; done

启动集群:

1
[esw@bigdatamgr1 tachyon-0.6.1]$ bin/tachyon-start.sh all NoMount

上传文件到tachyon:(注意,这里是在worker节点!)

1
2
[esw@bigdata1 tachyon-0.6.1]$ bin/tachyon tfs copyFromLocal README.md /
Copied README.md to /

集成到Spark

注意,这里是在worker节点,使用local本地集群的方式(spark集群资源全部被spark-sql占用了,导致提交的任务分配不到资源!)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[esw@bigdata1 spark-1.3.0-bin-2.2.0]$ export SPARK_CLASSPATH=/home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar 
[esw@bigdata1 spark-1.3.0-bin-2.2.0]$ bin/spark-shell --master local[1] -Dspark.ui.port=4041
scala> val s = sc.textFile("tachyon://bigdatamgr1:19998/README.md")
s: org.apache.spark.rdd.RDD[String] = tachyon://bigdatamgr1:19998/README.md MapPartitionsRDD[1] at textFile at <console>:21

scala> s.count()
15/04/03 11:13:09 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
res0: Long = 45

scala> val wordCounts = s.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23

scala> wordCounts.saveAsTextFile("tachyon://bigdatamgr1:19998/wordcount-README")

[esw@bigdatamgr1 tachyon-0.6.1]$ bin/tachyon tfs ls /wordcount-README/
1407.00 B 04-03-2015 11:16:05:483  In Memory      /wordcount-README/part-00000
0.00 B    04-03-2015 11:16:05:787  In Memory      /wordcount-README/_SUCCESS

为啥要在worker节点运行呢?不能在master节点运行?运行肯定是可以的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ export SPARK_CLASSPATH=/home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ bin/spark-shell --master local[1] --jars /home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar

scala> val s = sc.textFile("tachyon://bigdatamgr1:19998/NOTICE")
s: org.apache.spark.rdd.RDD[String] = tachyon://bigdatamgr1:19998/NOTICE MapPartitionsRDD[1] at textFile at <console>:15

scala> s.count()
15/04/13 16:05:45 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
15/04/13 16:05:45 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
java.io.IOException: The machine does not have any local worker.
        at tachyon.client.BlockOutStream.<init>(BlockOutStream.java:94)
        at tachyon.client.BlockOutStream.<init>(BlockOutStream.java:65)
        at tachyon.client.RemoteBlockInStream.read(RemoteBlockInStream.java:204)
        at tachyon.hadoop.HdfsFileInputStream.read(HdfsFileInputStream.java:142)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1466)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
res0: Long = 2

两个点:

  • 这里是运行的spark local集群;
  • 运行当然没有问题,但是会打印不和谐的The machine does not have any local worker警告日志。这与FileSystem的获取输入流ReadType.CACHE实现有关(见源码HdfsFileInputStream)。
1
mTachyonFileInputStream = mTachyonFile.getInStream(ReadType.CACHE);

如果master为spark集群,spark-driver不管运行在哪台集群都没有问题。因为,此时运行任务的spark-worker就是tachyon-worker节点啊,当然就有local worker了。

为了更深入的了解,还可以试验一下ReadType.CACHE的作用:原本不在内存的数据,计算后就会被载入到缓冲(内存)!!

可以再试一次,先从内存中删掉(此处underfs配置存储在HDFS)

1
2
3
4
5
6
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs free /NOTICE
/NOTICE was successfully freed from memory.

[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs fileinfo /NOTICE
/NOTICE with file id 2 has the following blocks: 
ClientBlockInfo(blockId:2147483648, offset:0, length:62, locations:[NetAddress(mHost:bigdata8, mPort:-1, mSecondaryPort:-1), NetAddress(bigdata6, mPort:-1, mSecondaryPort:-1), NetAddress(mHost:bigdata5, mPort:-1, mSecondaryPort:-1)])

再次运行count:

1
2
scala> s.count()
res1: Long = 2

再次查看文件状态:

1
2
3
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs fileinfo /NOTICE
/NOTICE with file id 2 has the following blocks: 
ClientBlockInfo(blockId:2147483648, offset:0, length:62, locations:[NetAddress(mHost:bigdata1, mPort:29998, mSecondaryPort:29999)])

此时文件对应的block所在机器变成了bigdata1,也就是spark-worker运行的节点(这里用local,worker和driver都在bigdata1上)。

参考

集成到Hadoop集群

1
2
3
4
5
6
7
8
9
10
11
12
[esw@bigdatamgr1 ~]$ export HADOOP_CLASSPATH=/home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar

[esw@bigdatamgr1 hadoop-2.2.0]$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount -libjars /home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar tachyon://bigdatamgr1:19998/NOTICE tachyon://bigdatamgr1:19998/NOTICE-wordcount

[esw@bigdatamgr1 hadoop-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs cat /NOTICE-wordcount/part-r-00000
2012-2014       1
Berkeley        1
California,     1
Copyright       1
Tachyon 1
University      1
of      1

后记

当前apache开源大部分集群的部署都是同一种模式,源码也基本都是用maven来进行构建。部署其实没有什么难度,如果是应用到spark、hadoop这样的平台,其实只要部署,然后用FileSystem的接口就一切ok了。但是要了解其原理,官网的文档也不是很全,那得需要深入源码。

入门写到这里,差不多了,下一篇从TachyonFS角度解析tachyon。

附录

  • spark-env.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
JAVA_HOME=/home/esw/jdk1.7.0_60

# log4j

__add_to_classpath() {

  root=$1

  if [ -d "$root" ] ; then
    for f in `ls $root/*.jar | grep -v -E '/hive.*.jar'`  ; do
      if [ -n "$SPARK_DIST_CLASSPATH" ] ; then
        export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:$f
      else
        export SPARK_DIST_CLASSPATH=$f
      fi
    done
  fi

}

__add_to_classpath "/home/esw/tez-0.4.0-incubating"
__add_to_classpath "/home/esw/tez-0.4.0-incubating/lib"
__add_to_classpath "/home/esw/apache-hive-0.13.1/lib"

export HADOOP_CONF_DIR=/data/opt/ibm/biginsights/hadoop-2.2.0/etc/hadoop
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/esw/spark-1.3.0-bin-2.2.0/conf:$HADOOP_CONF_DIR

# HA
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bi-00-01.bi.domain.com:2181 -Dspark.deploy.zookeeper.dir=/spark"

[esw@bigdatamgr1 ~]$ for h in `cat slaves ` ; do rsync -vaz spark-1.3.0-bin-2.2.0 $h:~/ --exclude=logs --exclude=metastore_db --exclude=work --delete ; done

–END

使用RamDisk来优化系统

最近加了一条8G的内存,总共16G。暂时来说很难全部用起来。如果能够实现类似linux的shm分区的话,那就完美了,把临时的数据全部放到这个内存分区中。最好是免费的,通过一阵折腾搜索,整理如下:

去到官网http://www.ltr-data.se/opencode.html/#ImDisk直接下载ImDisk Toolkithttp://reboot.pro/files/file/284-imdisk-toolkit/,toolkit里面已经集成了ImDisk软件。(新版本的toolkit可以节省很多事情,参考最后的两个链接看看即可)

配置:填写大小5、盘符S、磁盘格式NTFS,然后点击【确定】格式化磁盘,然后就可以使用了。

把临时的文件目录指定到ramdisk,重启系统。

上面仅仅是把用户和系统的临时目录移到内存盘中。由于rar,java一些软件都是用用户的临时目录,已经可以体验到加速的快感了!!直接拖拽解压rar情况下速度明显快了很多。

还有一个问题,重启后,内存盘的数据会被全部清掉。默认情况下只建立了Temp目录,没有我们指定的Cache目录。Chrome启动的时刻如果发现Cache目录为不可用状态会重建该目录。

在Advanced页签,Load Content from Image File or Folder选项可以选择初始化加载的内容。我们只要先把目录结构建立后,然后在初始化后加载该路径一切都解决了。

1
2
3
4
5
E:\local\home\RamDiskInit>find .
.
./Temp
./Temp/Chrome
./Temp/Chrome/Cache

然后在RamDisk Config的Advanced页签选择E:\local\home\RamDiskInit作为Load Content即可。

参考

–END