apache-nifi | hortonworks-data-platform | hortonworks-dataflow

Extracting multiline content from flow file content


I am importing data from MySQL table (for selected columns only) and putting it in HDFS. Once this is done, I want to create a table in Hive.

For this, I have a schema.sql file that contains the CREATE TABLE statement for the entire table, and I want to generate a new CREATE TABLE statement for only the columns I imported.

Something similar to what I am doing with grep in the below example.

[screenshot: grep command filtering the schema file]

I used FetchFile along with ExtractText but couldn't make it work. How can I achieve this using NiFi processors, or even Expression Language if I get the overall schema into an attribute?

Or is there a better way to create table on the imported data?


Solution

  • NiFi can generate CREATE TABLE statements based on the flowfile content

    1. Creating ORC tables using the ConvertAvroToORC processor:

    Flow:

     Pull data from source (ExecuteSQL, etc.)
      -> ConvertAvroToORC //set Hive DbName.TableName in the Hive Table Name property
      -> PutHDFS //store the ORC file in the HDFS location
      -> ReplaceText //replace the flowfile content with ${hive.ddl} LOCATION '${absolute.hdfs.path}'
      -> PutHiveQL //execute the CREATE TABLE statement
    

    Refer to this link for more details regarding the above flow.
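The ReplaceText step above can be sanity-checked outside NiFi with plain Python. This is only a sketch of the string substitution that ReplaceText performs; the attribute values below are made-up examples, not real ConvertAvroToORC output:

```python
# Simulated flowfile attributes (illustrative values only):
# hive.ddl is written by ConvertAvroToORC, absolute.hdfs.path by PutHDFS.
attributes = {
    "hive.ddl": "CREATE EXTERNAL TABLE IF NOT EXISTS mydb.mytable (id INT, name STRING) STORED AS ORC",
    "absolute.hdfs.path": "/user/nifi/orc/mytable",
}

# Equivalent of the ReplaceText replacement value:
#   ${hive.ddl} LOCATION '${absolute.hdfs.path}'
ddl = "{} LOCATION '{}'".format(
    attributes["hive.ddl"], attributes["absolute.hdfs.path"]
)
print(ddl)
```

The resulting statement is what PutHiveQL executes in the flow above.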

    2. Creating Avro tables using the ExtractAvroMetadata processor:

    Flow:

    ExecuteSQL (success)|-> PutHDFS //store data into HDFS
               (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema
                         -> ReplaceText //replace the flowfile content with ${avro.schema}
                         -> PutHDFS //store the .avsc file in the schema directory
                         -> ReplaceText //build the CREATE TABLE statement pointing at the schema directory
                         -> PutHiveQL //execute the Hive DDL
    

    Example Avro CREATE TABLE statement:

    CREATE TABLE as_avro
      ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
      STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
      TBLPROPERTIES (
        'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
    

    We change the path in the schema URL using the ReplaceText processor in the above flow.

    Another way: use an ExecuteSQL processor to get the CREATE TABLE statements (or the column info) from the source system's catalog (sys.tables / INFORMATION_SCHEMA.COLUMNS, etc.), if the source permits, then write a script that maps the data types to the appropriate Hive types and stores the tables in your desired format in Hive.
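A minimal sketch of that scripted approach, assuming columns were fetched from INFORMATION_SCHEMA.COLUMNS; the type-mapping table is a simplified assumption, not an exhaustive MySQL-to-Hive mapping:

```python
# Simplified MySQL -> Hive type mapping (assumption; extend as needed).
MYSQL_TO_HIVE = {
    "int": "INT",
    "bigint": "BIGINT",
    "varchar": "STRING",
    "text": "STRING",
    "datetime": "TIMESTAMP",
    "double": "DOUBLE",
}

def hive_ddl(table, columns):
    """columns: list of (name, mysql_data_type) tuples,
    e.g. as fetched from INFORMATION_SCHEMA.COLUMNS."""
    cols = ",\n  ".join(
        "{} {}".format(name, MYSQL_TO_HIVE.get(dtype.lower(), "STRING"))
        for name, dtype in columns
    )
    return "CREATE TABLE {} (\n  {}\n)".format(table, cols)

print(hive_ddl("employees", [("id", "int"), ("name", "varchar"), ("hired", "datetime")]))
```

Unknown types fall back to STRING here; a production script would handle precision (DECIMAL, VARCHAR lengths) explicitly.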

    EDIT:

    To run a grep command on the flowfile content, we need to use the ExecuteStreamCommand processor.

    ESC Configs:

    [screenshot: ExecuteStreamCommand configuration]

    Then feed the output stream relationship to an ExtractText processor.

    ET Configs:

    Add a new property named content with the value:

    (?s)(.*)
    

    [screenshot: ExtractText configuration]

    The content attribute is then added to the flowfile; you can use that attribute to prepare the CREATE TABLE statements.
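For comparison, the grep-style filtering from the question can be sketched in plain Python: keep only the column lines you imported and rebuild a smaller CREATE TABLE statement. The schema text, table name, and column set below are hypothetical examples:

```python
# Example schema.sql content (illustrative only).
schema_sql = """CREATE TABLE employees (
  id INT,
  name VARCHAR(100),
  salary DECIMAL(10,2),
  hired DATETIME
);"""

# Columns actually imported from MySQL (assumption).
wanted = {"id", "name"}

column_lines = []
for line in schema_sql.splitlines()[1:-1]:  # skip the CREATE and closing lines
    col_name = line.strip().split()[0].rstrip(",")
    if col_name in wanted:
        column_lines.append(line.strip().rstrip(","))

ddl = "CREATE TABLE employees_subset (\n  {}\n)".format(",\n  ".join(column_lines))
print(ddl)
```

Inside NiFi, the same filtering could run as the grep invocation in ExecuteStreamCommand, with ExtractText capturing the result into the content attribute as shown above.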