0%

Impala数据文件的碎碎念

Impala目前支持Hadoop中几种常见的文件格式 ParquetORCTextAvroRCFileSequenceFile 。下面简要说明各种格式的使用、限制和一些注意事项。

不同的文件格式有着不同的适用场景,并且对性能有着重大的影响。不同的文件格式支持的压缩编码不同,不同的编码影响这IO和CPU资源的开销,较高的压缩率需要较大的CPU资源,但是数据移动成本较低。使用时需要权衡CPU时间和数据传输时间。

除了Parquet和Text(未压缩)文件格式外,其他的文件格式impala都不能进行数据的插入。有关特定文件格式,请参见下表。

File Type Format Compression Codecs Impala Can CREATE? Impala Can INSERT?
Parquet 结构化 Snappy, gzip, zstd; 默认为Snappy Yes. Yes: CREATE TABLE, INSERT, LOAD DATA, and query.
ORC 结构化 gzip, Snappy, LZO, LZ4; 默认为gzip ORC是 CDH 6.1/Impala 3.1&Impala 2.12开始新增的实验性的特性。禁用方式: set ‑‑enable_orc_scanner=false No. Impala只能进行查询,需要通过其他组件加载数据。
Text 非结构化 LZO, gzip, bzip2, Snappy Yes. 创建表时的默认格式。 未压缩的情况下可以插入,否则不可以。如果是LZO的压缩编码,那么需要通过HIVE来创建表和加载数据。其他压缩编码,加载数据需要通过其他组件完成
Avro 结构化 Snappy, gzip, deflate Yes,Impala 1.4.0开始可以创建,之前的版本需要通过hive创建 No. 需要通过其他组件进行数据加载
RCFile 结构化 Snappy, gzip, deflate, bzip2 Yes. No. 需要通过其他组件进行数据加载
SequenceFile 结构化 Snappy, gzip, deflate, bzip2 Yes. No. 需要通过其他组件进行数据加载

Impala支持以下的Compression Codecs:

Snappy: 在压缩率和解压缩速度之间比较平衡。Impala2.0及以上版本支持text, RC, Sequence 和 Avro文件格式

Gzip:拥有最高的压缩率。Impala2.0及以上版本支持text, RC, Sequence 和 Avro文件格式

Deflate: 不支持文本格式

Bzip2:Impala2.0及以上版本支持text, RC, and Sequence文件格式

LZO:只支持文本格式。Impala可以查询LZO压缩的文本表,但目前无法创建它们或将数据插入其中。需要在Hive中执行这些操作。

Zstd:只支持Parquet文件格式

不同的文件格式和Compression Codecs适用于不同的数据集。为数据选择适当的格式可以产生更好的性能:

  • 如果是已经存在的文件,那么在性能可以接受的情况下,继续使用该文件格式。如果不可接受,那么新建一个表,使用新的文件格式和compression codecs,并一次性将数据写入到新表中。
  • text格式是默认的文件格式,可以很方便的在不同工具之间传输,并且是人类可读的,所以在使用中是一个很普遍的格式。但是如果性能和磁盘空间是首要考虑因素,那么就使用其他结构化的文件格式,并采用合理的压缩编码。

Text Data Files

Text是一种比较通用的文件格式,可以很方便的与其他应用或者脚本进行数据交换,常见的如CSV和TSV等。

文本文件格式在列定义上非常的灵活。文本文件中可以包含比Impala表定义多的列,此时查询数据时会忽略文件中多出的列;包含的列也可以比Impala少,此时对于缺少的列,Impala会将其作为 NULL 值。

关于查询性能

以文本格式存储的数据相对笨重,查询性能不像二进制格式(如parquet)那样高效。使用Text文件格式比较典型的场景是收到的文件即为该文本格式,或者是需要输出该文件格式用于与其他应用的数据交换。如果需要更加高效的文件格式,可以创建一个使用二进制
为了得到更紧凑的数据,可以使用LZO压缩,基于可分割的特性,可以让不同节点,对同一文件的不同部分并发工作。

在impala 2.0或者更高版本,也可以使用gzip, bzip2, or Snappy 格式压缩,但是这些压缩格式不支持对文件进行并发操作。所以对于文本文件的压缩,LZO是首选。

