I am importing data from a MySQL table (selected columns only) and putting it into HDFS. Once this is done, I want to create a table in Hive.
For this I have a schema.sql
file that contains the CREATE TABLE statement for the entire table, and I want to generate a new CREATE TABLE statement for only the columns I imported.
Something similar to what I am doing with grep
in the example below.
I used FetchFile
along with ExtractText
but couldn't make it work. How can I achieve this using NiFi processors, or even Expression Language if I can get the overall schema into an attribute?
Or is there a better way to create a table on the imported data?
NiFi can generate CREATE TABLE statements based on the flowfile content.
1. Creating ORC tables by using the ConvertAvroToORC processor:
If you are converting the Avro data into ORC format and then storing it into HDFS, the ConvertAvroToORC processor adds a hive.ddl
attribute to the flowfile.
The PutHDFS processor adds an absolute.hdfs.path
attribute to the flowfile.
We can use these hive.ddl and absolute.hdfs.path attributes to create the ORC table on top of the HDFS directory dynamically.
Flow:
Pull data from the source (ExecuteSQL ... etc.)
-> ConvertAvroToORC //add the Hive DbName,TableName in the Hive Table Name property value
-> PutHDFS //store the ORC file into the HDFS location
-> ReplaceText //replace the flowfile content with ${hive.ddl} LOCATION '${absolute.hdfs.path}' (example below)
-> PutHiveQL //execute the create table statement
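For example, hive.ddl holds a ready-made CREATE TABLE statement for the ORC data, so appending the LOCATION clause in ReplaceText leaves PutHiveQL with a complete statement along these lines (the table name, columns, and HDFS path here are hypothetical):

CREATE EXTERNAL TABLE IF NOT EXISTS default.my_table (id INT, name STRING)
STORED AS ORC
LOCATION '/user/nifi/orc/my_table'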
Refer to this link for more details regarding the above flow.
2. Creating Avro tables by using the ExtractAvroMetadata processor:
In NiFi, once we pull data by using the QueryDatabaseTable or ExecuteSQL processors, the data is in Avro format.
We can create Avro tables based on the Avro schema (.avsc file): by using the ExtractAvroMetadata processor we can extract the schema and keep it as a flowfile attribute, then by using this schema we can create Avro tables dynamically.
Flow:
ExecuteSQL (success)|-> PutHDFS //store data into HDFS
           (success)|-> ExtractAvroMetadata //configure Metadata Keys as avro.schema
-> ReplaceText //replace the flowfile content with the avro.schema attribute
-> PutHDFS //store the .avsc file into the schema directory
-> ReplaceText //create the Avro table on top of the schema directory
-> PutHiveQL //execute the create table statement
Example Avro create table statement:
CREATE TABLE as_avro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES (
'avro.schema.url'='/path/to/the/schema/test_serializer.avsc');
We are going to change the path to the schema URL by using the ReplaceText processor in the above flow.
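That ReplaceText step can use the Always Replace strategy over the entire text; a minimal sketch of its Replacement Value, assuming the preceding PutHDFS stored the .avsc file and set absolute.hdfs.path (filename is the standard flowfile attribute):

CREATE TABLE as_avro
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='${absolute.hdfs.path}/${filename}');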
Another way: using the ExecuteSQL processor, get all the create table statements (or) the column info (from sys.tables/INFORMATION_SCHEMA.COLUMNS ... etc.) from the source (if the source system permits), write a script to map the data types to the appropriate Hive types, and then store them in your desired format in Hive.
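For instance, a query along these lines (the schema, table, and column names are hypothetical) returns only the imported columns and their source types, which a small script can then map to Hive types (VARCHAR -> STRING, DATETIME -> TIMESTAMP, etc.):

SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'mydb'
  AND TABLE_NAME = 'mytable'
  AND COLUMN_NAME IN ('id', 'name', 'created_at');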
EDIT:
To run the grep
command on the flowfile content we need to use the ExecuteStreamCommand processor.
ExecuteStreamCommand configs:
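A minimal configuration, assuming grep is available on the NiFi host (the pattern is just a placeholder for whatever you are matching in schema.sql):

Command Path: /bin/grep
Command Arguments: <your-pattern>
Ignore STDIN: false //the flowfile content is piped to grep's stdin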
Then feed the output stream
relationship to the ExtractText processor.
ExtractText configs:
Add a new property named
content
with the value (?s)(.*)
Then a content attribute
is added to the flowfile. You can use that attribute to prepare the create table statement (see the sketch below).
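As a sketch, one more ReplaceText processor (Replacement Strategy: Always Replace) can wrap the extracted attribute into DDL before PutHiveQL; the database, table, storage format, and location below are all assumptions:

CREATE EXTERNAL TABLE IF NOT EXISTS mydb.mytable (
${content}
)
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/mytable';

This works because NiFi Expression Language is evaluated in the Replacement Value, so ${content} expands to the column definitions that grep extracted.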