
Airflow Remote file sensor


I am trying to check whether any files on a remote server match a given pattern, similar to the solution in "Airflow File Sensor for sensing files on my local drive".

I used SSHOperator with a Bash command, as shown below:

SSH_Bash = """
    echo 'poking for files...'
    ls /home/files/test.txt
    if [ $? -eq "0" ]; then    
    echo 'Found file'
    else
    echo 'failed to find'
    fi
    """

t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=SSH_Bash,
    dag=dag)

It works, but it doesn't look like an optimal solution. Could someone suggest a better way than a Bash script to sense files on the remote server?

I also tried the SFTP sensor below:

import os
import re

import logging
from paramiko import SFTP_NO_SUCH_FILE
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class SFTPSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath,filepattern, sftp_conn_id='sftp_default', *args, **kwargs):
        super(SFTPSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern
        self.hook = SFTPHook(sftp_conn_id)

    def poke(self, context):
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)

        try:
            directory = os.listdir(self.hook.full_path)
            for files in directory:
                if not re.match(file_pattern, files):
                    self.log.info(files)
                    self.log.info(file_pattern)
                else:
                    context["task_instance"].xcom_push("file_name", files)
                    return True
            return False
        except IOError as e:
            if e.errno != SFTP_NO_SUCH_FILE:
                raise e
            return False

class SFTPSensorPlugin(AirflowPlugin):
    name = "sftp_sensor"
    sensors = [SFTPSensor]

But this always pokes the local machine instead of the remote machine. Could someone point out where I am making a mistake?


Solution

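  • os.listdir always reads the local filesystem, no matter which hook the sensor holds, so I replaced the line

    directory = os.listdir(self.hook.full_path)

    with

    directory = self.hook.list_directory(full_path)

    which lists the directory over the hook's SFTP connection, i.e. on the remote server.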
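
For illustration, the matching logic can be pulled out into a small helper that only needs an object with a list_directory(path) method, which is what SFTPHook provides. This is a minimal sketch, not the exact sensor from the question: find_matching_file and FakeHook are hypothetical names, and FakeHook only stands in for a real SFTPHook so the example runs without a server.

```python
import re


def find_matching_file(hook, remote_dir, pattern):
    """Return the first file name in remote_dir that matches pattern, else None.

    `hook` is assumed to expose list_directory(path), as SFTPHook does.
    """
    regex = re.compile(pattern)
    # list_directory() goes through the hook's connection, so with a real
    # SFTPHook this lists the remote host rather than the local filesystem.
    for name in hook.list_directory(remote_dir):
        # re.match anchors at the start of the name only.
        if regex.match(name):
            return name
    return None


class FakeHook:
    """Hypothetical stand-in for SFTPHook, backed by an in-memory listing."""

    def __init__(self, listing):
        self.listing = listing

    def list_directory(self, path):
        return self.listing.get(path, [])


hook = FakeHook({"/home/files": ["readme.md", "test_01.txt"]})
print(find_matching_file(hook, "/home/files", r"test_.*\.txt"))
```

Inside poke, the sensor could call find_matching_file(self.hook, self.filepath, self.filepattern), push the returned name to XCom and return True when it is not None, and return False otherwise.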