Text table的创建

不加任何额外语句创建的表格式就是text文件格式的。默认使用ctrl-A作为分隔符。

1
create table my_table(id int, s string, n int, t timestamp, b boolean);

也可以通过 fields terminated by 指定分隔符

1
2
3
4
5
6
7
8
9
10
11
12
13
create table csv(id int, s string, n int, t timestamp, b boolean)
row format delimited
fields terminated by ','
stored as textfile; -- 一般情况下不需要指定,默认即为textfile

create table tsv(id int, s string, n int, t timestamp, b boolean)
row format delimited
fields terminated by '\t';

create table pipe_separated(id int, s string, n int, t timestamp, b boolean)
row format delimited
fields terminated by '|'
stored as textfile;

不要用引号来来将文本内容括起来,如果是文本中包含分隔符,使用 ESCAPE BY 子句指定的转义符进行转义。

DESCRIBE FORMATTED table_name 语句可以查看表格式的详细信息。

复杂类型的注意事项:目前Text表不支持复杂数据类型

Text Table中的数据文件

当Impala查询文本格式数据的表时,它会查阅数据目录中的所有数据文件,但有些例外:

  • 忽略点或者下划线开头的隐藏文件
  • Impala查询时会忽略Hadoop工具用于临时工作文件的扩展名的文件。如 .tmp.copying
  • 当是压缩表的时候,通过后缀识别文件是否压缩,切只扫描改部分文件。 .bz2 .snappy .gz

INSERT...SELECT 会在每一个SELECT语句处理数据的节点产生一个数据文件。 INSERT…VALUES 会为每一个语句产生一个数据文件。经常使用 INSERT...VALUES 会产生大量的小文件,不建议使用该语句插入数据。如果发现表中存在大量的小文件,可以通过 INSERT...SELECT 将数据插入到新的表中来解决该问题。

Text数据文件中的特殊值

  • 对数值类型的列 inf 表示无穷大, nan 表示非数字。
  • \N 表示 null 。使用Sqoop的时候,设置以下参数来确保产生正确的 NULL--null-string '\\N' --null-non-string '\\N'
  • 默认情况下Sqoop会使用 null 字符串代表空值。这将导致在impala中识别错误,可以通过 ALTER TABLE name SET TBLPROPERTIES("serialization.null.format"="null") 来解决这个问题。
  • 在impala 2.6以及以上版本,可以通过 skip.header.line.countTBLPROPERTIES 参数来跳过表头的指定行数。
    alter table header_line set tblproperties('skip.header.line.count'='1');

加载数据到Impala的text表

  • LOAD DATA 语句指定具体的数据文件或者一个目录,将其移动到Impala表的数据目录下。
  • 其他格式的表通过 CREATE TABLE AS SELECT 或者 INSERT INTO 的方式传输到 Text 表。
1
2
3
4
5
6
7
8
9
-- Text table with default delimiter, the hex 01 character.
CREATE TABLE text_table AS SELECT * FROM other_file_format_table;

-- Text table with user-specified delimiter. Currently, you cannot specify
-- the delimiter as part of CREATE TABLE LIKE or CREATE TABLE AS SELECT.
-- But you can change an existing text table to have a different delimiter.
CREATE TABLE csv LIKE other_file_format_table;
ALTER TABLE csv SET SERDEPROPERTIES ('serialization.format'=',', 'field.delim'=',');
INSERT INTO csv SELECT * FROM other_file_format_table;
  • 也可以使用 HDFS的 hdfs dfs -puthdfs dfs -cp 将数据文件放置到impala表目录中,并使用 REFRESH table_name 刷元数据。

Parquet Data Files

Impala允许你创建、管理和查询Parquet表。Parquet是一个面向列的二进制文件格式,并且自带Schema,支持高效率的大规模数据查询,尤其适合查询表中特定少部分列的场景。

Impala写入的每个Parquet文件都包含了一组行数据,其中每列的值都被组织在相邻的位置,从而对这些值实现了良好的压缩,实现最快和最小IO的数据查询。

创建ParquetTable

在建表时指定 STORE AS PQRQUET 子句.

