sql-serverapache-nifivarbinaryzipf

NIFI - upload binary.zip to SQL Server as varbinary


I am trying to upload a binary.zip to SQL Server as varbinary type column content.

Target Table:

CREATE TABLE myTable ( zipFile varbinary(MAX) );

My NIFI Flow is very simple:

  -> GetFile: 
         filter:binary.zip

  -> UpdateAttribute:<br>
         sql.args.1.type  = -3    # as varbinary  according to JDBC types enumeration
         sql.args.1.value =  ???  # I don't know what to put here ! (I've triying everything!)
         sql.args.1.format=  ???  # Is It required? I triyed 'hex'

   -> PutSQL:<br>
         SQLstatement= INSERT INTO myTable (zip_file) VALUES (?);

What should I put in sql.args.1.value?

I think it should be the flowfile payload, but it would work as part of the INSERT in the PutSQL? Not by the moment!

Thanks!

SOLUTION UPDATE:

Based on https://issues.apache.org/jira/browse/NIFI-8052 (Consider I'm sending some data as attribute parameter)

import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

def flowFile = session.get()
def lookup = context.controllerServiceLookup
def dbServiceName = flowFile.getAttribute('DatabaseConnectionPoolName')  
def tableName = flowFile.getAttribute('table_name')
def fieldName = flowFile.getAttribute('field_name')

def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find
{ cs -> lookup.getControllerServiceName(cs) == dbServiceName }

def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def sql = new Sql(conn)


flowFile.read{ rawIn->
    def parms = [rawIn ]
    sql.executeInsert "INSERT INTO " + tableName + " (date, "+ fieldName + ")  VALUES (CAST( GETDATE() AS Date ) , ?) ", parms
}
conn?.close()

if(!flowFile) return
session.transfer(flowFile, REL_SUCCESS)
session.commit()

Solution

  • maybe there is a nifi native way to insert blob however you could use ExecuteGroovyScript instead of UpdateAttribute and PutSQL

    add SQL.mydb parameter on the level of processor and link it to required DBCP pool.

    use following script body:

    def ff=session.get()
    if(!ff)return
    
    def statement = "INSERT INTO myTable (zip_file) VALUES (:p_zip_file)"
    def params = [
      p_zip_file: SQL.mydb.BLOB(ff.read())    //cast flow file content as BLOB sql type
    ]
    SQL.mydb.executeInsert(params, statement) //committed automatically on flow file success
    
    //transfer to success without changes
    REL_SUCCESS << ff
    

    inside the script SQL.mydb is a reference to groovy.sql.Sql oblject