Winse Blog

走走停停都是风景, 熙熙攘攘都向最好, 忙忙碌碌都为明朝, 何畏之.

Parquet学习

经典文章

概念

  • Row Group
    • Column Chunk
      • Page
        • Definition Levels: To support nested records we need to store the level for which the field is null. This is what the definition level is for: from 0 at the root of the schema up to the maximum level for this column. When a field is defined then all its parents are defined too, but when it is null we need to record the level at which it started being null to be able to reconstruct the record.
        • Repetition Levels: To support repeated fields we need to store when new lists are starting in a column of values. This is what repetition level is for: it is the level at which we have to create a new list for the current value. In other words, the repetition level can be seen as a marker of when to start a new list and at which level.
  • FileMetaData

The definition and repetition levels are optional, based on the schema definition. If the column is not nested (i.e. the path to the column has length 1), we do not encode the repetition levels (it would always have the value 1). For data that is required, the definition levels are skipped (if encoded, it will always have the value of the max definition level).

For example, in the case where the column is non-nested and required, the data in the page is only the encoded values.

An optimized read setup would be: 1GB row groups, 1GB HDFS block size, 1 HDFS block per HDFS file.

texfile转parquet

1
2
3
[hadoop@hadoop-master2 ~]$ cd apache-hive-1.2.1-bin/
[hadoop@hadoop-master2 apache-hive-1.2.1-bin]$ bin/hive
hive> CREATE TABLE `t_ods_access_log2_parquet`(   `houseid` string,    `sourceip` string,    `destinationip` string,    `sourceport` string,    `destinationport` string,    `domain` string,    `url` string,    `accesstime` string,    `logid` string,    `sourceipnum` bigint,    `timedetected` string,    `protocol` string,    `duration` string) ROW FORMAT DELIMITED    FIELDS TERMINATED BY '|'  STORED AS PARQUET LOCATION   '/user/hive/t_ods_access_log2_parquet'

关键 STORED AS PARQUET

关于压缩,可以通过mapreduce参数设置( mapreduce.output.fileoutputformat.compressmapreduce.output.fileoutputformat.compress.codec ),但是推荐使用 parquet.compression 属性来指定。

reader/writer都会从 CodecConfig.getCodec() 获取压缩编码。代码中会从parquet属性和mapreduce获取压缩参数。

1
2
3
4
alter table t_ods_access_log2_parquet SET TBLPROPERTIES ('parquet.compression' = 'SNAPPY' );

create table t_ods_access_log2_parquet_none like t_ods_access_log2_parquet TBLPROPERTIES ('parquet.compression' = 'UNCOMPRESSED' );
create table t_ods_access_log2_parquet_gzip like t_ods_access_log2_parquet TBLPROPERTIES ('parquet.compression' = 'GZIP' );

直接使用hive的insert into语句就可以把原来的textfile的文件转成parquet格式。同时也转成gzip和uncompress比较了一下:

文件格式 压缩 大小
textfile snappy 4.1G
parquet snappy 3.6G
parquet uncompress 7.2G
parquet gzip 2.2G

直接count整个数据表,使用parquet的输入1M不到数据,太环保了!!(文件都是几十M的,一个文件(block)可以都在一台机器上)。

文件格式 运行引擎 大小
textfile tez HDFS_BYTES_READ 4,454,071,542
parquet tez HDFS_BYTES_READ 415,870
textfile sparksql Input 4.1 GB
parquet sparksql Input 384.9 KB

用sparksql跑textfile竟然更快。果然内存大暴力也很牛啊!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
hive> insert into t_ods_access_log2_back select houseid,  sourceip,  destinationip,  sourceport,  destinationport,  domain,  url,  accesstime,  logid,  sourceipnum,  timedetected,  protocol,  duration from t_ods_access_log2 where hour=2016032804 ;
Query ID = hadoop_20160329200414_96f1de35-48c5-4b38-977f-05de8554f388
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1458893800770_3955)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED    152        152        0        0       1       0
--------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 341.56 s   
--------------------------------------------------------------------------------
Loading data to table default.t_ods_access_log2_back
Table default.t_ods_access_log2_back stats: [numFiles=152, numRows=57688987, totalSize=4454071542, rawDataSize=11018516544]
OK
Time taken: 347.997 seconds
hive> insert into t_ods_access_log2_parquet select houseid,  sourceip,  destinationip,  sourceport,  destinationport,  domain,  url,  accesstime,  logid,  sourceipnum,  timedetected,  protocol,  duration from t_ods_access_log2 where hour=2016032804 ;
Query ID = hadoop_20160329212157_57b66595-5dfc-4fc9-9ad1-398e2b8ade6b
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.


Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED    152        152        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 237.28 s   
--------------------------------------------------------------------------------
Loading data to table default.t_ods_access_log2_parquet
Table default.t_ods_access_log2_parquet stats: [numFiles=0, numRows=1305035789, totalSize=0, rawDataSize=16965465257]
OK
Time taken: 260.515 seconds
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329212644_da8e7997-5bcc-41ab-8b63-f1a5919c5a2f
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED    107        107        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 59.01 s    
--------------------------------------------------------------------------------
OK
57688987
Time taken: 59.768 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329212813_2fb8dafa-5c9a-40e8-a904-13e7cf865ec6
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1458893800770_3992)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED    106        106        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 45.82 s    
--------------------------------------------------------------------------------
OK
57688987
Time taken: 47.275 seconds, Fetched: 1 row(s)