1
2
3
4
create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;

-- 或者从其他表复制表结构,并指定存储文件格式
create table parquet_table_name LIKE other_table_name STORED AS PARQUET;

在Impala 1.4.0以及以上版本,可以通过parquet数据文件来推测表结构

1
2
3
4
5
6
7
8
9
10
11
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET
LOCATION '/user/etl/destination';

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET;

-- 如果是分区表,分区信息不是parquet文件的一部分,需要建表时指定
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
PARTITION (year INT, month TINYINT, day TINYINT)
STORED AS PARQUET;

加载数据到ParquetTables

  • 可以使用 INSERT...SELECT 语句将其他表的数据写入parquet表。避免使用 INSERT...VALUE 语句向parquet表写入数据,它会为每个语句产生一个文件。在INSERT数据时,至少保证HDFS中有一个数据块(默认1GB)的空间,否则会插入失败。
  • 如果是已经存在parquet数据文件,可以使用以下方式加载数据:
    1) LOAD DATA 语句将数据文件或者目录移动到Parquet表目录中
    2) CREATE TABLE 的时候指定 LOCATION 位置为已经存在Parquet数据文件的目录。或者使用 CREATE EXTERNAL TABLE 可以保证在删除表的时候不会删除数据文件。
    3) hadoop的 hadoop distcp -pb 命令(这个命令可以保证parquet数据文件块的大小与表定义一致)将文件移动到相应的parquet表目录下,并 REFRESH 表以识别新加的数据文件

加载数据到parquet表是一个内存敏感的操作,写入的数据会缓存达到一个数据块大小的时候才会写入到磁盘。如果是分区表,会为每个分区缓存一份数据,当所有数据都到达时再写入数据文件。当向分区表写入数据时,Impala会在节点之间重新分布数据以减少内存消耗。

Parquet表的查询性能

一个Parquet文件由Header、DataBlock和Footer组成。
HEADER部分会有个Magic Number用于标识这是一个Parquet文件。
DataBlock是实际存储数据的地方,包括以下三部分:
1)行组(row group):多行数据组成,默认一个行组与HDFS的一个数据块大小对齐。实现数据的并行处理。
2)列块(Column Chunk):行组中的数据以列块的形式保存,同一列的数据相邻。实现列式存储。
3)数据页(data page):最小存储编码,每一个列块包含多个页。更细粒度的数据访问
Footer部分由File Metadata、Footer Length和Magic Number三部分组成。Footer Length是一个4字节的数据,用于标识Footer部分的大小,帮助找到Footer的起始指针位置。Magic Number同样是PAR1。File Metada包含了非常重要的信息,包括Schema和每个Row Group的Metadata。每个Row Group的Metadata又由各个Column的Metadata组成,每个Column Metadata包含了其Encoding、Offset、Statistic信息等等

1)影响查询性能的主要因素是查询语句中使用到的列的数量,parquet的列式存储,在一个具有大量列的表中,查询少量的列,可以减少数据的读取提升性能。
2)对于分区表,Impala只会读取指定分区中的数据。
3)依据Footer中的Statistic信息可以进行Predicate Filter,根据where条件,只读取指定文件中特定row group的数据。在这里分两种情况,一种是数值型的,则根据max和min值筛选。另一种是字符型,根据字典编码筛选。

Parquet表的压缩

parquet文件支持 Snappy, gzip , zstd 三种压缩格式,通过 COMPRESSION_CODEC 的查询参数控制。默认情况下是 snappy 格式的压缩, gzip 的压缩比例最大。越高的压缩比,在解压缩的时候需要耗费更多的CPU时间。

