Sqoop Incremental Import | MySQL to Hive

Sqoop automates most of the process of moving data between a relational database and Hadoop, relying on the database to describe the schema for the data to be imported. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance.

Here we will see how to create a Sqoop incremental import process.
We will be working with a MySQL database and importing the data into Hive.
The data is imported in text format. Text is not only slower to parse and heavier on storage than other file formats, it is also vulnerable to many issues caused by dirty data. For example, a newline character (\n) or a field delimiter (e.g. \u0001 or ,) inside any column of the imported rows might cause problems later when Hive reads them. Fortunately, you can overcome this problem by using the --hive-delims-replacement option to replace these delimiters with your own string when importing the table, or drop them entirely by using the --hive-drop-import-delims option. Although this helps you avoid parsing errors, the dataset imported into Hive will differ slightly from the original one in MySQL.
Changing the delimiters also impacts performance when the transfer goes through mysqldump (Sqoop's direct mode), because Sqoop has to parse the mysqldump output into fields and transcode them into the user-specified delimiters instead of copying the data directly from the mysqldump output into HDFS.
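To make the effect of dropping delimiters concrete, here is a small shell sketch that mimics it on a single field value. It is only an illustration and does not call Sqoop: the function name strip_hive_delims is made up for this example, and it assumes the characters removed are \n, \r and \001, which is the set the Sqoop documentation lists for --hive-drop-import-delims.

```shell
# Hypothetical helper: mimics what --hive-drop-import-delims does to one
# string field by deleting newline, carriage return and Ctrl-A (\001).
strip_hive_delims() {
    printf '%s' "$1" | tr -d '\n\r\001'
}

# A comment field containing an embedded newline and a Ctrl-A character.
dirty=$(printf 'first line\nsecond\001line')

# Prints the cleaned value on a single line.
strip_hive_delims "$dirty"
echo
```

The cleaned value no longer matches the MySQL original byte for byte, which is the trade-off described above.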
Incremental import mode can be used to retrieve only the rows that are newer than the previously imported set of rows.

Why and When to use lastmodified mode?
lastmodified works on time-stamped data
⇛ Use this when rows of the source table may be updated
⇛ And each such update will set the value of a last-modified column to the current timestamp
⇛ Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported

⇛ Oracle timestamp format: DD-Mon-YY HH24:MI:SS.FF
⇛ MySQL timestamp format: YYYY-MM-DD HH:MI:SS
⇛ Sqoop timestamp format: YYYY-MM-DD HH24:MI:SS.FF
     Enclose the --last-value argument (a timestamp in this case) in double quotes.
⇛ Keep in mind that many RDBMSs are case sensitive (database, table, and column names).
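As a rough illustration of the timestamp layout, the sketch below builds a value in the YYYY-MM-DD HH:MM:SS form with the standard date utility and shows it quoted as a single argument. The function name sqoop_timestamp is hypothetical, and the fractional seconds part (.FF) is left out here on the assumption that it is optional for a MySQL timestamp column.

```shell
# Hypothetical helper: prints the current time as 'YYYY-MM-DD HH:MM:SS'.
sqoop_timestamp() {
    date '+%Y-%m-%d %H:%M:%S'
}

last_value=$(sqoop_timestamp)

# The double quotes keep the date and the time together as one argument.
printf '%s "%s"\n' '--last-value' "$last_value"
```

Without the quotes, the shell would split the value at the space and Sqoop would receive the time as a separate argument.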

When to use append mode?
⇛ Works for numerical data that is incrementing over time, such as auto-increment keys
⇛ When importing a table where new rows are continually being added with increasing row id values

Arguments which control incremental imports:
--check-column <col> Specifies the column to be examined when determining which rows to import.
⇛ The column SHOULD NOT be of type (Char/NChar/VarChar/VarNChar/LongVarChar/LongNVarChar)
--incremental <mode> Specifies how Sqoop determines which rows are new.
⇛ Legal values for mode include append and lastmodified.
⇛ Use append mode when importing a table where new rows are continually being added with increasing row id values
⇛ Specify the column containing the row’s id with --check-column
⇛ Sqoop imports rows where the check column has a value greater than the one specified with --last-value
--last-value <value> Specifies the maximum value of the check column from the previous import
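The three arguments always travel together, so it can help to assemble them in one place. The following is a minimal sketch, not part of Sqoop: incremental_args is a hypothetical helper that checks the mode against the two legal values and prints the argument string, and the sample last values (1000 and the timestamp) are made up.

```shell
# Hypothetical helper: prints the three incremental-import arguments after
# checking that the mode is one of the two legal values.
incremental_args() {
    mode=$1
    column=$2
    last=$3
    case "$mode" in
        append|lastmodified) ;;
        *) echo "unsupported mode: $mode" >&2; return 1 ;;
    esac
    printf '%s\n' "--incremental $mode --check-column $column --last-value \"$last\""
}

# Append mode on the numeric key, lastmodified mode on the timestamp column.
incremental_args append ID 1000
incremental_args lastmodified TWEET_DATE "2015-12-21 00:00:00"
```

The printed strings can be pasted into a sqoop import command; the helper itself never contacts the database.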

Table description in MySQL

mysql> use VenkatDb;
Database changed

mysql> show tables;
+--------------------+
| Tables_in_VenkatDb |
+--------------------+
| TWITTER_TBL        |
+--------------------+
2 rows in set (0.00 sec)

mysql> desc TWITTER_TBL;
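+-----------------+---------------+------+-----+-------------------+-----------------------------+
| Field           | Type          | Null | Key | Default           | Extra                       |
+-----------------+---------------+------+-----+-------------------+-----------------------------+
| ID              | bigint(20)    | NO   | PRI | NULL              |                             |
| SCREEN_NAME     | varchar(50)   | YES  |     | NULL              |                             |
| TWEET_DATE      | timestamp     | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| SOURCE          | varchar(30)   | YES  |     | NULL              |                             |
| TWEET_COMMENT   | varchar(2000) | YES  |     | NULL              |                             |
| FRIENDS_COUNT   | bigint(20)    | YES  |     | NULL              |                             |
| FOLLOWERS_COUNT | bigint(20)    | YES  |     | NULL              |                             |
| TWEET_LANG      | char(2)       | YES  |     | NULL              |                             |
+-----------------+---------------+------+-----+-------------------+-----------------------------+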
8 rows in set (0.00 sec)
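For readers who want to reproduce the source table, the sketch below prints a CREATE TABLE statement that is consistent with the DESC output. It is a reconstruction, not the statement actually used to create TWITTER_TBL, and the function name twitter_tbl_ddl is hypothetical. Note the ON UPDATE CURRENT_TIMESTAMP clause on TWEET_DATE: it is what keeps the column current on every update and so makes it usable as a check column in lastmodified mode.

```shell
# Prints a CREATE TABLE statement consistent with the DESC output.
# This is a reconstruction for illustration only.
twitter_tbl_ddl() {
    cat <<'SQL'
CREATE TABLE TWITTER_TBL (
  ID              BIGINT        NOT NULL,
  SCREEN_NAME     VARCHAR(50),
  TWEET_DATE      TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP
                                ON UPDATE CURRENT_TIMESTAMP,
  SOURCE          VARCHAR(30),
  TWEET_COMMENT   VARCHAR(2000),
  FRIENDS_COUNT   BIGINT,
  FOLLOWERS_COUNT BIGINT,
  TWEET_LANG      CHAR(2),
  PRIMARY KEY (ID)
);
SQL
}

# Show the statement. To apply it on a machine with the mysql client:
#   twitter_tbl_ddl | mysql -u SOMEUSER -p VenkatDb
twitter_tbl_ddl
```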

mysql> select * from TWITTER_TBL limit 10;


Incremental load is achieved in 2 steps –
Step 1 – Initial load/import
Step 2 – Continuous regular load/import after initial import

Step 1 – Initial import of data from MySQL to Hive

    • When going with Managed Tables in Hive
      For the initial import, the table can be created directly in Hive as a managed table by specifying the parameters --create-hive-table, --hive-import and --hive-table <TABLE_NAME_IN_HADOOP_SYSTEM>. The data is stored in the Hive warehouse location. See the command below for the initial load.
    # Initial load using Managed Tables.
    #   --table              table to import from MySQL
    #   --hive-table         table name in Hive
    #   --create-hive-table  creates the table under the Hive warehouse directory
    #   --hive-import        REQUIRED to import data into the Hive warehouse directory
    sqoop import \
          --connect jdbc:mysql:// \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --hive-database venkat_hc_db \
          --hive-table TWITTER_TBL_HIVE \
          --where "TWEET_DATE <= '2015-12-20'" \
          --create-hive-table \
          --fields-terminated-by '\t' \
          --hive-import

    This command will connect to a MySQL database named myFinDB on the host given in the connect string. It is important that you do not use the URL localhost if you intend to use Sqoop with a distributed Hadoop cluster. The connect string you supply will be used on TaskTracker nodes throughout your MapReduce cluster; if you specify the literal name localhost, each node will connect to a different database (or more likely, no database at all). Instead, you should use the full hostname or IP address of the database host that can be seen by all your remote nodes.
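The localhost warning above can be turned into a quick pre-flight check before a job is submitted. This is only a sketch: check_connect_host is a hypothetical helper, it looks at MySQL JDBC URLs only, and db.example.com is a placeholder host name.

```shell
# Hypothetical pre-flight check: rejects JDBC URLs that point at the local
# machine, since every worker node would resolve them to itself.
check_connect_host() {
    case "$1" in
        jdbc:mysql://localhost*|jdbc:mysql://127.0.0.1*)
            echo "connect string uses a local address: $1" >&2
            return 1 ;;
        *)
            return 0 ;;
    esac
}

# db.example.com is a placeholder for a host visible to all cluster nodes.
if check_connect_host "jdbc:mysql://db.example.com/myFinDB"; then
    echo "connect string looks usable from remote nodes"
fi
```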

    • When going with External Tables in Hive
      Sqoop does not support Hive external tables at the moment. Any import using Sqoop creates a managed table, yet in real-world scenarios it is often important to have EXTERNAL tables. For now we have to execute an ALTER statement to change the table properties and make the table external. This is not a big deal, but it would be nice to have an option in Sqoop to specify the type of table required.

      An existing Hive table can't be truncated before the import.
      The --delete-target-dir option is not respected. This means that if you run your import more than once, you risk duplicating the dataset.

      There are 2 options to go with External Tables for the initial load –
      Option 1 – Create a managed table using the Sqoop command shown for Managed Tables above. Then use an ALTER command to make the managed table external and to change its location.
      Option 2 – Create the external table in Hive before running the sqoop import command (i.e. the table MUST exist in Hive).
       1.  It is advisable to create a staging table and import the data into it.
         a.  This is helpful for comparing the data with the master table. This way you can avoid importing duplicate data.
         b.  This is also helpful for loading data into partitioned tables.
       2.  Before creating the table, take care of the field DATATYPES to avoid any loss of data or discrepancies. Most issues occur with Date, DateTime or TimeStamp fields. It is advisable to use the "string" datatype for date-time fields. See the HiveQL command below for creating a Hive external table.

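      -- Create EXTERNAL Table using Hive Shell
      -- Columns follow the MySQL table order; the tab delimiter matches
      -- the --fields-terminated-by '\t' used in the Sqoop import.
      CREATE EXTERNAL TABLE TWITTER_TBL_HIVE (
             ID BIGINT,
             SCREEN_NAME STRING,
             TWEET_DATE STRING,
             SOURCE STRING,
             TWEET_COMMENT STRING,
             FRIENDS_COUNT BIGINT,
             FOLLOWERS_COUNT BIGINT,
             TWEET_LANG STRING)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      LOCATION '/user/hue/HiveExtTbls/twitter_tbl_hive';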
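Option 1 (convert a managed table to an external one) comes down to two ALTER statements. The sketch below only prints them so they can be reviewed and then run in the Hive shell. The helper name managed_to_external_hql is hypothetical, and the namenode host and port in the sample location are placeholders, not values from this setup.

```shell
# Hypothetical helper: prints the HiveQL that marks a managed table as
# external and points it at a new location.
managed_to_external_hql() {
    table=$1
    location=$2
    cat <<HQL
ALTER TABLE ${table} SET TBLPROPERTIES ('EXTERNAL'='TRUE');
ALTER TABLE ${table} SET LOCATION '${location}';
HQL
}

# namenode.example.com:8020 is a placeholder for the cluster's namenode.
managed_to_external_hql TWITTER_TBL_HIVE \
    'hdfs://namenode.example.com:8020/user/hue/HiveExtTbls/twitter_tbl_hive'
```

SET LOCATION changes only the table metadata; files already sitting in the warehouse directory are not moved, so they have to be copied to the new location separately.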

      Use the Sqoop command below to import data from MySQL into the Hive external table. To load data into Hive external tables, you SHOULD NOT use --hive-import, because the target directory given with --target-dir then conflicts with the Hive warehouse directory (managed table), which ultimately causes a Semantic Exception error. Because --hive-import is NOT used, Sqoop prints a warning about the Hive parameters (--hive-database, --hive-table and --create-hive-table), but it still loads the data into the directory specified with --target-dir.

      # --table is the table name in MySQL. The Hive options only trigger a
      # warning here because --hive-import is not given.
      sqoop import \
            --connect jdbc:mysql:// \
            --username SOMEUSER -P -m 1 \
            --table TWITTER_TBL \
            --hive-database hcatFinDB \
            --hive-table TWITTER_TBL_HIVE \
            --where "TWEET_DATE <= '2015-12-20'" \
            --target-dir hdfs:// \
            --create-hive-table \
            --fields-terminated-by '\t'

      [Figure: snapshot of the warning message]

Step 2 – Incremental import of data from MySQL to Hive (after initial load is done)
From the 2nd load onwards you only need to import the data as a plain HDFS load into the table's directory.

    Use the Sqoop command below for incremental loads.

    # --table is the table name in MySQL; --hive-table is the existing table in Hive.
    sqoop import \
          --connect jdbc:mysql:// \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --append \
          --hive-database hcatFinDB \
          --hive-table TWITTER_TBL_HIVE \
          --where "TWEET_DATE <= '2015-12-22'" \
          --check-column TWEET_DATE \
          --last-value "2015-12-21" \
          --incremental lastmodified \
          --fields-terminated-by '\t' \
          --target-dir hdfs://

    Create a Sqoop job for the above command in order to use it for regular imports, as shown below.

    # Creating Sqoop Job (--table is the table name in MySQL)
    sqoop job --create incrementalImportJob -- import \
          --connect jdbc:mysql:// \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --append \
          --check-column TWEET_DATE \
          --last-value "2015-12-21" \
          --incremental lastmodified \
          --fields-terminated-by '\t' \
          --target-dir hdfs://

    ℹ In the above sqoop job command I have removed the parameters that triggered the warnings, --hive-database hcatFinDB and --hive-table TWITTER_TBL_HIVE, as we DO NOT require them any more for regular scheduled imports.

    ℹ The last value from the previous import acts as the argument for --last-value. When the import runs as a saved job, Sqoop records this value after each run and uses it for the next one.
    ℹ Sqoop checks for changes in the data between the last-value timestamp (lower bound) and the current timestamp (upper bound) and imports the modified or newly added rows.
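The lower and upper bound described above amount to a range filter on the check column. The sketch below prints an approximation of that filter for a given window. The helper name lastmodified_window and the sample timestamps are made up, and the exact comparison operators at the two boundaries (inclusive lower, exclusive upper) are an assumption about Sqoop's behaviour rather than something stated in this post.

```shell
# Hypothetical helper: prints a WHERE-style predicate for the window between
# the previous last value (lower bound) and the current time (upper bound).
# Boundary operators are an assumption, see the note above.
lastmodified_window() {
    column=$1
    lower=$2
    upper=$3
    printf "%s >= '%s' AND %s < '%s'\n" "$column" "$lower" "$column" "$upper"
}

# Sample window with made-up timestamps.
lastmodified_window TWEET_DATE '2015-12-21 00:00:00' '2015-12-22 09:30:00'
```

On the next run the upper bound of this window becomes the new lower bound, which is how consecutive runs avoid leaving gaps.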

Sqoop job commands
# To list existing Sqoop Jobs
sqoop job --list

# To show details of Sqoop Job
sqoop job --show incrementalImportJob

# To Execute Sqoop Job
sqoop job --exec incrementalImportJob

# To drop a Sqoop Job
sqoop job --delete incrementalImportJob
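For regular scheduled imports, the --exec command can be wrapped in a small script that a scheduler such as cron calls. The following is a minimal sketch under a few assumptions: run_incremental_job and the SQOOP_BIN variable are names invented for this example, and SQOOP_BIN exists only so the command can be previewed with echo on a machine without Sqoop.

```shell
# Hypothetical wrapper for a scheduler. SQOOP_BIN defaults to sqoop and can
# be overridden (for example with "echo") to preview the command.
run_incremental_job() {
    job=${1:-incrementalImportJob}
    "${SQOOP_BIN:-sqoop}" job --exec "$job"
}

# Preview only: prints the arguments instead of contacting the cluster.
(SQOOP_BIN=echo; run_incremental_job incrementalImportJob)
```

Because the job above was created with -P, Sqoop asks for the password each time the job is executed, so an unattended run needs another way of supplying it.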