hive> set hive.execution.engine=spark;
hive> set spark.master=yarn-client;
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329214550_a58d1056-9c91-4bbe-be7d-122ec3efdd8d
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = 3a03d432-83a4-4d5a-a878-c9e52aa94bed

Query Hive on Spark job[0] stages:
0
1

Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:46:26,523 Stage-0_0: 0(+114)/152  Stage-1_0: 0/1
2016-03-29 21:46:27,535 Stage-0_0: 0(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:30,563 Stage-0_0: 0(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:33,582 Stage-0_0: 0(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:36,606 Stage-0_0: 0(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:39,624 Stage-0_0: 0(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:41,637 Stage-0_0: 0(+118)/152  Stage-1_0: 0/1
2016-03-29 21:46:42,644 Stage-0_0: 4(+115)/152  Stage-1_0: 0/1
2016-03-29 21:46:43,651 Stage-0_0: 110(+41)/152 Stage-1_0: 0/1
2016-03-29 21:46:44,658 Stage-0_0: 124(+28)/152 Stage-1_0: 0/1
2016-03-29 21:46:45,665 Stage-0_0: 128(+24)/152 Stage-1_0: 0/1
2016-03-29 21:46:46,671 Stage-0_0: 138(+14)/152 Stage-1_0: 0/1
2016-03-29 21:46:47,677 Stage-0_0: 142(+10)/152 Stage-1_0: 0/1
2016-03-29 21:46:48,684 Stage-0_0: 144(+8)/152  Stage-1_0: 0/1
2016-03-29 21:46:49,691 Stage-0_0: 147(+5)/152  Stage-1_0: 0/1
2016-03-29 21:46:50,698 Stage-0_0: 148(+4)/152  Stage-1_0: 0/1
2016-03-29 21:46:51,705 Stage-0_0: 149(+3)/152  Stage-1_0: 0/1
2016-03-29 21:46:52,712 Stage-0_0: 150(+2)/152  Stage-1_0: 0/1
2016-03-29 21:46:55,731 Stage-0_0: 151(+1)/152  Stage-1_0: 0/1
2016-03-29 21:46:58,750 Stage-0_0: 151(+1)/152  Stage-1_0: 0/1
2016-03-29 21:47:01,769 Stage-0_0: 151(+1)/152  Stage-1_0: 0/1
2016-03-29 21:47:02,776 Stage-0_0: 152/152 Finished     Stage-1_0: 0(+1)/1
2016-03-29 21:47:05,793 Stage-0_0: 152/152 Finished     Stage-1_0: 1/1 Finished
Status: Finished successfully in 70.33 seconds
OK
57688987
Time taken: 75.211 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_back;
Query ID = hadoop_20160329214723_9663eaf7-7014-46b1-b2ca-811ba64fc55c
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = f2dbcd55-b23c-4eb3-9439-8f1c825fbac3

Query Hive on Spark job[1] stages:
2
3

Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:47:24,449 Stage-2_0: 0(+122)/152  Stage-3_0: 0/1
2016-03-29 21:47:25,455 Stage-2_0: 96(+56)/152  Stage-3_0: 0/1
2016-03-29 21:47:26,462 Stage-2_0: 123(+29)/152 Stage-3_0: 0/1
2016-03-29 21:47:27,469 Stage-2_0: 128(+24)/152 Stage-3_0: 0/1
2016-03-29 21:47:28,476 Stage-2_0: 132(+20)/152 Stage-3_0: 0/1
2016-03-29 21:47:29,483 Stage-2_0: 137(+15)/152 Stage-3_0: 0/1
2016-03-29 21:47:30,489 Stage-2_0: 145(+7)/152  Stage-3_0: 0/1
2016-03-29 21:47:31,495 Stage-2_0: 146(+6)/152  Stage-3_0: 0/1
2016-03-29 21:47:32,500 Stage-2_0: 150(+2)/152  Stage-3_0: 0/1
2016-03-29 21:47:33,506 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:36,524 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:39,540 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:42,557 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:45,573 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:48,589 Stage-2_0: 152/152 Finished     Stage-3_0: 0(+1)/1
2016-03-29 21:47:49,594 Stage-2_0: 152/152 Finished     Stage-3_0: 1/1 Finished
Status: Finished successfully in 26.15 seconds
OK
57688987
Time taken: 26.392 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329214758_25084e25-fdaf-4ef8-9c1a-2573515caca6
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = 4360be5c-4188-49c4-a2a7-e5bb80164646

Query Hive on Spark job[2] stages:
5
4

Status: Running (Hive on Spark job[2])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:47:59,472 Stage-4_0: 0(+63)/65    Stage-5_0: 0/1
2016-03-29 21:48:00,478 Stage-4_0: 1(+62)/65    Stage-5_0: 0/1
2016-03-29 21:48:01,486 Stage-4_0: 49(+14)/65   Stage-5_0: 0/1
2016-03-29 21:48:02,492 Stage-4_0: 51(+14)/65   Stage-5_0: 0/1
2016-03-29 21:48:03,498 Stage-4_0: 57(+8)/65    Stage-5_0: 0/1
2016-03-29 21:48:04,505 Stage-4_0: 62(+3)/65    Stage-5_0: 0/1
2016-03-29 21:48:05,511 Stage-4_0: 63(+2)/65    Stage-5_0: 0/1
2016-03-29 21:48:06,518 Stage-4_0: 65/65 Finished       Stage-5_0: 0(+1)/1
2016-03-29 21:48:09,537 Stage-4_0: 65/65 Finished       Stage-5_0: 0(+1)/1
2016-03-29 21:48:12,556 Stage-4_0: 65/65 Finished       Stage-5_0: 0(+1)/1
2016-03-29 21:48:15,574 Stage-4_0: 65/65 Finished       Stage-5_0: 0(+1)/1
2016-03-29 21:48:18,592 Stage-4_0: 65/65 Finished       Stage-5_0: 0(+1)/1
2016-03-29 21:48:21,608 Stage-4_0: 65/65 Finished       Stage-5_0: 1/1 Finished
Status: Finished successfully in 23.14 seconds
OK
57688987
Time taken: 23.376 seconds, Fetched: 1 row(s)
hive> select count(*) from t_ods_access_log2_parquet;
Query ID = hadoop_20160329214826_173311b1-0083-4e11-9a29-fe13f48bb649
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Spark Job = c452b02b-c68f-4c68-bc28-cb9748d7dcb2

Query Hive on Spark job[3] stages:
6
7

Status: Running (Hive on Spark job[3])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-03-29 21:48:27,332 Stage-6_0: 3(+60)/65    Stage-7_0: 0/1
2016-03-29 21:48:28,338 Stage-6_0: 53(+10)/65   Stage-7_0: 0/1
2016-03-29 21:48:29,343 Stage-6_0: 60(+3)/65    Stage-7_0: 0/1
2016-03-29 21:48:30,349 Stage-6_0: 61(+4)/65    Stage-7_0: 0/1
2016-03-29 21:48:31,354 Stage-6_0: 63(+2)/65    Stage-7_0: 0/1
2016-03-29 21:48:32,360 Stage-6_0: 65/65 Finished       Stage-7_0: 0(+1)/1
2016-03-29 21:48:35,377 Stage-6_0: 65/65 Finished       Stage-7_0: 0(+1)/1
2016-03-29 21:48:38,393 Stage-6_0: 65/65 Finished       Stage-7_0: 0(+1)/1
2016-03-29 21:48:40,404 Stage-6_0: 65/65 Finished       Stage-7_0: 1/1 Finished
Status: Finished successfully in 14.08 seconds
OK
57688987
Time taken: 14.306 seconds, Fetched: 1 row(s)


[hadoop@hadoop-master2 spark-1.6.0-bin-2.6.3]$ bin/spark-sql --master yarn-client --hiveconf hive.execution.engine=mr 
         > select count(*) from t_ods_access_log2_parquet;
57688987
16/03/29 22:19:51 INFO CliDriver: Time taken: 21.82 seconds, Fetched 1 row(s)

         > select count(*) from t_ods_access_log2_back;
57688987
16/03/29 22:20:44 INFO CliDriver: Time taken: 6.634 seconds, Fetched 1 row(s)

–END

Comments