Dynamic partitioning in Hive

In this post I will show how to work with dynamically partitioned tables along with bucketing. We will also see how to store the data in ORC format.

As we all know, a huge amount of data (terabytes/petabytes/exabytes/zettabytes) is stored in Hadoop HDFS, so it becomes very difficult for Hadoop users to query it. Apache Hive was introduced to lower this data-querying burden. Hive converts SQL queries into MapReduce jobs and then submits them to the Hadoop cluster.

Apache Hive is a data warehouse on top of Hadoop which enables ad-hoc analysis over structured and semi-structured data. Partitioning is an optimization technique in Hive which improves performance significantly.

Hive Partitioning


Types of Hive Partitioning

  • Static Partition (SP) columns: in DML/DDL involving multiple partitioning columns, the columns whose values are known at COMPILE TIME (provided by the user).
  • Dynamic Partition (DP) columns: columns whose values are only known at EXECUTION TIME.

Dynamic-partition insert (or multi-partition insert) solves the problem of needing one insert statement per partition: Hive dynamically determines which partitions should be created and populated while scanning the input table. This feature is available from version 0.6.0 onwards. In a dynamic partition insert, the input column values are evaluated to determine which partition each row should be inserted into; if that partition does not exist yet, it is created automatically. With this feature you need only one insert statement to create and populate all necessary partitions. In addition, since there is only one insert statement, there is only one corresponding MapReduce job. This significantly improves performance and reduces the Hadoop cluster workload compared to the multiple-insert case.
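
To make the distinction concrete, below is a minimal sketch (using a hypothetical POPULATION table, not part of this post's dataset) that contrasts a static-partition insert with a mixed static/dynamic one. Note that in a mixed specification the static columns must appear before the dynamic ones:

-- Static partition insert: both partition values are known at compile time
INSERT OVERWRITE TABLE POPULATION PARTITION (COUNTRY='IN', CITY='Hyderabad')
SELECT NAME, AGE FROM POPULATION_STG
WHERE  COUNTRY='IN' AND CITY='Hyderabad';

-- Mixed insert: COUNTRY is static, CITY is dynamic and resolved per row at
-- execution time; one statement creates and populates every CITY partition
INSERT OVERWRITE TABLE POPULATION PARTITION (COUNTRY='IN', CITY)
SELECT NAME, AGE, CITY FROM POPULATION_STG
WHERE  COUNTRY='IN';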

Unlike the logical partitioning in an RDBMS, partitioning in Hive is physical: a separate directory is created for each partition. If a partition column is specified while querying, only the data of that partition is processed. If an unpartitioned table contains 10 billion records, all 10 billion records must be processed for every ad-hoc requirement. If, on the other hand, the table is partitioned (as shown in the figure) and a report is needed over a specific day's data, only that day's partition (containing data in the range of a few thousand rows) needs to be processed.
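
For example, with the partitioned TWITTER_TBL created in Step 4 below, a query that filters on the partition columns scans only the matching sub-directories (the literal values here are placeholders):

-- Only the tweet_lang=en/year=2014/month=5 directory is read;
-- all other partitions are pruned away
SELECT COUNT(*)
FROM   TWITTER_TBL
WHERE  TWEET_LANG = 'en' AND YEAR = 2014 AND MONTH = 5;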

The ORC File (Optimized Row Columnar) format provides a more efficient way to store relational data than the RC File, reducing the stored data size by up to 75% of the original. The ORC file format performs better than other Hive file formats when Hive is reading, writing, and processing data. Specifically, compared to the RC File, ORC takes less time to access data and less space to store it. However, ORC increases CPU overhead because of the time it takes to decompress the relational data. Also, the ORC file format was introduced in Hive 0.11 and cannot be used with earlier versions.
[Figure: ORC File format]
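
If you already have data in a text or RC File table, one simple way to convert it to ORC is a CREATE TABLE ... AS SELECT. This is only a sketch; the table names are hypothetical:

-- Copy an existing table into ORC storage in one statement
CREATE TABLE TWEETS_ORC STORED AS ORC
AS SELECT * FROM TWEETS_TEXT;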

Some useful tips before we start working on dynamically partitioned tables:
⇛ It is always advisable to first create a staging table and load the raw data into it
⇛ A staging table is also helpful in dynamic partitioning for loading data into the partitioned table
⇛ From the staging table, load data into the master table
⇛ This way you can always verify data validity and load into the master table only what is required and essential
⇛ This is also helpful in auditing the data

Step 1: Set Hive session parameters

-- Log into database
use twitter_db;

-- Enable dynamic partitioning and set the mode to nonstrict
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

set hive.mapred.mode=nonstrict;

-- Set the number of reducers to the number of buckets specified
-- in the table metadata (i.e. 31). See the MASTER table created with buckets below
set mapred.reduce.tasks=31;

-- Use the following command to enforce bucketing
set hive.enforce.bucketing=true;
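
If the insert is expected to create a large number of partitions, you may also need to raise the dynamic partition limits. The values below are illustrative; the Hive defaults are 100 per node and 1000 in total:

-- Optional: raise limits when many partitions will be created dynamically
set hive.exec.max.dynamic.partitions.pernode=500;
set hive.exec.max.dynamic.partitions=2000;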

Step 2: Create staging table to load raw data

DROP TABLE IF EXISTS TWITTER_TBL_STG PURGE;

-- Create staging table for twitter data
CREATE TABLE IF NOT EXISTS TWITTER_TBL_STG (
       ID BIGINT,
       SCREEN_NAME STRING,
       TWEET_DATE STRING,
       SOURCE STRING,
       TWEET_COMMENT STRING,
       FRIENDS_COUNT BIGINT,
       FOLLOWERS_COUNT BIGINT,
       TWEET_LANG STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/user/hue/hive_tbls/tweets/twitter_tbl_stg';

Step 3: Load raw data into staging table

LOAD DATA INPATH '/user/hue/input_data/tweets/'
OVERWRITE INTO TABLE TWITTER_TBL_STG;
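
Before loading the master table it is worth sanity-checking the staged data, for example:

-- Quick validity checks on the staging table
SELECT COUNT(*) FROM TWITTER_TBL_STG;
SELECT * FROM TWITTER_TBL_STG LIMIT 5;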

Step 4: Create Master table if it does not exist in the Hive database
1. Provide partitioned columns
2. Provide bucketing columns
3. Provide storage format as ORC

CREATE EXTERNAL TABLE IF NOT EXISTS TWITTER_TBL (
       ID BIGINT,
       SCREEN_NAME STRING,
       TWEET_DATE TIMESTAMP,
       DAY INT,
       SOURCE STRING,
       TWEET_COMMENT STRING,
       FRIENDS_COUNT BIGINT,
       FOLLOWERS_COUNT BIGINT)
PARTITIONED BY (TWEET_LANG STRING, YEAR INT, MONTH INT)
CLUSTERED BY (DAY) INTO 31 BUCKETS
STORED AS ORC
LOCATION '/user/hue/hive_tbls/tweets/twitter_tbl'
TBLPROPERTIES ("orc.compress.size"="1024");
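
You can confirm the partition columns, bucketing, and ORC storage format recorded in the metastore with:

-- Shows partition/bucket columns, storage format, and table properties
DESCRIBE FORMATTED TWITTER_TBL;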

Step 5: Load data into Master table from staging table

-- Dynamic partition columns must be selected last, in the same order as
-- they appear in the PARTITION clause (TWEET_LANG, YEAR, MONTH)
FROM   TWITTER_TBL_STG A
INSERT INTO TABLE TWITTER_TBL  PARTITION(TWEET_LANG, YEAR, MONTH)
SELECT A.ID,
       A.SCREEN_NAME,
       FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) tweet_date,
       DAY( FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) ) tweet_day,
       A.SOURCE,
       A.TWEET_COMMENT,
       A.FRIENDS_COUNT,
       A.FOLLOWERS_COUNT,
       A.TWEET_LANG,
       YEAR( FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) ) tweet_year,
       MONTH( FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) ) tweet_month
DISTRIBUTE BY A.TWEET_LANG,
              YEAR( FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) ),
              MONTH( FROM_UNIXTIME( UNIX_TIMESTAMP(A.TWEET_DATE) ) );
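
Once the insert finishes, you can list the partitions that were created dynamically and run a pruned query against one of them (the partition values below are placeholders):

-- List all partitions created by the dynamic insert
SHOW PARTITIONS TWITTER_TBL;

-- Query a single partition; only that directory is scanned
SELECT SCREEN_NAME, TWEET_COMMENT
FROM   TWITTER_TBL
WHERE  TWEET_LANG = 'en' AND YEAR = 2014 AND MONTH = 5
LIMIT  10;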

For more on the ORC format, please see the Apache Hive ORC documentation.

Finally,
Hive Partitioning – Advantages and Disadvantages

  1. Advantages
    • Partitioning in Hive distributes the execution load horizontally.
    • Query execution is faster on a low volume of data. For example, searching the population of City:Hyderabad returns very fast instead of searching the entire data in the table.
  2. Disadvantages
    • There is a possibility of too many small partitions (sub-directories) being created.
    • Partitioning is effective for low-volume data, but some queries, such as a GROUP BY over a high volume of data, still take a long time to execute. For example, grouping the population of Country:India will take much longer than grouping the population of City:Hyderabad.