I'm trying to code a pipeline that uploads attachments to Salesforce. I have a TaskFlow function that generates the mappings to be used by the SalesforceBulkOperator. At first I thought that simply passing the returned value would be enough. It isn't: when I try to run it, it throws TypeError: object of type 'PlainXComArg' has no len() because, apparently, Airflow doesn't resolve the XComArg before passing it to the operator.
Here is the code:
mappings = generate_mappings(argument)  # @task-decorated function

bulk_insert = SalesforceBulkOperator(
    salesforce_conn_id='salesforce_conn_id',
    task_id='bulk_insert',
    operation='insert',
    object_name='Attachment',
    payload=mappings,
    external_id_field='Id',
    batch_size=10000,
    use_serial=False,
)

mappings >> bulk_insert
According to all the documentation I have seen (examples: https://www.astronomer.io/docs/learn/airflow-decorators/#taskflow-to-traditional-operator and https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#adding-dependencies-between-decorated-and-traditional-tasks), this should work. It doesn't.
I know that I can make this work if I move the operator into a TaskFlow function, but then I'm forced to execute it manually by calling .execute(context) on the operator, which I find slightly less clean.
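For clarity, the workaround I mean looks something like this sketch (bulk_insert_attachments is just a placeholder name; inside a TaskFlow function the argument arrives as an already-resolved list):

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator

@task
def bulk_insert_attachments(mappings):
    # mappings is a plain list here, so the operator can be built normally,
    # but execute() has to be called by hand with the current context.
    SalesforceBulkOperator(
        task_id='bulk_insert',
        salesforce_conn_id='salesforce_conn_id',
        operation='insert',
        object_name='Attachment',
        payload=mappings,
        external_id_field='Id',
        batch_size=10000,
        use_serial=False,
    ).execute(get_current_context())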
What am I doing wrong? Am I misinterpreting the documentation? Does this only work with some operators?
Note: I have to set the dependency explicitly (mappings >> bulk_insert); otherwise Airflow doesn't understand that it needs to wait for generate_mappings to finish before running the operator. Perhaps that's a clue.
EDIT: as requested, here is all the relevant code:
import pendulum

from airflow.decorators import dag, task
from airflow.providers.salesforce.operators.bulk import SalesforceBulkOperator


@task
def generate_mappings(tuples):
    mappings = []
    for parent_id, body in tuples:
        mapping = {
            'id': None,
            'ParentID': parent_id,
            'Name': attachment_name,  # defined elsewhere
            'ContentType': 'application/pdf',
            'Body': body,
        }
        mappings.append(mapping)
    return mappings
@dag(
    schedule=None,
    start_date=pendulum.datetime(1970, 1, 1, tz='UTC'),
    catchup=False,
)
def attachment_pipeline():  # renamed so it doesn't shadow the @dag decorator
    mappings = generate_mappings(tuples)  # tuples comes from elsewhere
    bulk_insert = SalesforceBulkOperator(
        salesforce_conn_id='salesforce_conn_id',
        task_id='bulk_insert',
        operation='insert',
        object_name='Attachment',
        payload=mappings,
        external_id_field='Id',
        batch_size=10000,
        use_serial=False,
    )
    mappings >> bulk_insert


attachment_pipeline()
The solution turned out to be quite simple: you have to build the operator with partial() first and then map it with expand_kwargs(), like this:
SalesforceBulkOperator.partial(
    salesforce_conn_id='salesforce_conn_id',
    task_id='bulk_insert',
    operation='insert',
    object_name='Attachment',
    external_id_field='Id',
    batch_size=10000,
    use_serial=False,
).expand_kwargs(payload)
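For this to work, expand_kwargs() needs payload to be an XComArg that resolves to a list of dicts whose keys are operator arguments, so the mapping task has to wrap its records accordingly. A minimal sketch of such an upstream task (generate_payload and the one-record-per-mapped-task split are just illustrative assumptions):

@task
def generate_payload(tuples):
    # Each element of the returned list becomes the keyword arguments for one
    # mapped SalesforceBulkOperator instance, so 'payload' must itself be a
    # list of record dicts.
    return [
        {
            'payload': [
                {
                    'ParentID': parent_id,
                    'Name': attachment_name,  # assumed defined elsewhere, as above
                    'ContentType': 'application/pdf',
                    'Body': body,
                }
            ]
        }
        for parent_id, body in tuples
    ]

payload = generate_payload(tuples)

If you would rather map over a single argument, .expand(payload=...) takes a plain list (or an XComArg resolving to one) and passes one element per mapped task instead.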