I am trying find if there is any files in the remote server match the provided pattern. Something as similar to the below solution Airflow File Sensor for sensing files on my local drive
I used SSHOperator with bash command as below,
SSH_Bash = """
echo 'poking for files...'
ls /home/files/test.txt
if [ $? -eq "0" ]; then
echo 'Found file'
else
echo 'failed to find'
fi
"""
t1 = SSHOperator(
ssh_conn_id='ssh_default',
task_id='test_ssh_operator',
command=SSH_Bash,
dag=dag)
It works but doesnt look like an optimal solution. Could someone help me to get better solution than Bash script to sense the files in the remote server.
I tried the below sftp sensor ,
import os
import re
import logging
from paramiko import SFTP_NO_SUCH_FILE
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class SFTPSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath,filepattern, sftp_conn_id='sftp_default', *args, **kwargs):
super(SFTPSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
self.filepattern = filepattern
self.hook = SFTPHook(sftp_conn_id)
def poke(self, context):
full_path = self.filepath
file_pattern = re.compile(self.filepattern)
try:
directory = os.listdir(self.hook.full_path)
for files in directory:
if not re.match(file_pattern, files):
self.log.info(files)
self.log.info(file_pattern)
else:
context["task_instance"].xcom_push("file_name", files)
return True
return False
except IOError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise e
return False
class SFTPSensorPlugin(AirflowPlugin):
name = "sftp_sensor"
sensors = [SFTPSensor]
But this always poke into local machine instead of remote machine. Could someone help me where i am making a mistake.
I replaced the line from
directory = os.listdir(self.hook.full_path)
to
directory = self.hook.list_directory(full_path)