Tags: amazon-s3, ssis, azure-storage, azure-blob-storage, azure-data-factory

Copy Data From Azure Blob Storage to AWS S3


I am new to Azure Data Factory and have an interesting requirement.

I need to move files from Azure Blob storage to Amazon S3, ideally using Azure Data Factory.

However, S3 isn't supported as a sink:

[Screenshot not available — see the supported data stores list at the link below]

https://learn.microsoft.com/en-us/azure/data-factory/copy-activity-overview

I also understand from a variety of comments I've read here that you cannot copy directly from Blob Storage to S3 — you would need to download the file locally and then upload it to S3.

Does anyone know of any examples in Data Factory, SSIS, or an Azure Runbook that can do this? I suppose another option would be to write an Azure Logic App or Azure Function that is called from Data Factory.


Solution

  • Managed to get something working on this - it might be useful for someone else.

    I decided to write an Azure Function that uses an HTTP request as its trigger.

    These two posts helped me a lot;

    How can I use NuGet packages in my Azure Functions?

    Copy from Azure Blob to AWS S3 using C#

    If you are using Azure Functions 2.x, please see my answer on the NuGet packages question above.

    Here is the code — you can adapt it to your needs. I return a JSON-serialized object because Azure Data Factory requires a JSON response to an HTTP request sent from a pipeline:

    #r "Microsoft.WindowsAzure.Storage"
    #r "Newtonsoft.Json"
    #r "System.Net.Http"
    
    using System.Net;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Primitives;
    using Newtonsoft.Json;
    using Microsoft.WindowsAzure.Storage.Blob;
    using System.Net.Http;
    using Amazon.S3; 
    using Amazon.S3.Model;
    using Amazon.S3.Transfer;
    using Amazon.S3.Util;
    
    
    /// <summary>
    /// HTTP-triggered entry point. Copies the blob addressed by the <c>blobUri</c> query
    /// parameter into the S3 bucket named by the <c>bucketName</c> query parameter.
    /// </summary>
    /// <param name="req">Incoming HTTP request; only its query string is read.</param>
    /// <param name="log">Function logger.</param>
    /// <returns>
    /// 200 with a JSON-serialized <see cref="Result"/> when the copy succeeds; 400 with a
    /// JSON-serialized <see cref="Result"/> when a parameter is missing or malformed, or
    /// when the copy fails.
    /// </returns>
    public static async Task<IActionResult> Run(HttpRequest req, ILogger log)
    {
        log.LogInformation("Example Function has recieved a HTTP Request");

        // get Params from query string
        string blobUri = req.Query["blobUri"];
        string bucketName = req.Query["bucketName"];

        // Validate query string
        if (string.IsNullOrEmpty(blobUri) || string.IsNullOrEmpty(bucketName)) {
            Result outcome = new Result("Invalid Parameters Passed to Function", false, "blobUri or bucketName is null or empty");
            return new BadRequestObjectResult(outcome.ConvertResultToJson());
        }

        // new Uri(string) throws UriFormatException on malformed input, which would surface
        // as an unhandled 500 instead of this function's JSON error body. Reject it as 400.
        if (!Uri.TryCreate(blobUri, UriKind.Absolute, out Uri blobAbsoluteUri)) {
            Result outcome = new Result("Invalid Parameters Passed to Function", false, "blobUri is not a valid absolute URI");
            return new BadRequestObjectResult(outcome.ConvertResultToJson());
        }

        // The blob is built from the URI alone (no account credentials), so the URI must
        // grant read access by itself (public container or SAS token).
        CloudBlockBlob blob = new CloudBlockBlob(blobAbsoluteUri);

        // Do the Copy
        bool copied = await CopyBlob(blob, bucketName, log);

        if (copied) {
            Result outcome = new Result("Copy Completed", true, "Blob: " + blobUri + " Copied to Bucket: " + bucketName);
            return new OkObjectResult(outcome.ConvertResultToJson());
        }

        // NOTE(review): a failed copy is a server-side failure; 400 is kept so existing
        // pipeline callers see the same status code as before.
        Result failure = new Result("ERROR", false, "Copy was not successful Please review Application Logs");
        return new BadRequestObjectResult(failure.ConvertResultToJson());
    }
    
    /// <summary>
    /// Streams <paramref name="blob"/> into the S3 bucket <paramref name="existingBucket"/>
    /// (region EU-West-1), using the blob's name as the object key.
    /// </summary>
    /// <param name="blob">Source blob; it must be readable with however it was constructed.</param>
    /// <param name="existingBucket">Name of an S3 bucket that must already exist.</param>
    /// <param name="log">Logger for progress and error messages.</param>
    /// <returns>
    /// true when the upload completed. false when AWS credentials are not configured, when
    /// the bucket does not exist or is inaccessible, or when any exception occurred; the
    /// cause is logged.
    /// </returns>
    static async Task<bool> CopyBlob(CloudBlockBlob blob, string existingBucket, ILogger log) {

        // Never hard-code credentials in source. Read them from the Function App settings,
        // which are exposed to the function as environment variables.
        var accessKey = Environment.GetEnvironmentVariable("AWS_ACCESS_KEY_ID");
        var secretKey = Environment.GetEnvironmentVariable("AWS_SECRET_ACCESS_KEY");
        var keyName = blob.Name;

        if (string.IsNullOrEmpty(accessKey) || string.IsNullOrEmpty(secretKey)) {
            log.LogError("AWS credentials are not configured (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY app settings)");
            return false;
        }

        try {
            // Both the S3 client and the transfer utility are IDisposable.
            using (var myClient = new AmazonS3Client(accessKey, secretKey, Amazon.RegionEndpoint.EUWest1))
            using (var fileTransferUtility = new TransferUtility(myClient)) {

                // Check the Target Bucket Exists. This is inside the try so that network or
                // auth failures are reported as a failed copy rather than escaping.
                bool bucketExists = await AmazonS3Util.DoesS3BucketExistAsync(myClient, existingBucket);

                if (!bucketExists) {
                    log.LogInformation("Bucket: " + existingBucket + " does not exist or is inaccessible to the application");
                    return false;
                }

                log.LogInformation("Starting Copy");

                // Stream the blob straight into S3 without staging it on local disk.
                using (var stream = await blob.OpenReadAsync()) {

                    log.LogInformation("Streaming");

                    await fileTransferUtility.UploadAsync(stream, existingBucket, keyName);

                    log.LogInformation("Streaming Done");
                }

                log.LogInformation("Copy completed");
            }
        }
        catch (AmazonS3Exception e) {
            // Must report failure: falling through to "return true" would tell the caller
            // the copy succeeded.
            log.LogError(e, "Error encountered on server. Message:'{0}' when writing an object", e.Message);
            return false;
        }
        catch (Exception e) {
            log.LogError(e, "Unknown encountered on server. Message:'{0}' when writing an object", e.Message);
            return false;
        }

        return true;
    }
    
    /// <summary>
    /// Response body returned to the HTTP caller (e.g. an Azure Data Factory pipeline),
    /// serialized to JSON by <see cref="ConvertResultToJson"/>. The field names are the
    /// JSON property names, so renaming them changes the response contract.
    /// </summary>
    public class Result {

        public string result;   // short status message
        public bool outcome;    // true when the operation succeeded
        public string UTCtime;  // creation timestamp, in UTC
        public string details;  // longer human-readable description

        /// <param name="msg">Short status message, stored in <c>result</c>.</param>
        /// <param name="outcomeBool">Success flag, stored in <c>outcome</c>.</param>
        /// <param name="fullMsg">Detailed message, stored in <c>details</c>.</param>
        public Result(string msg, bool outcomeBool, string fullMsg) {
            result = msg;
            // The field is named UTCtime, so use UTC rather than the server's local time.
            // Use the invariant culture so the AM/PM designator does not depend on server locale.
            UTCtime = DateTime.UtcNow.ToString("yyyy-MM-dd h:mm:ss tt", System.Globalization.CultureInfo.InvariantCulture);
            outcome = outcomeBool;
            details = fullMsg;
        }

        /// <summary>Serializes this instance (its public fields) to a JSON string.</summary>
        public string ConvertResultToJson() {
            return JsonConvert.SerializeObject(this);
        }
    }