I am trying to move files from the "new" folder in my-bucket to the "test" folder in the same bucket using GCSToGCSOperator. I pass a list of file names as the source objects, but when I run the DAG the files are not moved. I use an XCom push to get the list of file names from the "new" folder in my bucket.
def segregate_files(ti):
    """Move listed objects whose name contains 'abc' into the ``test/`` folder.

    The object names are pulled from the XCom pushed by the ``list_file``
    task. Each matching object is copied to ``test/<basename>`` in the
    destination bucket and then deleted from the source bucket.

    The move is done with the storage client directly: building an Airflow
    operator inside an already-running Python callable only constructs the
    object, its ``execute()`` is never invoked, so no files are moved.

    Args:
        ti: The running task instance; only ``ti.xcom_pull`` is used.

    Returns:
        None.
    """
    source_bucket = "my-bucket"
    PROJECT_ID = "project"
    destination_bucket = "my-bucket"
    destination_prefix = "test/"

    # xcom_pull returns None when the upstream task pushed nothing.
    source_files = ti.xcom_pull(task_ids='list_file') or []
    print(source_files)  # e.g. ['new/abc.txt','new/bcd.txt','new/abc_bcd.txt']

    new_files = [file for file in source_files if 'abc' in file]
    print(new_files)  # e.g. ['new/abc.txt','new/abc_bcd.txt']
    if not new_files:
        # Nothing matched: avoid creating a client for no work.
        return

    client = storage.Client(project=PROJECT_ID)
    source = client.get_bucket(source_bucket)
    destination = client.get_bucket(destination_bucket)

    for name in new_files:
        blob = source.blob(name)
        # Keep only the file name so 'new/abc.txt' lands at 'test/abc.txt'.
        new_name = destination_prefix + name.rsplit("/", 1)[-1]
        source.copy_blob(blob, destination, new_name)
        # Delete only after the copy succeeded, completing the move.
        blob.delete()
# Lists objects under "new/" in my-bucket. The result is pushed to XCom so
# that segregate_files can pull it with task_ids='list_file'.
# NOTE(review): delimiter='.txt' is presumably meant to restrict the listing
# to .txt objects; confirm against the installed provider version.
bucket_files = GCSListObjectsOperator(
    dag=dag,
    task_id='list_file',
    do_xcom_push=True,
    bucket='my-bucket',
    prefix='new/',
    delimiter='.txt',
)
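As a workaround, consider the following code as an example. Note that it copies the files into the destination folder rather than moving them; delete the source objects afterwards if a true move is required: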
import os
from google.cloud import storage
# Source and destination live in the same bucket; only the folder differs.
source_bucket_name = destination_bucket_name = 'my-bucket'
# Folder names are written without a trailing slash.
source_folder_name, destination_folder_name = 'new', 'old'
def copy_files(ti):
    """Copy the objects listed by ``listing_files`` into the destination folder.

    Each object name pulled from XCom is copied to
    ``<destination_folder_name>/<basename>`` in the destination bucket. The
    source objects are left in place.

    The copy is performed server-side with ``Bucket.copy_blob`` so object
    contents are never downloaded to (or held in memory on) the worker.

    Args:
        ti: The running task instance; only ``ti.xcom_pull`` is used.

    Returns:
        None.
    """
    # xcom_pull returns None when the listing task pushed nothing; treat that
    # the same as an empty listing instead of failing in the loop.
    file_list = ti.xcom_pull(task_ids='listing_files') or []

    if file_list:
        storage_client = storage.Client()
        source_bucket = storage_client.get_bucket(source_bucket_name)
        destination_bucket = storage_client.get_bucket(destination_bucket_name)

        for filename in file_list:
            source_blob = source_bucket.blob(filename)
            # Drop the source folder so only the file name is kept.
            output_filename = os.path.basename(filename)
            source_bucket.copy_blob(
                source_blob,
                destination_bucket,
                f'{destination_folder_name}/{output_filename}',
            )

    print(f'Files copied successfully.{file_list}')
# Daily pipeline: log a start marker, list the objects under "new/", hand the
# listing to copy_files through XCom, then log an end marker.
with DAG(
    dag_id='final',
    start_date=datetime(2021, 4, 5, 15, 0),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    t1 = BashOperator(
        task_id='started',
        bash_command='echo starting…',
    )
    # copy_files pulls this task's result with task_ids='listing_files'.
    t2 = GoogleCloudStorageListOperator(
        task_id='listing_files',
        bucket='my-bucket',
        prefix='new/',
        delimiter='.txt',
    )
    t3 = PythonOperator(
        task_id="copying_files",
        python_callable=copy_files,
    )
    t4 = BashOperator(
        task_id='end',
        bash_command='echo end',
    )

    # Strictly linear ordering of the four tasks.
    t1 >> t2 >> t3 >> t4
DAG :
Note: I tested the above code with .txt and .py files, and it worked fine for me.