1
2
3
4
5
6
[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s

压缩之后会把压缩格式的元数据写入数据文件当中,此时查询的时候会忽略 COMPRESSION_CODEC 参数,所以,一个表中可以包含多种压缩格式的文件。

当在不同的节点或者目录复制parquet文件的时候 使用 hadoop distcp -pb 来确保文件块的大小与之前相同。

压缩parquet文件(小文件合并大文件)

哪些情况会导致很多小文件?
1、当在N个节点进行数据处理,会在每个节点产生一个数据文件。并且每个节点产生的数据很少的时候。
2、即使每个节点有大量数据,但是在此基础上进行分区,如果每个分区的文件大小过小,也会产生大量小文件

避免产生大量小文件的方法
1、插入分区表时,使用静态分区,不要使用动态分区
2、当 INSERT 或者 CREATE TABLE AS 语句,会在不同节点产生大量小文件时。使用 SET NUM_NODES=1 查询参数,来取消分布式的特性,从而产生一个或少量文件。
3、减少分区数量
4、在内存中是未被压缩的数据大小,一个在内存中大于256MB的数据,被压缩后会小于256MB,但是仍然会写入到两个数据文件之中
5、当目前有一个很多小文件的表的时候,考虑将数据写入到新的表中。

表模式的修改

ALTER TABLE ... REPLACE COLUMNS 对列进行修改:
1)该语句不会修改数据文件。只是根据新的定义来解析数据文件。某些不合理的修改会导致查询结果出现异常值或者产生错误
2) INSERT 语句将使用最近的一次表定义的结构。
3)当新增列之后,之前插入数据缺少的这列值将会被置空。
4)如果修改之后,表定义减少了一些列,在读取数据的时候会忽略之前写入的数据。
5)Parquet文件中 TINYINT SMALLINTINT 在内部都是32位整数。

ORC Data Files

对orc数据文件的读取,作为实验功能,从impala 3.1开始加入。

创建ORC表,并加载数据

1
CREATE TABLE orc_table (column_specs) STORED AS ORC;

目前impala不能向orc数据文件中写入数据,只能通过HIVE或者其他外部组件将数据加载进来,然后使用 REFRESH table_name 刷新数据。

启用压缩

1
2
3
4
5
6
7
8
9
hive> SET hive.exec.compress.output=true;
hive> SET orc.compress=SNAPPY;
hive> INSERT OVERWRITE TABLE new_table SELECT * FROM old_table;

-- 对于分区表
hive> CREATE TABLE new_table (your_cols) PARTITIONED BY (partition_cols) STORED AS new_format;
hive> SET hive.exec.dynamic.partition.mode=nonstrict;
hive> SET hive.exec.dynamic.partition=true;
hive> INSERT OVERWRITE TABLE new_table PARTITION(comma_separated_partition_cols) SELECT * FROM old_table;

关于性能

比text快,但是比parquet慢

Avro Data Files

Avro的性能比Text好,比Parquet差。

创建表

在表定义时需要声明所有的列以匹配Avro文件的Schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
[localhost:21000] > CREATE TABLE avro_only_sql_columns
> (
> id INT,
> bool_col BOOLEAN,
> tinyint_col TINYINT, /* Gets promoted to INT */
> smallint_col SMALLINT, /* Gets promoted to INT */
> int_col INT,
> bigint_col BIGINT,
> float_col FLOAT,
> double_col DOUBLE,
> date_string_col STRING,
> string_col STRING
> )
> STORED AS AVRO;

[localhost:21000] > CREATE TABLE impala_avro_table
> (bool_col BOOLEAN, int_col INT, long_col BIGINT, float_col FLOAT, double_col DOUBLE, string_col STRING, nullable_int INT)
> STORED AS AVRO
> TBLPROPERTIES ('avro.schema.literal'='{
> "name": "my_record",
> "type": "record",
> "fields": [
> {"name":"bool_col", "type":"boolean"},
> {"name":"int_col", "type":"int"},
> {"name":"long_col", "type":"long"},
> {"name":"float_col", "type":"float"},
> {"name":"double_col", "type":"double"},
> {"name":"string_col", "type":"string"},
> {"name": "nullable_int", "type": ["null", "int"]}]}');

[localhost:21000] > CREATE TABLE avro_examples_of_all_types (
> id INT,
> bool_col BOOLEAN,
> tinyint_col TINYINT,
> smallint_col SMALLINT,
> int_col INT,
> bigint_col BIGINT,
> float_col FLOAT,
> double_col DOUBLE,
> date_string_col STRING,
> string_col STRING
> )
> STORED AS AVRO
> TBLPROPERTIES ('avro.schema.url'='hdfs://localhost:8020/avro_schemas/alltypes.json');

avro中的空值,用 null 表示

