[esw@bigdata1 spark-1.3.0-bin-2.2.0]$ export SPARK_CLASSPATH=/home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar
[esw@bigdata1 spark-1.3.0-bin-2.2.0]$ bin/spark-shell --master local[1] -Dspark.ui.port=4041
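(A side note, not from the original session: SPARK_CLASSPATH has been deprecated since Spark 1.0; the Tachyon client jar can equally be passed with --jars, as the second session below does, and the UI port is normally set with --conf spark.ui.port=4041 rather than a -D flag.)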
scala> val s = sc.textFile("tachyon://bigdatamgr1:19998/README.md")
s: org.apache.spark.rdd.RDD[String] = tachyon://bigdatamgr1:19998/README.md MapPartitionsRDD[1] at textFile at <console>:21
scala> s.count()
15/04/03 11:13:09 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
res0: Long = 45
scala> val wordCounts = s.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23
scala> wordCounts.saveAsTextFile("tachyon://bigdatamgr1:19998/wordcount-README")
[esw@bigdatamgr1 tachyon-0.6.1]$ bin/tachyon tfs ls /wordcount-README/
1407.00 B 04-03-2015 11:16:05:483 In Memory /wordcount-README/part-00000
0.00 B 04-03-2015 11:16:05:787 In Memory /wordcount-README/_SUCCESS
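The saved result can be read straight back from Tachyon in the same shell. A minimal check (not part of the original session; same path as above):

// Load the word counts back from Tachyon and spot-check a few entries.
val counts = sc.textFile("tachyon://bigdatamgr1:19998/wordcount-README")
counts.take(5).foreach(println)   // lines look like (word,count)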
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ export SPARK_CLASSPATH=/home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ bin/spark-shell --master local[1] --jars /home/esw/tachyon-0.6.1/core/target/tachyon-0.6.1-jar-with-dependencies.jar
scala> val s = sc.textFile("tachyon://bigdatamgr1:19998/NOTICE")
s: org.apache.spark.rdd.RDD[String] = tachyon://bigdatamgr1:19998/NOTICE MapPartitionsRDD[1] at textFile at <console>:15
scala> s.count()
15/04/13 16:05:45 WARN BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
15/04/13 16:05:45 WARN : tachyon.home is not set. Using /mnt/tachyon_default_home as the default value.
java.io.IOException: The machine does not have any local worker.
at tachyon.client.BlockOutStream.<init>(BlockOutStream.java:94)
at tachyon.client.BlockOutStream.<init>(BlockOutStream.java:65)
at tachyon.client.RemoteBlockInStream.read(RemoteBlockInStream.java:204)
at tachyon.hadoop.HdfsFileInputStream.read(HdfsFileInputStream.java:142)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1466)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1006)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
res0: Long = 2
Two points:
1. Spark is running here as a local "cluster" (--master local[1]) on bigdatamgr1, a machine that runs no Tachyon worker.
2. The job itself runs fine, but it prints the jarring "The machine does not have any local worker" log shown above. This comes from the ReadType.CACHE input-stream path in Tachyon's Hadoop FileSystem integration (see the HdfsFileInputStream source): while reading a remote block, the client tries to cache the data through a local worker, and the exception is logged when no such worker exists.
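If the log noise matters, one way around it is to read through the Tachyon client API with a non-caching stream, so a remote read does not try to re-cache the block through a local worker. A rough sketch, assuming the tachyon-0.6.1 client classes (TachyonFS, TachyonFile, ReadType) behave as described; this illustrates the read types, it is not the code path Spark itself uses:

import tachyon.client.{TachyonFS, ReadType}

// Open /NOTICE with NO_CACHE: the remote read is served as-is and no
// attempt is made to place the block on a (non-existent) local worker.
val tfs  = TachyonFS.get("tachyon://bigdatamgr1:19998")
val file = tfs.getFile("/NOTICE")
val in   = file.getInStream(ReadType.NO_CACHE)
val buf  = new Array[Byte](file.length().toInt)
in.read(buf)
in.close()
println(new String(buf))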
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs free /NOTICE
/NOTICE was successfully freed from memory.
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs fileinfo /NOTICE
/NOTICE with file id 2 has the following blocks:
ClientBlockInfo(blockId:2147483648, offset:0, length:62, locations:[NetAddress(mHost:bigdata8, mPort:-1, mSecondaryPort:-1), NetAddress(mHost:bigdata6, mPort:-1, mSecondaryPort:-1), NetAddress(mHost:bigdata5, mPort:-1, mSecondaryPort:-1)])
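Note the mPort:-1 in every location above: after the free, no Tachyon worker holds the block any more, so the hosts reported are presumably the HDFS replicas of the under-filesystem copy.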
Run count again:
scala> s.count()
res1: Long = 2
Check the file's status again:
[esw@bigdatamgr1 spark-1.3.0-bin-2.2.0]$ ~/tachyon-0.6.1/bin/tachyon tfs fileinfo /NOTICE
/NOTICE with file id 2 has the following blocks:
ClientBlockInfo(blockId:2147483648, offset:0, length:62, locations:[NetAddress(mHost:bigdata1, mPort:29998, mSecondaryPort:29999)])
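The block is now reported in exactly one location, the Tachyon worker on bigdata1 (mPort:29998, mSecondaryPort:29999): the CACHE read path pulled the file back into Tachyon memory. To double-check from the shell, again assuming the 0.6 client API sketched earlier:

println(tfs.getFile("/NOTICE").isInMemory())   // expected: true after the re-read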