Tags: hadoop, hive, sqoop, hortonworks-data-platform, hadoop-partitioning

Sqoop import : composite primary key and textual primary key


Stack : Installed HDP-2.3.2.0-2950 using Ambari 2.1

The source DB schema is on SQL Server and it contains several tables whose primary key is either a single textual column or a composite key (of textual and/or integer columns).

As per the Sqoop documentation :

Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

The first question is: what is meant by 'manually choose a splitting column'? Do I have to sacrifice the composite PK and use just one of its columns, or am I missing some concept?

The SQL Server table is (two columns only, and they form a composite primary key):

ChassiNo    varchar(8)  Unchecked
ECU_Name    nvarchar(15)    Unchecked

I proceeded with the import; the source table has 7909097 records:

sqoop import --connect 'jdbc:sqlserver://somedbserver;database=somedb' --username someusname --password somepass --as-textfile --fields-terminated-by '|&|'  --table ChassiECU --num-mappers 8  --warehouse-dir /dataload/tohdfs/reio/odpdw/may2016 --verbose

The worrisome warnings and the incorrect mapper input and record counts:

16/05/13 10:59:04 WARN manager.CatalogQueryManager: The table ChassiECU contains a multi-column primary key. Sqoop will default to the column ChassiNo only for this job.
16/05/13 10:59:08 WARN db.TextSplitter: Generating splits for a textual index column.
16/05/13 10:59:08 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/05/13 10:59:08 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/05/13 10:59:38 INFO mapreduce.Job: Counters: 30
        File System Counters
                FILE: Number of bytes read=0
                FILE: Number of bytes written=1168400
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1128
                HDFS: Number of bytes written=209961941
                HDFS: Number of read operations=32
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=16
        Job Counters
                Launched map tasks=8
                Other local map tasks=8
                Total time spent by all maps in occupied slots (ms)=62785
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=62785
                Total vcore-seconds taken by all map tasks=62785
                Total megabyte-seconds taken by all map tasks=128583680
        Map-Reduce Framework
                Map input records=15818167
                Map output records=15818167
                Input split bytes=1128
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=780
                CPU time spent (ms)=45280
                Physical memory (bytes) snapshot=2219433984
                Virtual memory (bytes) snapshot=20014182400
                Total committed heap usage (bytes)=9394716672
        File Input Format Counters
                Bytes Read=0
        File Output Format Counters
                Bytes Written=209961941
16/05/13 10:59:38 INFO mapreduce.ImportJobBase: Transferred 200.2353 MB in 32.6994 seconds (6.1235 MB/sec)
16/05/13 10:59:38 INFO mapreduce.ImportJobBase: Retrieved 15818167 records.

Created table :

CREATE EXTERNAL TABLE IF NOT EXISTS ChassiECU(`ChassiNo` varchar(8),
`ECU_Name` varchar(15)) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'  LOCATION '/dataload/tohdfs/reio/odpdw/may2016/ChassiECU';

Awful result (without errors) -- PROBLEM: 15818167 vs 7909097 (SQL Server) records:

 > select count(1) from ChassiECU;
Query ID = hive_20160513110313_8e294d83-78aa-4e52-b90f-b5640268b8ac
Total jobs = 1
Launching Job 1 out of 1
Tez session was closed. Reopening...
Session re-established.
Status: Running (Executing on YARN cluster with App id application_1446726117927_0059)
--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     14         14        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 6.12 s
--------------------------------------------------------------------------------
OK
_c0
15818167

Surprisingly, I got either an accurate count or a mismatch of fewer than 10 records whenever the composite key included an int (which was then used for splitting), but I am still apprehensive about those cases as well!

How shall I proceed?


Solution

  • Specify the split column manually. The split column does not have to be the PK: you can have a complex PK and some int split column. You can specify any integer column or even a simple function (something simple like substring or cast, not an aggregation or analytic function). The split column should preferably be an evenly distributed integer.

    For example, if your split column contains a few rows with value -1 and 10M rows with values 10000 - 10000000, and num-mappers=8, then Sqoop will not split the dataset between the mappers evenly: it only takes the MIN and MAX of the split column and cuts that range into equal-width slices, never looking at how many rows fall into each slice (see the sketch after this list), so mappers whose slice is sparse get almost nothing while the mapper whose slice holds the bulk of the values gets nearly all of the data.

    That results in data skew, and the overloaded mapper will run forever or may even fail. I have also got duplicates when I used a non-integer split column with MS-SQL. So, use an integer split column. In your case, with a table of only two varchar columns, you can do one of the following (a sketch of each option follows after this list):

    (1) add a surrogate int PK (for example an IDENTITY column) and use it as the split column, or

    (2) split your data manually using a custom query with a WHERE clause and run Sqoop a few times with num-mappers=1, or

    (3) apply some deterministic, non-aggregating integer function to your varchar column, for example cast(substr(...) as int), second(timestamp_col) or datepart(second, date), etc., and use that as the split column. For Teradata you can use the AMP number, HASHAMP(HASHBUCKET(HASHROW(string_column_list))), to get an integer AMP number from a list of non-integer keys and rely on Teradata's distribution across AMPs. I have used such simple functions directly as split-by, without adding them to the query as a derived column.
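
    To make the skew mechanism concrete, here is roughly what Sqoop's integer splitter does (a sketch, not the exact SQL Sqoop generates; some_table and split_col are placeholders):

        -- Sqoop first runs a bounding query against the split column:
        SELECT MIN(split_col), MAX(split_col) FROM some_table;
        -- and then assigns each mapper an equal-width slice of that [min, max] range,
        -- roughly like this for --num-mappers 8:
        --   mapper 1: WHERE split_col >= min                   AND split_col <  min + (max - min)/8
        --   mapper 2: WHERE split_col >= min + (max - min)/8   AND split_col <  min + 2*(max - min)/8
        --   ...
        --   mapper 8: WHERE split_col >= min + 7*(max - min)/8 AND split_col <= max
        -- Row counts are never consulted, so a slice containing few values leaves its mapper
        -- nearly idle, while a slice containing most of the values overloads its mapper.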
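
    A sketch of option (1), assuming you are allowed to alter the source table (RowId is a hypothetical column name):

        ALTER TABLE ChassiECU ADD RowId INT IDENTITY(1,1) NOT NULL;

    and then split on the new column (the remaining options as in the original import command):

        sqoop import --connect 'jdbc:sqlserver://somedbserver;database=somedb' \
            --username someusname --password somepass \
            --table ChassiECU --split-by RowId --num-mappers 8 \
            --warehouse-dir /dataload/tohdfs/reio/odpdw/may2016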
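
    A sketch of option (2); the ChassiNo boundaries below are made-up values, you would pick WHERE ranges that together cover the whole key space:

        # one slice of the table per run; no split column is needed with --num-mappers 1
        sqoop import --connect 'jdbc:sqlserver://somedbserver;database=somedb' \
            --username someusname --password somepass \
            --table ChassiECU \
            --where "ChassiNo >= 'A0000000' AND ChassiNo < 'H0000000'" \
            --num-mappers 1 \
            --target-dir /dataload/tohdfs/reio/odpdw/may2016/ChassiECU_part1
        # repeat with the next range and a different --target-dir (or use --append)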
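
    And a sketch of option (3), assuming (purely for illustration) that characters 2-8 of ChassiNo are numeric; substitute whatever deterministic integer expression your database accepts:

        sqoop import --connect 'jdbc:sqlserver://somedbserver;database=somedb' \
            --username someusname --password somepass \
            --table ChassiECU \
            --split-by "CAST(SUBSTRING(ChassiNo, 2, 7) AS INT)" \
            --num-mappers 8 \
            --warehouse-dir /dataload/tohdfs/reio/odpdw/may2016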