python, airflow, salesforce, airflow-taskflow, airflow-xcom

PlainXComArg not resolving before being passed to Operator


I'm trying to code a pipeline that uploads attachments to Salesforce. I have a TaskFlow function that generates the mappings to be used by the SalesforceBulkOperator. At first I thought that simply passing the returned value would be enough, but it isn't: when I try to run it, it throws TypeError: object of type 'PlainXComArg' has no len() because, apparently, Airflow doesn't resolve the XComArg before passing it to the operator.
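For context, the error can be reproduced with a minimal stand-in that needs no Airflow at all (the class name below just mirrors the error message; it is not the real Airflow class): any object without __len__ fails a parse-time len() check, which seems to be exactly what the operator runs into with the unresolved placeholder:

```python
# Stand-in for Airflow's lazy XCom reference; not the real Airflow class.
class PlainXComArg:
    """Resolved only at task runtime, so it defines no __len__."""

def validate_payload(payload):
    # Mimics a parse-time length check like the one the operator appears to hit.
    return len(payload)

try:
    validate_payload(PlainXComArg())
except TypeError as exc:
    print(exc)  # object of type 'PlainXComArg' has no len()
```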

Here is the code:

mappings = generate_mappings(argument)  # @task-decorated function

bulk_insert = SalesforceBulkOperator(
    salesforce_conn_id='salesforce_conn_id',
    task_id='bulk_insert',
    operation='insert',
    object_name='Attachment',
    payload=mappings,
    external_id_field='Id',
    batch_size=10000,
    use_serial=False,
)
mappings >> bulk_insert

According to all the documentation I have seen (examples: https://www.astronomer.io/docs/learn/airflow-decorators/#taskflow-to-traditional-operator and https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html#adding-dependencies-between-decorated-and-traditional-tasks) this should work. It doesn't.

I know I can make this work by moving the operator inside a TaskFlow function, but then I'm forced to run it manually by calling .execute(context) on the operator, which I find slightly less clean.
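For reference, the workaround I mean looks roughly like this, sketched with a made-up stand-in operator so it runs without Airflow (in the real DAG it would be the SalesforceBulkOperator instantiated inside a @task function):

```python
# Hypothetical stand-in for an Airflow operator; only .execute() matters here.
class FakeBulkOperator:
    def __init__(self, payload):
        self.payload = payload

    def execute(self, context):
        return f"inserted {len(self.payload)} records"

# In real Airflow this function would be decorated with @task; by the time it
# runs, `mappings` is already a plain list, so len() checks succeed.
def bulk_insert_task(mappings):
    op = FakeBulkOperator(payload=mappings)
    return op.execute(context={})

print(bulk_insert_task([{'Name': 'a.pdf'}, {'Name': 'b.pdf'}]))  # inserted 2 records
```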

What am I doing wrong? Am I misinterpreting the documentation? Does this only work with some operators?

Note: I have to be explicit with the dependencies, otherwise Airflow doesn't understand that it needs to wait for generate_mappings to finish before calling the operator. Perhaps that's a clue.

EDIT: as requested, here is all the relevant code:

@task
def generate_mappings(tuples):
    mappings = []
    for parent_id, body in tuples:
        mapping = {
            'id': None,
            'ParentID': parent_id,
            'Name': attachment_name,
            'ContentType': 'application/pdf',
            'Body': body,
        }
        mappings.append(mapping)
    return mappings

@dag(
  schedule=None,
  start_date=pendulum.datetime(1970, 1, 1, tz='UTC'),
  catchup=False,
)
def dag():
  mappings = generate_mappings(tuples)

  bulk_insert = SalesforceBulkOperator(
      salesforce_conn_id='salesforce_conn_id',
      task_id='bulk_insert',
      operation='insert',
      object_name='Attachment',
      payload=mappings,
      external_id_field='Id',
      batch_size=10000,
      use_serial=False,
  )

  mappings >> bulk_insert

Solution

  • The solution turned out to be quite simple: you have to call the operator with partial() first and then expand(), like this:

    bulk_insert = SalesforceBulkOperator.partial(
        salesforce_conn_id='salesforce_conn_id',
        task_id='bulk_insert',
        operation='insert',
        object_name='Attachment',
        external_id_field='Id',
        batch_size=10000,
        use_serial=False,
    ).expand(payload=mappings)
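A rough mental model of why this avoids the parse-time error, sketched without Airflow (all class names below are invented stand-ins): partial() only records the static kwargs, and the mapped argument is materialised when each mapped task runs, so nothing calls len() on the placeholder while the DAG file is being parsed:

```python
class LazyXCom:
    """Stand-in for an XComArg: only resolvable 'at runtime'."""
    def resolve(self):
        return [{'Name': 'a.pdf'}, {'Name': 'b.pdf'}]

class MiniOperator:
    def __init__(self, payload):
        # An eager len() here would crash on LazyXCom at parse time.
        self.size = len(payload)

def partial_then_expand(op_class, lazy_payload):
    # "Parse time": nothing touches the payload yet, so no TypeError.
    def run_mapped_tasks():
        # "Runtime": resolve the XCom and build one operator per element.
        return [op_class(payload=item).size for item in lazy_payload.resolve()]
    return run_mapped_tasks

runner = partial_then_expand(MiniOperator, LazyXCom())
print(runner())  # [1, 1] -- each mapped task sees one concrete dict
```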