Using the HiveCatalog and Flink’s connector to Hive, Flink can read and write Hive data as an alternative to Hive’s batch engine.
Be sure to follow the instructions to include the correct dependencies in your application.
Note also that the Hive connector only works with the Blink planner.
Assume Hive contains a single table, named `people`, in its `default` database, and that this table contains several rows.
With the data ready, you can connect to an existing Hive installation and begin querying.
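If you need to query Hive views, please note that the Hive catalog must be your current catalog before you can query views in it. You can set it with `tableEnv.useCatalog(...)` in the Table API or with `USE CATALOG ...` in the SQL Client.

Similarly, data can be written into Hive using an `INSERT` clause.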
Consider an example table named `mytable` with two columns, `name` and `age`, of types `string` and `int`.
Partitioned tables are supported too. Consider a partitioned table named `myparttable` with four columns (`name`, `age`, `my_type` and `my_date`) of types ……, where `my_type` and `my_date` are the partition keys.
We have tested the following table storage formats: text, csv, SequenceFile, ORC, and Parquet.
Flink uses partition pruning as a performance optimization to limit the number of files and partitions that it reads when querying Hive tables. When your data is partitioned and a query filters on the partition keys, Flink reads only the subset of partitions that match the filter criteria.
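As a minimal sketch of the idea (not Flink's implementation), assume each partition is described by a map from partition key to value, like the `my_type` and `my_date` keys of `myparttable`. The class `PartitionPruningSketch`, its `prune` method, and the partition values are hypothetical; the point is only that a filter on partition keys can discard whole partitions before any of their files are opened.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical illustration of partition pruning, not a Flink API.
// Each partition is identified by its partition-key values; only partitions whose
// values satisfy the query's filter on those keys are selected for reading.
final class PartitionPruningSketch {

    // Returns the subset of partitions that must be read for a filter on partition keys.
    static List<Map<String, String>> prune(
            List<Map<String, String>> partitions,
            Predicate<Map<String, String>> partitionFilter) {
        return partitions.stream()
                .filter(partitionFilter)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up partitions of a table partitioned by my_type and my_date.
        List<Map<String, String>> partitions = List.of(
                Map.of("my_type", "a", "my_date", "2019-08-08"),
                Map.of("my_type", "a", "my_date", "2019-08-09"),
                Map.of("my_type", "b", "my_date", "2019-08-08"));

        // Equivalent of "WHERE my_type = 'a'": the partition with my_type = 'b' is skipped.
        List<Map<String, String>> toRead =
                prune(partitions, p -> "a".equals(p.get("my_type")));
        System.out.println(toRead.size() + " of " + partitions.size() + " partitions read");
        // prints "2 of 3 partitions read"
    }
}
```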
Flink leverages projection pushdown to minimize data transfer between Flink and Hive tables by omitting unnecessary fields from table scans.
It is especially beneficial when a table contains many columns.
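A minimal in-memory sketch of projection pushdown follows. `ProjectionPushdownSketch`, `project`, and `scan` are hypothetical names, and the sample row values are made up. The sketch only models which fields the scan emits; with columnar formats such as ORC and Parquet, the skipped columns need not be read from storage at all.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of projection pushdown, not a Flink API.
// The scan is told which columns the query needs and emits only those fields,
// instead of passing whole rows downstream and dropping columns later.
final class ProjectionPushdownSketch {

    // Builds a row containing only the needed columns, in the requested order.
    static Map<String, Object> project(Map<String, Object> storedRow, List<String> neededColumns) {
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String column : neededColumns) {
            projected.put(column, storedRow.get(column));
        }
        return projected;
    }

    // Scans every stored row and emits only the needed columns from each one.
    static List<Map<String, Object>> scan(List<Map<String, Object>> table, List<String> neededColumns) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : table) {
            out.add(project(row, neededColumns));
        }
        return out;
    }

    public static void main(String[] args) {
        // A made-up stored row with the four columns of myparttable.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("name", "Tom");
        row.put("age", 19);
        row.put("my_type", "type_1");
        row.put("my_date", "2019-08-08");

        // Equivalent of "SELECT name FROM myparttable": one field per row instead of four.
        System.out.println(scan(List.of(row), List.of("name"))); // prints [{name=Tom}]
    }
}
```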
For queries with a `LIMIT` clause, Flink limits the number of output records wherever possible to minimize the amount of data transferred across the network.
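The following sketch, assuming a single reader that consumes splits one after another, shows the effect of pushing a `LIMIT` into the scan: reading stops as soon as enough records have been produced. `LimitPushdownSketch` and `readWithLimit` are hypothetical names, and the sketch does not model how the limit is combined across parallel readers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of limit pushdown for a single reader, not a Flink API.
// Once `limit` records have been collected, the rest of the current split and all
// later splits are never read.
final class LimitPushdownSketch {

    static <T> List<T> readWithLimit(List<List<T>> splits, long limit) {
        List<T> out = new ArrayList<>();
        for (List<T> split : splits) {
            Iterator<T> records = split.iterator();
            while (records.hasNext()) {
                if (out.size() >= limit) {
                    return out; // limit reached: stop reading immediately
                }
                out.add(records.next());
            }
        }
        return out; // fewer records than the limit were available
    }

    public static void main(String[] args) {
        List<List<Integer>> splits = List.of(List.of(1, 2, 3), List.of(4, 5, 6));
        // Equivalent of "... LIMIT 4": records 5 and 6 are never read.
        System.out.println(readWithLimit(splits, 4)); // prints [1, 2, 3, 4]
    }
}
```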
Vectorized optimization is used automatically when the following conditions are met:
This feature is turned on by default. If there is a problem, you can use this config option to turn off vectorized optimization:
By default, Flink infers the Hive source parallelism from the number of splits, which in turn depends on the number of files and the number of blocks in those files.
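To make this relationship concrete, here is a simplified model that assumes one split per file block, with a partially filled last block counting as a whole block. `SplitCountSketch` is a hypothetical name, and Flink's actual split computation may differ, for example depending on the file format.

```java
// Hypothetical, simplified model of how the split count (and therefore the inferred
// source parallelism) grows with the number of files and the blocks per file.
// Assumption: one split per block; this is not Flink's actual split computation.
final class SplitCountSketch {

    // Number of blocks a file occupies, counting a partial last block as a full block.
    static long blocksFor(long fileSizeBytes, long blockSizeBytes) {
        if (fileSizeBytes <= 0) {
            return 0; // assumption: an empty file yields no split
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes; // ceiling division
    }

    // Total number of splits across all files being read.
    static long countSplits(long[] fileSizesBytes, long blockSizeBytes) {
        long splits = 0;
        for (long size : fileSizesBytes) {
            splits += blocksFor(size, blockSizeBytes);
        }
        return splits;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        long blockSize = 128 * mib; // 128 MiB, a common HDFS block size
        long[] files = {300 * mib, 10 * mib, 128 * mib}; // 3 + 1 + 1 blocks
        System.out.println(countSplits(files, blockSize)); // prints 5
    }
}
```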
Flink allows you to flexibly configure the policy for parallelism inference. You can configure the following parameters in TableConfig (note that these parameters affect all sources of the job):
Key | Default | Type | Description
---|---|---|---
table.exec.hive.infer-source-parallelism | true | Boolean | If true, the source parallelism is inferred from the number of splits. If false, the source parallelism is set by configuration.
table.exec.hive.infer-source-parallelism.max | 1000 | Integer | Sets the maximum inferred parallelism for the source operator.
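The policy described by these two options can be summarized with the following hypothetical model. `ParallelismInferenceSketch` is not a Flink class, `configuredParallelism` stands in for the parallelism the source would otherwise be given, and the lower bound of 1 is an assumption of this sketch rather than documented behavior.

```java
// Hypothetical model of the inference policy, not Flink's implementation.
// inferSourceParallelism  ~ table.exec.hive.infer-source-parallelism
// maxInferredParallelism  ~ table.exec.hive.infer-source-parallelism.max
final class ParallelismInferenceSketch {

    static int sourceParallelism(
            boolean inferSourceParallelism,
            int maxInferredParallelism,
            int numSplits,
            int configuredParallelism) {
        if (!inferSourceParallelism) {
            return configuredParallelism; // inference disabled: use the configured value
        }
        // Inference enabled: one subtask per split, capped by the max option, at least 1.
        return Math.max(1, Math.min(numSplits, maxInferredParallelism));
    }

    public static void main(String[] args) {
        System.out.println(sourceParallelism(true, 1000, 5, 4));     // prints 5 (follows splits)
        System.out.println(sourceParallelism(true, 1000, 2500, 4));  // prints 1000 (capped)
        System.out.println(sourceParallelism(false, 1000, 2500, 4)); // prints 4 (configured)
    }
}
```

In a Table API program, these options would be set on the `Configuration` returned by `TableConfig#getConfiguration()`.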
We are planning and actively working on supporting features like
Please reach out to the community with further feature requests: https://flink.apache.org/community.html#mailing-lists