I've been working on a data engineering project using the Apache Beam SDK for Python, version 2.24. When I upgraded Apache Beam to 2.31, I ran into an issue with a custom coder class I created, called IgnoreUnicode. This is my pipeline code:
```python
branchessap_data = (
    p
    | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines=1, coder=IgnoreUnicode())
    | 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
    | 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
    | 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
    | 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
)
```
And this is the IgnoreUnicode class I use to override Apache Beam's default coder:
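```python
from apache_beam.coders import Coder


# Custom coder that drops bytes that are not valid UTF-8 (e.g. badly encoded French characters)
class IgnoreUnicode(Coder):
    def encode(self, value):
        return value.encode('utf-8', 'ignore')

    def decode(self, value):
        return value.decode('utf-8', 'ignore')

    def is_deterministic(self):
        return True
```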
This code works fine with Apache Beam 2.24. However, if I upgrade to any version above 2.24, I get an error like this (in this case I was using 2.31):
Is there an alternative way to implement a custom coder in versions above 2.24?
It looks like this is an unfortunate combination of the way sources were restructured and having your coder defined in `__main__`. I suggest one of two workarounds:
(1) Move the definition of `IgnoreUnicode` to a proper module that gets imported rather than `__main__`, or
(2) Read the file with `BytesCoder`, and follow that with a `beam.Map(lambda line: line.decode('utf-8', 'ignore'))`.
(Personally, I prefer the latter, as it's better not to have coders that mutate the data.)