I have a StreamSets pipeline where I read from a remote SQL Server database using a JDBC component as the origin and write the data into Hive and Kudu data lakes.
I'm facing some issues with Binary-type columns, as there is no Binary type support in Impala, which I use to access both Hive and Kudu.
I decided to convert the Binary columns (which flow through the pipeline as the Byte_Array type) to String and insert them that way.
I tried to use a Field Type Converter stage to convert all Byte_Array types to String, but it didn't work. So I used a Jython Evaluator to convert all array.array values to String. That works fine until a field contains a Null value: in Jython the value is then of NoneType, so I can no longer detect that the field was originally a Byte_Array, can't convert it to String, and therefore can't insert the record into Kudu.
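For illustration, the check I'm doing in the Jython Evaluator is roughly like this (a simplified sketch; the function name is just illustrative):

import array

def looks_like_byte_array(value):
    # Matches real binary values, but a SQL NULL arrives in Jython as None,
    # so there is nothing left that says the column was a Byte_Array.
    return type(value) == array.array and value.typecode == 'b'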
Is there any way to get StreamSets record field types inside the Jython Evaluator, or any suggested workaround for the problem I'm facing?
You need to use sdcFunctions.getFieldNull() to test whether the field is NULL_BYTE_ARRAY. For example:
import array

def convert(item):
    # Placeholder conversion - replace with your real Byte_Array-to-String logic
    return ':-)'

def is_byte_array(record, k, v):
    # getFieldNull expects a field path, so we need to prepend the '/'
    return (sdcFunctions.getFieldNull(record, '/' + k) == NULL_BYTE_ARRAY
            or (type(v) == array.array and v.typecode == 'b'))

for record in records:
    try:
        record.value = {k: convert(v) if is_byte_array(record, k, v) else v
                        for k, v in record.value.items()}
        output.write(record)
    except Exception as e:
        error.write(record, str(e))
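If you want something more useful than the smiley placeholder in convert(), here is a minimal sketch of one option, assuming base64 is an acceptable String representation and that the NULL_STRING constant is available in the Jython Evaluator alongside NULL_BYTE_ARRAY:

import base64

def convert(item):
    # A null Byte_Array field reaches convert() as None; emitting a typed
    # null String keeps the field in a type Impala/Kudu can handle.
    if item is None:
        return NULL_STRING
    # Real binary values arrive as array.array('b', ...); tostring() yields
    # the raw bytes, which base64 turns into a printable String.
    return base64.b64encode(item.tostring())

Dropping this in place of the placeholder keeps non-null binary values as readable Strings and preserves SQL NULLs as null Strings.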