Tags: hive, bigdata, jython, kudu, streamsets

How to get StreamSets Record Fields Type inside Jython Evaluator


I have a StreamSets pipeline in which I read from a remote SQL Server database using a JDBC origin and write the data into both a Hive and a Kudu data lake.

I'm facing some issues with Binary-typed columns, as there is no Binary type support in Impala, which I use to access both Hive and Kudu.

I decided to convert the Binary columns (which flow through the pipeline as the Byte_Array type) to String and insert them that way.

I tried to use a Field Type Converter element to convert all Byte_Array types to String, but it didn't work. So I used a Jython evaluator to convert all arr.arr values to String instead. That works fine until I get a Null value in such a field: the Jython type is then NoneType, so I can no longer detect that the field is a Byte_Array, can't convert it to String, and therefore can't insert it into Kudu.
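For illustration, here is a minimal plain-Python sketch (outside StreamSets) of why a purely value-based type check breaks on NULLs; the sample values are hypothetical:

    import array

    def looks_like_byte_array(v):
        # A Binary column arrives in the Jython evaluator as an
        # array.array with typecode 'b' (signed bytes).
        return type(v) == array.array and v.typecode == 'b'

    non_null = array.array('b', [72, 105])   # a Binary column value
    null_value = None                        # a NULL in the same column

    print(looks_like_byte_array(non_null))    # True
    print(looks_like_byte_array(null_value))  # False -- the NULL carries no type info
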

Any help on how to get StreamSets Record Field types inside the Jython Evaluator? Or any suggested workaround for the problem I'm facing?


Solution

  • You need to use sdcFunctions.getFieldNull() to test whether the field is NULL_BYTE_ARRAY. For example:

    import array
    
    def convert(item):
      # Placeholder conversion; replace with a real byte-array-to-String encoding.
      return ':-)'
    
    def is_byte_array(record, k, v):
      # getFieldNull expects a field path, so we need to prepend the '/'
      return (sdcFunctions.getFieldNull(record, '/' + k) == NULL_BYTE_ARRAY
              or (type(v) == array.array and v.typecode == 'b'))
    
    for record in records:
      try:
        # Rebuild the record value, converting every Byte_Array field to a String
        record.value = {k: convert(v) if is_byte_array(record, k, v) else v
                        for k, v in record.value.items()}
        output.write(record)
      except Exception as e:
        error.write(record, str(e))
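
    The `convert` function above is only a placeholder. A minimal sketch of a real conversion, assuming a hex encoding is acceptable for the Impala-facing column (the encoding choice is mine, not from the original answer):

        import array
        import binascii

        def convert(item):
          # NULL byte arrays come through as None; keep them as NULL.
          if item is None:
            return None
          if isinstance(item, array.array):
            item = item.tobytes()  # use item.tostring() on Jython 2.7
          # hexlify produces a printable ASCII representation of the bytes
          return binascii.hexlify(item).decode('ascii')

        print(convert(array.array('b', [0, 1, 2])))  # '000102'
        print(convert(None))                         # None

    Base64 (`base64.b64encode`) would work the same way if you prefer a shorter string; either way the String can be stored in Kudu and queried through Impala.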
    
