I'm using an S3KeySensor to wait for _SUCCESS file creation across two different locations in different S3 buckets:

wait_for_success_file = S3KeySensor(
    task_id='wait_for_success_marker',
    bucket_key=[
        's3://bucket-123/test-output/_SUCCESS*',
        's3://bucket-456/test-output/_SUCCESS',
    ],
    wildcard_match=True,
    check_fn=lambda keys: len(keys) > 0,  # checks if any file matched
    poke_interval=120,  # 2 minutes
    timeout=600,  # 10 minutes
    mode='reschedule',
    dag=dag
)
When I create a _SUCCESS file in the first location (bucket-123), it doesn't trigger success and the task keeps poking. When I create a _SUCCESS file in bucket-456, the task completes successfully. Can anyone with Airflow experience help me understand what could be causing this, and recommend some good practices for solving it? Thanks.
To have more control over your task logic, you could use one sensor per bucket:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_bucket_123 = S3KeySensor(
    task_id='wait_bucket_123',
    bucket_key='test-output/_SUCCESS*',
    bucket_name='bucket-123',
    wildcard_match=True,
    poke_interval=120,
    timeout=600,
    mode='reschedule',
    dag=dag
)

wait_bucket_456 = S3KeySensor(
    task_id='wait_bucket_456',
    bucket_key='test-output/_SUCCESS',
    bucket_name='bucket-456',
    poke_interval=120,
    timeout=600,
    mode='reschedule',
    dag=dag
)
# Succeed if either marker exists: 'one_success' fires as soon as at least
# one upstream sensor succeeds (the default 'all_success' would wait for both).
from airflow.operators.dummy import DummyOperator

join = DummyOperator(
    task_id='success_join',
    trigger_rule='one_success',
    dag=dag
)

[wait_bucket_123, wait_bucket_456] >> join
Alternatively, write your own PythonSensor, or use the check_fn= argument of the S3KeySensor: a function that receives the list of matched S3 objects (along with the context values) and returns a boolean, True if the criteria are met and False if not. Example: wait for any S3 object whose size is more than 1 megabyte.
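A minimal sketch of that check_fn example, assuming a recent Amazon provider where check_fn receives a list of dicts exposing each matched object's 'Size' in bytes; the bucket_key, bucket name and task_id below are illustrative, not taken from your DAG:

def is_larger_than_1mb(files: list) -> bool:
    # Succeed only if at least one matched object is larger than 1 MB.
    return any(f.get('Size', 0) > 1 * 1024 * 1024 for f in files)

wait_for_large_output = S3KeySensor(
    task_id='wait_for_large_output',
    bucket_key='test-output/part-*',
    bucket_name='bucket-123',
    wildcard_match=True,
    check_fn=is_larger_than_1mb,
    poke_interval=120,
    timeout=600,
    mode='reschedule',
    dag=dag
)

Note that when check_fn is supplied it decides the outcome of each poke, so even if keys matched, returning False keeps the sensor poking.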