Impala 2.3以及以上版本,Impala会在创建表的时候就检查表定义与Avro中Schema是否一致。当存在表定义,与avro文件不匹配的时候的处理方式:
1)当列数不匹配的时候,impala会使用avro中的模式定义
2)当列名称或者类型不匹配的时候,impala会使用avro中的模式定义
3)当impala中的 TIMESTAMP 映射到avro中的 STRING 时,以 STRING 表示

Avro数据文件不支持定义复杂数据类型。

Impala目前不支持向Avro中写入数据,因此数据只能从其他组件加载,然后在Impala中查询。

开启压缩

只能在Hive中操作。Impala支持 snappydeflate

1
2
hive> set hive.exec.compress.output=true;
hive> set avro.output.codec=snappy;

RCFile Data Files

impala目前不能往RCFile中写入文件。只能通过其他组件加载数据到Impala。

RCFile的性能比Text好,比Parquet差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ impala-shell -i localhost
[localhost:21000] > create table rcfile_table (x int) stored as rcfile;
[localhost:21000] > create table rcfile_clone like some_other_table stored as rcfile;
[localhost:21000] > quit;

$ hive
hive> insert into table rcfile_table select x from some_other_table;
3 Rows loaded to rcfile_table
Time taken: 19.015 seconds
hive> quit;

$ impala-shell -i localhost
[localhost:21000] > select * from rcfile_table;
Returned 0 row(s) in 0.23s
[localhost:21000] > -- Make Impala recognize the data loaded through Hive;
[localhost:21000] > refresh rcfile_table;
[localhost:21000] > select * from rcfile_table;
+---+
| x |
+---+
| 1 |
| 2 |
| 3 |
+---+
Returned 3 row(s) in 0.23s

开启压缩

只能在Hive中操作。

1
2
3
4
5
6
7
8
9
10
11
12
hive> SET hive.exec.compress.output=true;
hive> SET mapred.max.split.size=256000000;
hive> SET mapred.output.compression.type=BLOCK;
hive> SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> INSERT OVERWRITE TABLE new_table SELECT * FROM old_table;

--如果是分区表
hive> CREATE TABLE new_table (your_cols) PARTITIONED BY (partition_cols) STORED AS new_format;
hive> SET hive.exec.dynamic.partition.mode=nonstrict;
hive> SET hive.exec.dynamic.partition=true;
hive> INSERT OVERWRITE TABLE new_table PARTITION(comma_separated_partition_cols) SELECT * FROM old_table;

Sequence Data Files

RCFile的性能比Text好,比Parquet差。

1
create table sequencefile_table (column_specs) stored as sequencefile;

Impala不能向sequence文件中写入数据。使用Hive或其他方式加载数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ impala-shell -i localhost
[localhost:21000] > create table seqfile_table (x int) stored as sequencefile;
[localhost:21000] > create table seqfile_clone like some_other_table stored as sequencefile;
[localhost:21000] > quit;

$ hive
hive> insert into table seqfile_table select x from some_other_table;
3 Rows loaded to seqfile_table
Time taken: 19.047 seconds
hive> quit;

$ impala-shell -i localhost
[localhost:21000] > select * from seqfile_table;
Returned 0 row(s) in 0.23s
[localhost:21000] > -- Make Impala recognize the data loaded through Hive;
[localhost:21000] > refresh seqfile_table;
[localhost:21000] > select * from seqfile_table;
+---+
| x |
+---+
| 1 |
| 2 |
| 3 |
+---+
Returned 3 row(s) in 0.23s

开启压缩

在Hive中进行操作

1
2
3
4
5
6
7
8
9
10
11
hive> SET hive.exec.compress.output=true;
hive> SET mapred.max.split.size=256000000;
hive> SET mapred.output.compression.type=BLOCK;
hive> SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> insert overwrite table new_table select * from old_table;

-- 如果是分区表
hive> create table new_table (your_cols) partitioned by (partition_cols) stored as new_format;
hive> SET hive.exec.dynamic.partition.mode=nonstrict;
hive> SET hive.exec.dynamic.partition=true;
hive> insert overwrite table new_table partition(comma_separated_partition_cols) select * from old_table;