Article
parquet学习
# 经典文章
- http://parquet.apache.org/documentation/latest/
- https://blog.twitter.com/2013/dremel-made-simple-with-parquet
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
# 概念
- Row Group
- Column Chunk
- Page
- Definition Levels: To support nested records we need to store the level for which the field is null. This is what the definition level is for: from 0 at the root of the schema up to the maximum level for this column. When a field is defined then all its parents are defined too, but when it is null we need to record the level at which it started being null to be able to reconstruct the record.
- Repetition Levels: To support repeated fields we need to store when new lists are starting in a column of values. This is what repetition level is for: it is the level at which we have to create a new list for the current value. In other words, the repetition level can be seen as a marker of when to start a new list and at which level.
- Page
- Column Chunk
- FileMetaData
The definition and repetition levels are optional, based on the schema definition. If the column is not nested (i.e. the path to the column has length 1), we do not encode the repetition levels (it would always have the value 1). For data that is required, the definition levels are skipped (if encoded, it will always have the value of the max definition level).
For example, in the case where the column is non-nested and required, the data in the page is only the encoded values.
An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.
# texfile转parquet
[hadoop@hadoop-master2 ~]$ cd apache-hive-1.2.1-bin/
[hadoop@hadoop-master2 apache-hive-1.2.1-bin]$ bin/hive
hive> CREATE TABLE `t_ods_access_log2_parquet`( `houseid` string, `sourceip` string, `destinationip` string, `sourceport` string, `destinationport` string, `domain` string, `url` string, `accesstime` string, `logid` string, `sourceipnum` bigint, `timedetected` string, `protocol` string, `duration` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS PARQUET LOCATION '/user/hive/t_ods_access_log2_parquet'
关键 STORED AS PARQUET。
关于压缩,可以通过mapreduce参数设置( mapreduce.output.fileoutputformat.compress 和 mapreduce.output.fileoutputformat.compress.codec ),但是推荐使用 parquet.compression 属性来指定。
reader/writer都会从 CodecConfig.getCodec() 获取压缩编码。代码中会从parquet属性和mapreduce获取压缩参数。
alter table t_ods_access_log2_parquet SET TBLPROPERTIES ('parquet.compression' = 'SNAPPY' );
create table t_ods_access_log2_parquet_none like t_ods_access_log2_parquet TBLPROPERTIES ('parquet.compression' = 'UNCOMPRESSED' );
create table t_ods_access_log2_parquet_gzip like t_ods_access_log2_parquet TBLPROPERTIES ('parquet.compression' = 'GZIP' );
直接使用hive的insert into语句就可以把原来的textfile的文件转成parquet格式。同时也转成gzip和uncompress比较了一下:
| 文件格式 | 压缩 | 大小 |
|---|---|---|
| textfile | snappy | 4.1G |
| parquet | snappy | 3.6G |
| parquet | uncompress | 7.2G |
| parquet | gzip | 2.2G |
直接count整个数据表,使用parquet的输入1M不到数据,太环保了!!(文件都是几十M的,一个文件(block)可以都在一台机器上)。
| 文件格式 | 运行引擎 | 大小 |
|---|---|---|
| textfile | tez | HDFS_BYTES_READ 4,454,071,542 |
| parquet | tez | HDFS_BYTES_READ 415,870 |
| textfile | sparksql | Input 4.1 GB |
| parquet | sparksql | Input 384.9 KB |
用sparksql跑textfile竟然更快。果然内存大暴力也很牛啊!!
hive> insert into t_ods_access_log2_back select houseid, sourceip, destinationip, sourceport, destinationport, domain, url, accesstime, logid, sourceipnum, timedetected, protocol, duration from t_ods_access_log2 where hour=2016032804 ;
Query ID = hadoop_20160329200414_96f1de35-48c5-4b38-977f-05de8554f388
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1458893800770_3955)
--------------------------------------------------------------------------------
VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
--------------------------------------------------------------------------------
Map 1 .......... SUCCEEDED 152 152 0 0 1 0
--------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 341.56 s
--------------------------------------------------------------------------------
Loading data to table default.t_ods_access_log2_back
Table default.t_ods_access_log2_back stats: [numFiles=152, numRows=57688987, totalSize=4454071542, rawDataSize=11018516544]
OK
Time taken: 347.997 seconds
hive> insert into t_ods_access_log2_parquet select houseid, sourceip, destinationip, sourceport, destinationport, domain, url, accesstime, logid, sourceipnum, timedetected, protocol, duration from t_ods_access_log2 where hour=2016032804 ;
Query ID = hadoop_20160329212157_57b66595-5dfc-4fc9-9ad1-398e2b8ade6b
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)
--------------------------------------------------------------------------------
VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
--------------------------------------------------------------------------------
Map 1 .......... SUCCEEDED 152 152 0 0 0 0
--------------------------------------------------------------------------------
VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 237.28 s
--------------------------------------------------------------------------------
Loading data to table default.t_ods_access_log2_parquet
Table default.t_ods_access_log2_parquet stats: [numFiles=0, numRows=1305035789, totalSize=0, rawDataSize=16965465257]
OK
Time taken: 260.515 seconds
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329212644_da8e7997-5bcc-41ab-8b63-f1a5919c5a2f
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)
--------------------------------------------------------------------------------
VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
--------------------------------------------------------------------------------
Map 1 .......... SUCCEEDED 107 107 0 0 0 0
Reducer 2 ...... SUCCEEDED 1 1 0 0 0 0
--------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 59.01 s
--------------------------------------------------------------------------------
OK
57688987
Time taken: 59.768 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329212813_2fb8dafa-5c9a-40e8-a904-13e7cf865ec6
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)
--------------------------------------------------------------------------------
VERTICES STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
--------------------------------------------------------------------------------
Map 1 .......... SUCCEEDED 106 106 0 0 0 0
Reducer 2 ...... SUCCEEDED 1 1 0 0 0 0
--------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 45.82 s
--------------------------------------------------------------------------------
OK
57688987
Time taken: 47.275 seconds, Fetched: 1 row(s)
hive> set hive.execution.engine=spark;
hive> set spark.master=yarn-client;
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329214550_a58d1056-9c91-4bbe-be7d-122ec3efdd8d
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = 3a03d432-83a4-4d5a-a878-c9e52aa94bed
Query Hive on Spark job[0] stages:
0
1
Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:46:26,523 Stage-0_0: 0(+114)/152 Stage-1_0: 0/1
2016-03-29 21:46:27,535 Stage-0_0: 0(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:30,563 Stage-0_0: 0(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:33,582 Stage-0_0: 0(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:36,606 Stage-0_0: 0(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:39,624 Stage-0_0: 0(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:41,637 Stage-0_0: 0(+118)/152 Stage-1_0: 0/1
2016-03-29 21:46:42,644 Stage-0_0: 4(+115)/152 Stage-1_0: 0/1
2016-03-29 21:46:43,651 Stage-0_0: 110(+41)/152 Stage-1_0: 0/1
2016-03-29 21:46:44,658 Stage-0_0: 124(+28)/152 Stage-1_0: 0/1
2016-03-29 21:46:45,665 Stage-0_0: 128(+24)/152 Stage-1_0: 0/1
2016-03-29 21:46:46,671 Stage-0_0: 138(+14)/152 Stage-1_0: 0/1
2016-03-29 21:46:47,677 Stage-0_0: 142(+10)/152 Stage-1_0: 0/1
2016-03-29 21:46:48,684 Stage-0_0: 144(+8)/152 Stage-1_0: 0/1
2016-03-29 21:46:49,691 Stage-0_0: 147(+5)/152 Stage-1_0: 0/1
2016-03-29 21:46:50,698 Stage-0_0: 148(+4)/152 Stage-1_0: 0/1
2016-03-29 21:46:51,705 Stage-0_0: 149(+3)/152 Stage-1_0: 0/1
2016-03-29 21:46:52,712 Stage-0_0: 150(+2)/152 Stage-1_0: 0/1
2016-03-29 21:46:55,731 Stage-0_0: 151(+1)/152 Stage-1_0: 0/1
2016-03-29 21:46:58,750 Stage-0_0: 151(+1)/152 Stage-1_0: 0/1
2016-03-29 21:47:01,769 Stage-0_0: 151(+1)/152 Stage-1_0: 0/1
2016-03-29 21:47:02,776 Stage-0_0: 152/152 Finished Stage-1_0: 0(+1)/1
2016-03-29 21:47:05,793 Stage-0_0: 152/152 Finished Stage-1_0: 1/1 Finished
Status: Finished successfully in 70.33 seconds
OK
57688987
Time taken: 75.211 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329214723_9663eaf7-7014-46b1-b2ca-811ba64fc55c
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = f2dbcd55-b23c-4eb3-9439-8f1c825fbac3
Query Hive on Spark job[1] stages:
2
3
Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:47:24,449 Stage-2_0: 0(+122)/152 Stage-3_0: 0/1
2016-03-29 21:47:25,455 Stage-2_0: 96(+56)/152 Stage-3_0: 0/1
2016-03-29 21:47:26,462 Stage-2_0: 123(+29)/152 Stage-3_0: 0/1
2016-03-29 21:47:27,469 Stage-2_0: 128(+24)/152 Stage-3_0: 0/1
2016-03-29 21:47:28,476 Stage-2_0: 132(+20)/152 Stage-3_0: 0/1
2016-03-29 21:47:29,483 Stage-2_0: 137(+15)/152 Stage-3_0: 0/1
2016-03-29 21:47:30,489 Stage-2_0: 145(+7)/152 Stage-3_0: 0/1
2016-03-29 21:47:31,495 Stage-2_0: 146(+6)/152 Stage-3_0: 0/1
2016-03-29 21:47:32,500 Stage-2_0: 150(+2)/152 Stage-3_0: 0/1
2016-03-29 21:47:33,506 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:36,524 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:39,540 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:42,557 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:45,573 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:48,589 Stage-2_0: 152/152 Finished Stage-3_0: 0(+1)/1
2016-03-29 21:47:49,594 Stage-2_0: 152/152 Finished Stage-3_0: 1/1 Finished
Status: Finished successfully in 26.15 seconds
OK
57688987
Time taken: 26.392 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329214758_25084e25-fdaf-4ef8-9c1a-2573515caca6
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = 4360be5c-4188-49c4-a2a7-e5bb80164646
Query Hive on Spark job[2] stages:
5
4
Status: Running (Hive on Spark job[2])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:47:59,472 Stage-4_0: 0(+63)/65 Stage-5_0: 0/1
2016-03-29 21:48:00,478 Stage-4_0: 1(+62)/65 Stage-5_0: 0/1
2016-03-29 21:48:01,486 Stage-4_0: 49(+14)/65 Stage-5_0: 0/1
2016-03-29 21:48:02,492 Stage-4_0: 51(+14)/65 Stage-5_0: 0/1
2016-03-29 21:48:03,498 Stage-4_0: 57(+8)/65 Stage-5_0: 0/1
2016-03-29 21:48:04,505 Stage-4_0: 62(+3)/65 Stage-5_0: 0/1
2016-03-29 21:48:05,511 Stage-4_0: 63(+2)/65 Stage-5_0: 0/1
2016-03-29 21:48:06,518 Stage-4_0: 65/65 Finished Stage-5_0: 0(+1)/1
2016-03-29 21:48:09,537 Stage-4_0: 65/65 Finished Stage-5_0: 0(+1)/1
2016-03-29 21:48:12,556 Stage-4_0: 65/65 Finished Stage-5_0: 0(+1)/1
2016-03-29 21:48:15,574 Stage-4_0: 65/65 Finished Stage-5_0: 0(+1)/1
2016-03-29 21:48:18,592 Stage-4_0: 65/65 Finished Stage-5_0: 0(+1)/1
2016-03-29 21:48:21,608 Stage-4_0: 65/65 Finished Stage-5_0: 1/1 Finished
Status: Finished successfully in 23.14 seconds
OK
57688987
Time taken: 23.376 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329214826_173311b1-0083-4e11-9a29-fe13f48bb649
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Spark Job = c452b02b-c68f-4c68-bc28-cb9748d7dcb2
Query Hive on Spark job[3] stages:
6
7
Status: Running (Hive on Spark job[3])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:48:27,332 Stage-6_0: 3(+60)/65 Stage-7_0: 0/1
2016-03-29 21:48:28,338 Stage-6_0: 53(+10)/65 Stage-7_0: 0/1
2016-03-29 21:48:29,343 Stage-6_0: 60(+3)/65 Stage-7_0: 0/1
2016-03-29 21:48:30,349 Stage-6_0: 61(+4)/65 Stage-7_0: 0/1
2016-03-29 21:48:31,354 Stage-6_0: 63(+2)/65 Stage-7_0: 0/1
2016-03-29 21:48:32,360 Stage-6_0: 65/65 Finished Stage-7_0: 0(+1)/1
2016-03-29 21:48:35,377 Stage-6_0: 65/65 Finished Stage-7_0: 0(+1)/1
2016-03-29 21:48:38,393 Stage-6_0: 65/65 Finished Stage-7_0: 0(+1)/1
2016-03-29 21:48:40,404 Stage-6_0: 65/65 Finished Stage-7_0: 1/1 Finished
Status: Finished successfully in 14.08 seconds
OK
57688987
Time taken: 14.306 seconds, Fetched: 1 row(s)
[hadoop@hadoop-master2 spark-1.6.0-bin-2.6.3]$ bin/spark-sql --master yarn-client --hiveconf hive.execution.engine=mr
> select count(*) from t_ods_access_log2_parquet;
57688987
16/03/29 22:19:51 INFO CliDriver: Time taken: 21.82 seconds, Fetched 1 row(s)
> select count(*) from t_ods_access_log2_back;
57688987
16/03/29 22:20:44 INFO CliDriver: Time taken: 6.634 seconds, Fetched 1 row(s)
–END
Related
Related posts
-
基于对象存储的 Spark 数据读写实战:从末尾追加到任意更新
2025-10-28
-
科学上网(续)
2018-06-09
-
使用Sphinx生成/管理文档
2017-11-16
-
windows run ubuntu
2017-10-29