
S3KeySensor to monitor files across multiple S3 buckets


from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_success_file = S3KeySensor(
    task_id='wait_for_success_marker',
    # A list of full S3 URIs is accepted; the sensor only succeeds
    # once every key in the list has a match
    bucket_key=[
        's3://bucket-123/test-output/_SUCCESS*',
        's3://bucket-456/test-output/_SUCCESS'
    ],
    wildcard_match=True,  # enables the '*' pattern in the first key
    check_fn=lambda files: len(files) > 0,  # per key: True if any object matched
    poke_interval=120,  # poke every 2 minutes
    timeout=600,  # fail the task after 10 minutes
    mode='reschedule',  # free the worker slot between pokes
    dag=dag
)

Problem

Question

Can anyone with Airflow experience help me understand what could be causing this problem, and recommend good practices for solving it? Thanks.


Solution

  • To have more control over each task's logic, you could use a single sensor per bucket:

    from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer Airflow versions

    wait_bucket_123 = S3KeySensor(
        task_id='wait_bucket_123',
        bucket_key='test-output/_SUCCESS*',
        bucket_name='bucket-123',
        wildcard_match=True,
        poke_interval=120,
        timeout=600,
        mode='reschedule',
        dag=dag
    )

    wait_bucket_456 = S3KeySensor(
        task_id='wait_bucket_456',
        bucket_key='test-output/_SUCCESS',
        bucket_name='bucket-456',
        poke_interval=120,
        timeout=600,
        mode='reschedule',
        dag=dag
    )

    # Succeed as soon as either marker exists; the default trigger rule
    # ('all_success') would instead wait for both sensors
    join = DummyOperator(
        task_id='success_join',
        trigger_rule='one_success',
        dag=dag
    )

    [wait_bucket_123, wait_bucket_456] >> join
    

    Alternatively, write your own PythonSensor, or supply a custom check_fn= to the S3KeySensor. Per the Airflow docs, check_fn is a function that receives the list of matched S3 objects (along with the context values) and returns a boolean:

    - True: the criteria is met
    - False: the criteria isn't met

    The docs give the example of waiting for any S3 object larger than 1 megabyte; see the sketch below.
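
    A minimal sketch of that size check, assuming the sensor passes the matched objects as dicts with a 'Size' key in bytes (the bucket and key pattern below are hypothetical):

        def check_fn(files: list) -> bool:
            # True once any matched S3 object is larger than 1 MiB
            return any(f.get('Size', 0) > 1024 * 1024 for f in files)

        wait_for_big_file = S3KeySensor(
            task_id='wait_for_big_file',
            bucket_key='test-output/part-*',  # hypothetical key pattern
            bucket_name='bucket-123',
            wildcard_match=True,
            check_fn=check_fn,
            poke_interval=120,
            timeout=600,
            mode='reschedule',
            dag=dag
        )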