Sqoop Incremental Import | MySQL to Hive

Sqoop automates most of this process, relying on the database to describe the schema for the data to be imported. Sqoop uses MapReduce to import and export the data, which provides parallel operation as well as fault tolerance.
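As a rough illustration (the host, database and table names here are placeholders, not ones used later in this post), the degree of parallelism is controlled with -m/--num-mappers, and --split-by names the column Sqoop uses to divide the work among the map tasks:

# Plain HDFS import sketch showing parallel map tasks (hypothetical names).
sqoop import \
      --connect jdbc:mysql://dbhost.example.com/someDB \
      --username SOMEUSER -P \
      --table SOME_TABLE \
      --split-by ID \
      --num-mappers 4 \
      --target-dir /user/hue/sqoop_demo/some_table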

Here we will see how to set up a Sqoop incremental import process.
We will work with a MySQL database and import the data into Hive.
The data is imported in text format. Text is not only more time-consuming to parse and heavier on storage, it is also vulnerable to issues caused by dirty data. For example, the occurrence of a newline character (\n) or a field delimiter (e.g. \u0001 or ,) inside any individual column of your imported rows might cause problems later when reading them in Hive. Fortunately, you can overcome this problem by using the --hive-delims-replacement option to replace these delimiters with your own string when importing the table, or even drop them by using the --hive-drop-import-delims option. Although this helps you avoid parsing errors, the dataset imported into Hive will differ slightly from the original one in MySQL. Changing the delimiters also impacts performance, because Sqoop has to parse the mysqldump output into fields and transcode them into the user-specified delimiter instead of just copying the data directly from the mysqldump output into HDFS.
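As a minimal sketch (hypothetical host, database and table names), either of these options is simply appended to a Hive import command:

# Drop embedded \n, \r and \01 characters from string columns during import,
# or replace them with a string of your choice (hypothetical names).
sqoop import \
      --connect jdbc:mysql://dbhost.example.com/someDB \
      --username SOMEUSER -P -m 1 \
      --table SOME_TABLE \
      --hive-import \
      --hive-table some_table_hive \
      --hive-drop-import-delims
# ...or, to substitute rather than drop:
#     --hive-delims-replacement ' '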
Incremental import mode can be used to retrieve only rows newer than the previously imported set of rows.

Why and When to use lastmodified mode?
lastmodified works on time-stamped data
⇛ Use this when rows of the source table may be updated
⇛ And each such update will set the value of a last-modified column to the current timestamp
⇛ Rows where the check column holds a timestamp more recent than the timestamp specified with --last-value are imported

Note:―
⇛ Oracle timestamp format : DD-Mon-YY HH24:MI:SS.FF
⇛ MySQL timestamp format : YYYY-MM-DD HH:MI:SS
⇛ Sqoop timestamp format : YYYY-MM-DD HH24:MI:SS.FF
     Specify --last-value in double quotes, i.e. as a quoted timestamp in this case (see the example below).
⇛ Keep in mind that many RDBMSs are case sensitive (database, table and column names).
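As a minimal sketch (hypothetical names; the real commands used in this post follow in Step 2), a lastmodified import with the --last-value timestamp passed in double quotes looks like this:

# lastmodified mode sketch: import rows whose LAST_MODIFIED is newer than --last-value.
sqoop import \
      --connect jdbc:mysql://dbhost.example.com/someDB \
      --username SOMEUSER -P -m 1 \
      --table SOME_TABLE \
      --check-column LAST_MODIFIED \
      --incremental lastmodified \
      --last-value "2015-12-21 00:00:00" \
      --append \
      --target-dir /user/hue/sqoop_demo/some_table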

When to use append mode?
⇛ Works for numerical data that is incrementing over time, such as auto-increment keys
⇛ When importing a table where new rows are continually being added with increasing row id values

Arguments which control incremental imports:―
--check-column <col> Specifies the column to be examined when determining which rows to import.
⇛ The column SHOULD NOT be of type (Char/NChar/VarChar/VarNChar/LongVarChar/LongNVarChar)
--incremental <mode> Specifies how Sqoop determines which rows are new.
⇛ Legal values for mode include append and lastmodified.
⇛ Append mode when importing a table where new rows are continually being added with increasing row id values
⇛ Specify the column containing the row’s id with --check-column
⇛ Sqoop imports rows where the check column has a value greater than the one specified with --last-value
--last-value <value> Specifies the maximum value of the check column from the previous import (a minimal append-mode sketch follows below)
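As a minimal sketch of append mode (hypothetical names; the check column is a numeric auto-increment key), only rows with an ID greater than --last-value are fetched and appended in HDFS:

# append mode sketch: pull rows with ID > 1000 and append them to the target directory.
sqoop import \
      --connect jdbc:mysql://dbhost.example.com/someDB \
      --username SOMEUSER -P -m 1 \
      --table SOME_TABLE \
      --check-column ID \
      --incremental append \
      --last-value 1000 \
      --target-dir /user/hue/sqoop_demo/some_table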

Table description in MySQL

mysql> use VenkatDb;
Database changed

mysql> show tables;
+--------------------+
| Tables_in_VenkatDb |
+--------------------+
| TWITTER_TBL        |
| TWITTER_TBL_STAGE  |
+--------------------+
2 rows in set (0.00 sec)

mysql> desc TWITTER_TBL;
+-----------------+---------------+------+-----+-------------------+-----------------------------+
| Field           | Type          | Null | Key | Default           | Extra                       |
+-----------------+---------------+------+-----+-------------------+-----------------------------+
| ID              | bigint(20)    | NO   | PRI | NULL              |                             |
| SCREEN_NAME     | varchar(50)   | YES  |     | NULL              |                             |
| TWEET_DATE      | timestamp     | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
| SOURCE          | varchar(30)   | YES  |     | NULL              |                             |
| TWEET_COMMENT   | varchar(2000) | YES  |     | NULL              |                             |
| FRIENDS_COUNT   | bigint(20)    | YES  |     | NULL              |                             |
| FOLLOWERS_COUNT | bigint(20)    | YES  |     | NULL              |                             |
| TWEET_LANG      | char(2)       | YES  |     | NULL              |                             |
+-----------------+---------------+------+-----+-------------------+-----------------------------+
8 rows in set (0.00 sec)

mysql> select * from TWITTER_TBL limit 10;


Incremental load is achieved in 2 steps –
Step 1 – Initial load/import
Step 2 – Continuous regular load/import after initial import

Step 1 – Initial import of data from MySQL to Hive

    • When going with managed tables in Hive
      For the initial import, the table can be created directly in Hive (as a managed table) by specifying the parameters --create-hive-table, --hive-import and --hive-table <TABLE_NAME_IN_HADOOP_SYSTEM>. Data is stored at the Hive warehouse location. See the command below for the initial load.
    # Initial load using a Hive managed table.
    #   --table             : source table name in MySQL
    #   --hive-table        : target table name in Hive
    #   --create-hive-table : creates the table under the Hive warehouse directory
    #   --hive-import       : REQUIRED to import the data into the Hive warehouse directory
    sqoop import \
          --connect jdbc:mysql://database.example.com/myFinDB \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --hive-database venkat_hc_db \
          --hive-table TWITTER_TBL_HIVE \
          --where "TWEET_DATE <= '2015-12-20'" \
          --create-hive-table \
          --fields-terminated-by '\t' \
          --hive-import;
    

    This command will connect to a MySQL database named myFinDB on the host database.example.com. It’s important that you do not use the URL localhost if you intend to use Sqoop with a distributed Hadoop cluster. The connect string you supply will be used on TaskTracker nodes throughout your MapReduce cluster; if you specify the literal name localhost, each node will connect to a different database (or more likely, no database at all). Instead, you should use the full hostname or IP address of the database host that can be seen by all your remote nodes.
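    A quick way to confirm the connect string actually works from the cluster before running a full import is to list the tables Sqoop can see in that database (same connection details as above):

    # Sanity-check the JDBC connect string and credentials from a cluster node.
    sqoop list-tables \
          --connect jdbc:mysql://database.example.com/myFinDB \
          --username SOMEUSER -P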
     

    • When going with external tables in Hive
      Sqoop does not support Hive external tables at the moment. Any import using Sqoop creates a managed table, but in real-world scenarios it is very important to have EXTERNAL tables. For now we have to execute an ALTER statement to change the table properties and make it an external table. That is not a big deal, but it would be nice to have an option in Sqoop to specify the type of table required.

      An existing Hive table can't be truncated before the import.
      The --delete-target-dir option is not respected. This means that if you run your import more than once, you risk duplicating the dataset.
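      Since --delete-target-dir is not honoured here, one workaround (only if you really intend to redo the initial load and discard what is already there) is to clear the external table's directory manually before re-running the import; the path below is the one used for the external table later in this post:

      # Remove the previously imported files so a fresh initial load does not duplicate data.
      hdfs dfs -rm -r -skipTrash /user/hue/HiveExtTbls/twitter_tbl_hive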

      There are two options to go with external tables for the initial load –
      Option 1 – Create a managed table using the Sqoop command given in the managed-table section above, then ALTER the table to make it external and change its location (see the sketch after these notes).
      Option 2 – Create the external table in Hive before running the sqoop import command (i.e. the table MUST already exist in Hive).
      Note:―
       1.  It is advisable to create a staging table and import data into it.
         a.  This is helpful for comparing data with the master table, so you can avoid importing duplicate data.
         b.  This is also helpful for loading data into partitioned tables.
       2.  Before creating the table, you must take care of the field DATATYPES to avoid any loss of data or discrepancies. Most issues occur with Date, DateTime or Timestamp fields. It is advisable to use the "string" datatype for date-time fields. See the Hive-QL command below for creating the Hive external table.
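       For Option 1, a minimal sketch of the ALTER statements run through the Hive CLI (the database, table and location names are the ones used elsewhere in this post; adjust to your environment):

       # Option 1 sketch: convert the Sqoop-created managed table to an external
       # table and point it at the desired HDFS location.
       hive -e "
         USE venkat_hc_db;
         ALTER TABLE TWITTER_TBL_HIVE SET TBLPROPERTIES('EXTERNAL'='TRUE');
         ALTER TABLE TWITTER_TBL_HIVE SET LOCATION 'hdfs://192.168.88.129:8020/user/hue/HiveExtTbls/twitter_tbl_hive';
       "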

      -- Create EXTERNAL Table using Hive Shell
      CREATE EXTERNAL TABLE IF NOT EXISTS hcatFinDB.TWITTER_TBL_HIVE (
             ID BIGINT,
             SCREEN_NAME STRING,
             TWEET_DATE STRING,
             SOURCE STRING,
             TWEET_COMMENT STRING,
             FRIENDS_COUNT BIGINT,
             FOLLOWERS_COUNT BIGINT,
             TWEET_LANG STRING)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      LOCATION '/user/hue/HiveExtTbls/twitter_tbl_hive';
      

      Use the Sqoop command below to import data from MySQL into the Hive external table. To load data into an external Hive table, you should NOT use --hive-import, as it causes a conflict between the target directory --target-dir and the Hive warehouse directory (managed tables) and ultimately results in a SemanticException error. Sqoop throws a warning message because we have not used --hive-import, but it still loads the data into the specified target directory --target-dir.

      # Initial load into the external table's directory (no --hive-import here).
      #   --table : source table name in MySQL
      sqoop import \
            --connect jdbc:mysql://192.168.88.129/myFinDB \
            --username SOMEUSER -P -m 1 \
            --table TWITTER_TBL \
            --hive-database hcatFinDB \
            --hive-table TWITTER_TBL_HIVE \
            --where "TWEET_DATE <= '2015-12-20'" \
            --target-dir hdfs://192.168.88.129:8020/user/hue/HiveExtTbls/twitter_tbl_hive \
            --create-hive-table \
            --fields-terminated-by '\t';
      

      (Snapshot of the Sqoop warning message.)

Step 2 – Incremental import of data from MySQL to Hive (after initial load is done)
From the second load onwards you just need to import the data as an HDFS load ONLY.

    Use the Sqoop command below for incremental loads.

    # Incremental (lastmodified) load appended to the external table's directory.
    #   --table      : source table name in MySQL
    #   --hive-table : existing table name in Hive
    sqoop import \
          --connect jdbc:mysql://192.168.88.129/myFinDB \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --append \
          --hive-database hcatFinDB \
          --hive-table TWITTER_TBL_HIVE \
          --where "TWEET_DATE <= '2015-12-22'" \
          --check-column TWEET_DATE \
          --last-value "2015-12-21" \
          --incremental lastmodified \
          --fields-terminated-by '\t' \
          --target-dir hdfs://sandbox.hortonworks.com:8020/user/hue/HiveExtTbls/twitter_tbl_hive;
    

    Create a Sqoop job for the above command in order to use it for regular imports, as shown below.

    # Creating a Sqoop job for the regular incremental imports.
    #   --table : source table name in MySQL
    sqoop job --create incrementalImportJob -- import \
          --connect jdbc:mysql://192.168.88.129/myFinDB \
          --username SOMEUSER -P -m 1 \
          --table TWITTER_TBL \
          --append \
          --check-column TWEET_DATE \
          --last-value "2015-12-21" \
          --incremental lastmodified \
          --fields-terminated-by '\t' \
          --target-dir hdfs://sandbox.hortonworks.com:8020/user/hue/HiveExtTbls/twitter_tbl_hive;
    

    Note:―
    ℹ In the above Sqoop job command I have removed the parameters --hive-database hcatFinDB and --hive-table TWITTER_TBL_HIVE, as we do NOT require them any more for the regular scheduled imports (they only produce warnings).

    ℹ The last value from the previous import acts as the argument for --last-value.
    ℹ Sqoop checks for changes in the data between the last-value timestamp (lower bound) and the current timestamp (upper bound) and imports the modified or newly added rows.

Sqoop job commands
# To list existing Sqoop Jobs
sqoop job --list

# To show details of Sqoop Job
sqoop job --show incrementalImportJob

# To Execute Sqoop Job
sqoop job --exec incrementalImportJob

# To drop a Sqoop Job
sqoop job --delete incrementalImportJob
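To run the saved job on a regular schedule, one option is a cron entry along the lines below (a sketch: the timing, sqoop path and log file are assumptions, and it presumes the job can authenticate non-interactively, e.g. via --password-file rather than the interactive -P used above).

# Example crontab entry: execute the incremental import job every day at 02:00.
0 2 * * * /usr/bin/sqoop job --exec incrementalImportJob >> /tmp/incrementalImportJob.log 2>&1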


25 thoughts on “Sqoop Incremental Import | MySQL to Hive”

  1. MySQL Table:
    +------+----------+------+--------+---------------------+
    | id   | name     | age  | salary | last_modified       |
    +------+----------+------+--------+---------------------+
    | 1    | Dhiren   | 23   | 80000  | 2018-08-22 22:34:56 |
    | 2    | Hiral    | 22   | 60000  | 2018-08-22 22:36:39 |
    | 3    | Shital   | 31   | 31000  | 2018-08-23 10:31:38 |
    | 4    | Raj      | 30   | 20000  | 2018-08-22 22:37:08 |
    | 5    | Rajendra | 27   | 27000  | 2018-08-23 10:32:07 |
    | 6    | Ankita   | 25   | 25000  | 2018-08-23 11:12:02 |
    | 7    | Arijit   | 10   | 10000  | 2018-08-23 16:31:34 |
    | 8    | Atif     | 11   | 11000  | 2018-08-23 16:31:47 |
    +------+----------+------+--------+---------------------+
    MySQL table schema -
    id int(50), name varchar(50), age int(50), salary int(50), last_modified timestamp

    Sqoop query for the MySQL to Hive import:

    sqoop import --connect jdbc:mysql://localhost/sqoop --username root -P --table customer --warehouse-dir /user/hive/warehouse/sqoop.db/ --fields-terminated-by "\t" --append --incremental lastmodified --check-column last_modified --last-value '2018-08-22 22:37:08' -m 1

    Hive Output:

    hive> select * from customer;
    OK
    1 Dhiren 23 80000 2018-08-22 22:34:56.0
    2 Hiral 22 60000 2018-08-22 22:36:39.0
    3 Shital 31 31000 2018-08-23 10:31:38.0
    4 Raj 30 20000 2018-08-22 22:37:08.0
    5 Rajendra 27 27000 2018-08-23 10:32:07.0
    6 Ankita 24 19000 2018-08-22 22:38:32.0
    7 Arijit 34 120000 2018-08-22 20:22:25.0
    8 Atif 33 117000 2018-08-23 00:06:32.0
    NULL NULL NULL NULL NULL
    NULL NULL NULL NULL NULL
    NULL NULL NULL NULL NULL

    Hive Table Schema-
    id int
    name string
    age int
    salary int
    last_modified string

    Why is it showing NULL fields?
    Please give a solution.

  2. Hi,
    When I run the incremental lastmodified option, I get duplicate records (full-row duplicates). It's not due to updates in the records; there are no updates. What might be the issue?

    1. Hi Ravi,

      Can you please share your RDBMS key fields and the modified key used in your HiveQL? That way I can help you.

      Regards,
      Venkat

  3. sqoop import --connect "jdbc:oracle:thin:@hostname:1521:dbname" --password "****" --username "uname" --table ACCOUNT --hive-import --hive-table default.ACCOUNT --check-column MODIFY_DTS --incremental append --last-value '2016-12-01' --merge-key ACCOUNT_NUMBER -m 1

    I am trying to do an incremental import and insert/update the data based on the key field ACCOUNT_NUMBER in the Hive table ACCOUNT. Instead of being updated, the records are getting inserted. Can you please comment on it?

    1. Hi Manoj,

      As of Hive 0.14 and earlier versions, you can only insert data into the table. If you want to update data, then you need to go with partitioning and then remove the old data and add the new, updated data.

      Thanks,
      Venkat

  4. Hi Venkat,

    Well explained, it's really useful, but I have a few doubts:

    1) When importing directly from Sqoop into Hive using --hive-import without mentioning a target directory in the command, does that create a managed table?

    2) When importing directly from Sqoop into Hive using --hive-import with the target dir set to /user/$USERNAME/sqoop, does it cause any problem? Will it create a managed table?

    3) When loading data into an external table from Sqoop for the second time, if we don't mention the --delete-target-dir parameter, will it not cause a FileAlreadyExistsException?

    Thanks for your time

    1. Hi Aditya,

      First thing, Sqoop import into Hive does not support external tables. Sqoop always tries to create managed tables in the default Hive metastore. So, initially you create the tables in the Hive metastore and then alter them to external tables. This way the table remains in the database. From the second time onwards you just need to mention the target table name and the data is inserted into it.

      Let me know if you still have questions.

      Thanks,
      Venkat

  5. I have a column of MySQL type TEXT (Replied_Message) and I am getting junk data in Hive. How do I avoid this?

    /home/dwhhadoop/sqoop/bin/sqoop-import --connect jdbc:mysql://172.16.0.53:3306/mdbhistory --username user --password pass --table messagesentinfo_feb23 --columns MsgId,MatriId,Replied_Message --hive-table messagesentinfo_feb23 --hive-import --hive-home /home/dwhhadoop/hive --direct --num-mappers 4

  6. Hi Venkat,
    A quick question: how will it handle the modified data, since in this case we will have both the previous and the modified rows? How will a Hive query reflect the updated/modified data?

  7. Hi Venkat S, thanks very much for the very well explained post. I have a question: my requirement is to achieve the same using the ORC format, that is, import the data from the source, save it as ORC and do an incremental import that appends to the ORC data. Thanks in advance.

  8. After I set the table location with
    LOCATION '/user/hue/HiveExtTbls/twitter_tbl_hive';
    I got an error org.apache.hadoop.mapred.FileAlreadyExistsException
    when executing the sqoop job with
    --target-dir hdfs://192.168.88.129:8020/user/hue/HiveExtTbls/twitter_tbl_hive

    1. Hi Kate, to load data into an external Hive table you should NOT use --hive-import, as it causes a conflict between the target directory --target-dir and the Hive warehouse directory (managed tables) and ultimately results in a SemanticException error.

  9. This post is very helpful.

    Quick follow-up question: if the table structure is altered (datatype changes, columns added/deleted, etc.) at the source (the MySQL database in this case), do we need to manually change the structure at the destination too, or are the changes seamlessly reflected in Hive? The idea is to reduce the administrative overhead and the impact on end users, especially where there are multiple sources.

    1. When column datatypes are changed or existing columns are dropped, you need to take care of those changes yourself. This is similar to what we do with table changes between Dev and Prod instances in our regular traditional databases. As for adding new columns at the source, that will not have any impact unless you need them to be imported into Hive as well.

  10. Nice post, well explained. But what if I need to set the last-value dynamically? Say I use ID as the check column and mention --last-value 100, but the next day extra data arrives in MySQL; how can I supply that ID as the last-value dynamically? I don't want to have to set the last-value manually every time.

    Thanks in advance.

    1. Hi Pratheep,

      Create a 'SQOOP JOB' with the last-value (initially, the first value would be 100 in your example). Once the Sqoop job is scheduled with a last-value of 100, Sqoop will automatically pull all data with ID > 100 up to the maximum available at the time of execution; say 10 more records (101 to 110) are loaded into the system on day one, then it internally updates the last-value to 110. On day 2, run the same Sqoop job; this time it will start looking for ID > 110. You can verify this before running the job using the command 'sqoop job --show yourSqoopJobName' on the console.

