Hive on Spark
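Start with the official resource, Hive on Spark: Getting Started. The documentation is trustworthy and reliable, but on one condition: the Spark version has to be the one specified in hive/pom.xml.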
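To find out which Spark version a given Hive source tree expects, a small helper along these lines could read it out of pom.xml. This is a minimal sketch: `pom_spark_version` is a hypothetical name, the source path in the usage comment is made up, and it assumes the version is declared as a one-line `<spark.version>` property.

```shell
# Print the value of the <spark.version> property from a Maven pom.xml.
# Assumes the property sits on a single line, e.g.
#   <spark.version>1.3.1</spark.version>
# pom_spark_version is a hypothetical helper name, not a Hive or Maven tool.
pom_spark_version() {
  sed -n 's:.*<spark\.version>\([^<]*\)</spark\.version>.*:\1:p' "$1" | head -n 1
}

# Usage (hypothetical path to an unpacked Hive source tree):
#   pom_spark_version ~/apache-hive-1.2.1-src/pom.xml
```

If the property is declared differently in your Hive release, the function prints nothing, which is itself a hint to open pom.xml and look by hand.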
# Rebuild Spark (leaving Hive and Hadoop out of the assembly jar)
Note that hive-1.2.1 uses spark-1.3.1!
[hadoop@cu2 spark-1.3.1]$ ./make-distribution.sh --name "hadoop2.6.3-without-hive" --tgz --mvn "$(which mvn)" -Pyarn,hadoop-provided,hadoop-2.6,parquet-provided -Dhadoop.version=2.6.3 -Dmaven.test.skip=true -Dmaven.javadoc.skip=true -DskipTests
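Copy the built spark-1.3.1-bin-hadoop2.6.3-without-hive.tgz to the server. Unpack it and create a symlink named spark (or set the SPARK_HOME environment variable). Hive spares no effort here: it tries every way it can to get hold of the jars it wants ( sparkHome=$(readlink -f $bin/../../spark) ).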
[hadoop@hadoop-master2 ~]$ ln -s spark-1.3.1-bin-hadoop2.6.3-without-hive spark
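The lookup order described above (SPARK_HOME first, otherwise a `spark` symlink next to the Hive directory) can be sketched as follows. `resolve_spark_home` is a hypothetical helper written for illustration, not a copy of Hive's launcher script, and it relies on `readlink -f` being available as in the snippet quoted above.

```shell
# Resolve the Spark home the way the text describes: prefer SPARK_HOME,
# otherwise follow a "spark" symlink two levels above the given bin directory.
# resolve_spark_home is a hypothetical helper, not part of Hive.
resolve_spark_home() {
  bin_dir=$1
  if [ -n "${SPARK_HOME:-}" ]; then
    printf '%s\n' "$SPARK_HOME"
  else
    readlink -f "$bin_dir/../../spark"
  fi
}

# Usage with the layout from this post:
#   resolve_spark_home ~/hive/bin
```

With the symlink in place and SPARK_HOME unset, this should print the real spark-1.3.1-bin-hadoop2.6.3-without-hive directory.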
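Upload the assembly jar to HDFS so that each job launch no longer has to ship several hundred MB. The spark.yarn.jar setting further down points to it.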
[hadoop@hadoop-master2 ~]$ cd spark/lib/
[hadoop@hadoop-master2 lib]$ hadoop fs -put spark-assembly-1.3.1-hadoop2.6.3.jar /spark/
The result once the symlinks are in place:
[hadoop@hadoop-master2 ~]$ ll | grep -E "hive|spark"
drwxrwxr-x 9 hadoop hadoop 4096 1月 14 08:08 apache-hive-1.2.1-bin
lrwxrwxrwx 1 hadoop hadoop 21 1月 14 08:07 hive -> apache-hive-1.2.1-bin
lrwxrwxrwx 1 hadoop hadoop 40 3月 28 16:38 spark -> spark-1.3.1-bin-hadoop2.6.3-without-hive
drwxrwxr-x 10 hadoop hadoop 4096 3月 28 16:31 spark-1.3.1-bin-hadoop2.6.3-without-hive
drwxrwxr-x 12 hadoop hadoop 4096 3月 25 16:18 spark-1.6.0-bin-2.6.3
drwxrwxr-x 11 hadoop hadoop 4096 3月 28 11:15 spark-1.6.0-bin-hadoop2-without-hive
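The spark-1.6.0 directories here are a lesson learned the hard way! Remember: it is by far the best to use the Spark version named in hive/pom.xml!
# Modify the Hive configuration
Spark loads a lot of classes, so the PermGen size needs to be raised.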
[hadoop@hadoop-master2 ~]$ less ~/hive/conf/hive-env.sh
export HADOOP_OPTS="$HADOOP_OPTS -XX:MaxPermSize=256m -Dhive.home=${HIVE_HOME} "
Add a spark-defaults.conf file to the conf directory to hold the Spark settings. For dynamic resource allocation, see dynamic-resource-allocation:
[hadoop@hadoop-master2 conf]$ cat spark-defaults.conf
spark.yarn.jar hdfs:///spark/spark-assembly-1.3.1-hadoop2.6.3.jar
spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.executorIdleTimeout 600
spark.dynamicAllocation.minExecutors 160
spark.dynamicAllocation.maxExecutors 1800
spark.dynamicAllocation.schedulerBacklogTimeout 5
spark.driver.memory 10g
spark.driver.maxResultSize 0
spark.eventLog.enabled true
spark.eventLog.compress true
spark.eventLog.dir hdfs:///spark-eventlogs
spark.yarn.historyServer.address hadoop-master2:18080
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 512m
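- minExecutors should ideally be close to the number of DataNode machines: only with an executor on every machine can computation stay local.
- dynamicAllocation needs cooperation from YARN. See the previous post for details, or go straight to the official documentation.
- The event log is required for viewing history. Once configured, the information for every job is stored under the eventLog.dir path, and the history can be browsed on port 18080.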
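# Run it
spark.master defaults to yarn-cluster. Here I first run in local mode to see that it works, and then switch to yarn-cluster/yarn-client. (yarn-client is recommended: in yarn-cluster mode the AppMaster is also the Driver, so memory is harder to control and the logs are more awkward to read.)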
[hadoop@hadoop-master2 hive]$ hive --hiveconf hive.execution.engine=spark
hive> set spark.master=local;
hive> select count(*) from t_house_info ;
Query ID = hadoop_20160328163952_93dafddc-c8b1-4bc9-b851-5e51f6d26fa8
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = 0
Query Hive on Spark job[0] stages:
0
1
Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-28 16:40:02,077 Stage-0_0: 0(+1)/1 Stage-1_0: 0/1
2016-03-28 16:40:03,078 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
Status: Finished successfully in 2.01 seconds
OK
1
Time taken: 10.169 seconds, Fetched: 1 row(s)
hive>
Looking back, it is actually quite simple, and much the same as the official documentation.
Note: Hive's log level can be configured in hive-log4j.properties.
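As a rough illustration of that log-level change, the helper below rewrites the `hive.root.logger` line in place while keeping the configured appender. It is a sketch under assumptions: `set_hive_log_level` is a hypothetical name, and it expects the file to contain a line of the form `hive.root.logger=INFO,DRFA`, which may differ in your installation.

```shell
# Rewrite the hive.root.logger line of a hive-log4j.properties file so that it
# uses the given level, keeping whatever appender is already configured.
# Assumes a line such as: hive.root.logger=INFO,DRFA
# set_hive_log_level is a hypothetical helper name.
set_hive_log_level() {
  file=$1
  level=$2
  tmp=$(mktemp)
  # Write to a temp file first, then copy back so the original keeps its permissions.
  sed "s/^hive\.root\.logger=[A-Za-z]*,/hive.root.logger=$level,/" "$file" > "$tmp" &&
    cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Usage:
#   set_hive_log_level ~/hive/conf/hive-log4j.properties DEBUG
```

For a one-off session, passing `--hiveconf hive.root.logger=DEBUG,console` on the hive command line is an alternative that avoids editing the file.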
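One problem remains: with either yarn-cluster or yarn-client (hive1.2.1-on-spark1.3.1), once the application has been force-killed, the next query fails. Presumably the application is gone but the session is still around!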
[hadoop@file1 ~]$ yarn application -kill application_1460379750886_0012
16/04/13 08:47:17 INFO client.RMProxy: Connecting to ResourceManager at file1/192.168.102.6:8032
Killing application application_1460379750886_0012
16/04/13 08:47:18 INFO impl.YarnClientImpl: Killed application application_1460379750886_0012
> select count(*) from t_info where edate=20160413;
Query ID = hadoop_20160413084736_ac8f88bb-5ee1-4941-9745-f4a8a504f2f3
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = eb7e038a-2db0-45d7-9b0d-1e55d354e5e9
Status: Failed
FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
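# Pitfalls
When I first set this up I paid no attention to the Spark version and went straight to spark-1.6.0. Nothing ran at all, and hive.log told me nothing. In the end I found a mailing-list reply from one of the Hive on Spark leads (http://markmail.org/message/reingwn556e7e37y); only after setting spark.master=local to run locally did I get even a little useful error information.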
hive> set hive.execution.engine=spark;
hive> select count(*) from t_ods_access_log2 where day=20160327;
Query ID = hadoop_20160328083028_a9fb9860-38dc-4288-8415-b5b2b88f920a
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create spark client.)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
The log contains nothing useful whatsoever!
Set the log level to debug (hive-log4j.properties) and run locally with set spark.master=local; . The log from the rerun:
2016-03-28 15:13:52,549 DEBUG internal.PlatformDependent (Slf4JLogger.java:debug(71)) - Javassist: unavailable
2016-03-28 15:13:52,549 DEBUG internal.PlatformDependent (Slf4JLogger.java:debug(71)) - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes. Please check the configuration for better performance.
2016-03-28 15:14:56,594 DEBUG storage.BlockManager (Logging.scala:logDebug(62)) - Putting block broadcast_0_piece0 without replication took 8 ms
2016-03-28 15:14:56,597 ERROR util.Utils (Logging.scala:logError(95)) - uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.AbstractMethodError
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:62)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
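This is an error from calling an abstract method. I then looked at JobMetricsListener, the SparkListener implementation in hive-1.2.1, and it indeed has no implementation of onBlockUpdated, the method that fails at line 62 of SparkListenerBus in spark-1.6.0. After switching Spark to 1.3.1 everything worked; the rest is as described earlier in this post.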
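One way to confirm this kind of mismatch without reading the source is to list a class's methods with `javap` and look for the one the stack trace points at. The sketch below only pattern-matches a `javap` listing; `declares_method` is a hypothetical helper, and the jar path and package name in the usage comment are assumptions that may not match your installation.

```shell
# Succeed if the javap listing on stdin declares a method with the given name.
# declares_method is a hypothetical helper; it only pattern-matches text.
declares_method() {
  grep -q "[[:space:]]$1("
}

# Usage, with an assumed jar path and package name for JobMetricsListener:
#   javap -cp ~/hive/lib/hive-exec-1.2.1.jar \
#     org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener \
#     | declares_method onBlockUpdated || echo "onBlockUpdated is missing"
```

A listener compiled against the older SparkListener interface would not list the method, which is consistent with the AbstractMethodError seen when the newer Spark tries to call it.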
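Takeaway: when you are just starting with something new, it saves a lot of trouble to follow the version the official site specifies. Experiment with other versions once you know your way around.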
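# Hive on Spark vs SparkSQL vs Hive on Tez
The previous post already set up SparkSQL, and SparkSQL has its own Thriftserver service, so here is why I still chose to set up hive-on-spark:
- SparkSQL-Thriftserver keeps all results in memory. It is fast, but it cannot serve queries that return large amounts of data. If a query returns tens of millions of rows, SparkSQL cannot cope. With hive-on-spark, only the computation runs on Spark and all the other logic is Hive's: results are first written to HDFS and then returned to the client gradually.
- The SparkSQL-Thriftserver code is a complete rewrite in Scala and is not necessarily compatible with existing Hive workloads!
- The biggest advantage of SparkSQL-Thriftserver is that the whole server is equivalent to a single hive-on-spark session, so the web monitoring is neat and clear. With hive-on-spark, each session is a separate application! (Update 2016-4-13 20:57:23) With dynamic allocation enabled, SparkSQL Thriftserver did not feel much faster.
- Because SparkSQL is memory-based, it has some scheduling optimizations. Take [limit]: Hive computes everything by brute force, while SparkSQL tries again and again with an increasing amount of data. SparkSQL can afford to do this because the data it has already computed stays in memory.
Hive and SparkSQL follow different philosophies: Hive's storage is HDFS, whereas SparkSQL uses HDFS only as a persistence layer and keeps most of its data in memory.
In Hive's log you can see a write to HDFS after the result has been produced, with entries like this: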
2016-03-28 19:39:25,687 INFO exec.FileSinkOperator (Utilities.java:mvFileToFinalPath(1882))
- Moving tmp dir: hdfs://zfcluster/hive/scratchdir/hadoop/de2b263e-9601-4df7-bc38-ba932ae83f42/hive_2016-03-28_19-38-08_834_7914607982986605890-1/-mr-10000/.hive-staging_hive_2016-03-28_19-38-08_834_7914607982986605890-1/_tmp.-ext-10001
to: hdfs://zfcluster/hive/scratchdir/hadoop/de2b263e-9601-4df7-bc38-ba932ae83f42/hive_2016-03-28_19-38-08_834_7914607982986605890-1/-mr-10000/.hive-staging_hive_2016-03-28_19-38-08_834_7914607982986605890-1/-ext-10001
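- Spark has every advantage Tez has, and Tez's caching advantage is actually not that large. Spark's caching has a more visible effect, and it can return quickly. For example, if you ask for 30,000 rows, Tez runs the whole query before returning, while SparkSQL stops computing once it has the 30,000 rows (that is how it appears to behave; I have not read the source. Annoyingly, hive-on-spark still runs the whole thing).
- Tez tasks cannot share a cache. Spark is finer-grained and can cache at the process level (that is, reuse previously computed results and already loaded caches)! For example, if you query records and also want the count returned, some of the operations run at PROCESS_LOCAL level, which Tez cannot match!
- Spark's log UI is also more convenient to browse.
Purely from a user's point of view, Spark wins across the board.
# References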
- https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
- http://spark.apache.org/docs/1.3.1/configuration.html
- http://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
- Cloudera HoS tuning: http://www.cloudera.com/documentation/enterprise/latest/topics/admin_hos_tuning.html