javaapache-flinkflink-streamingflink-sqlflink-batch

How to consume files from S3 bucket using Flink with checkpoints for failure recovery


I have a usecase to consume the files present in given S3 bucket. The problem is that I want to ensure that the Flink job process each line of the file only once in case the job is restarted.

If it was a streaming source like Kafka then checkpointing mechanism should have worked. Do we have a way to achieve checkpointing for a job consuming files from S3.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextFileResult;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;

public class FlinkReadFileAndSendToAPI {

    public static void main(String[] args) throws Exception {

        // Set up the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the file from S3
        DataSource<String> text = env.readTextFile(new Path("s3://my-bucket/my-file.txt"));

        // Map the file content to a tuple containing the file name and content
        DataSet<Tuple2<String, String>> fileContent = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                return new Tuple2<String, String>("my-file.txt", line);
            }
        });

        // Send the file content to the API endpoint
        fileContent.forEach(new FileContentSender());

        // Execute the Flink job
        env.execute("Read File and Send to API");
    }

    private static class FileContentSender implements MapFunction<Tuple2<String, String>, Object> {

        @Override
        public Object map(Tuple2<String, String> fileContent) throws Exception {

            // Create the HTTP client
            CloseableHttpClient httpClient = HttpClients.createDefault();

            // Create the POST request
            HttpPost httpPost = new HttpPost("https://my-api-endpoint.com/api/file");

            // Set the request body
            HttpEntity entity = new StringEntity("{\"filename\": \"" + fileContent.f0 + "\", \"content\": \"" + fileContent.f1 + "\"}");
            httpPost.setEntity(entity);

            // Execute the POST request
            CloseableHttpResponse response = httpClient.execute(httpPost);

            // Check the response status code
            if (response.getStatusLine().getStatusCode() != 200) {
                throw new Exception("API request failed with status code: " + response.getStatusLine().getStatusCode());
            }

            // Close the response
            response.close();

            // Close the HTTP client
            httpClient.close();

            return null;
        }
    }
}


Solution

  • You should just use the FileSource that's available and documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/

    Something like:

    CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
    FileSource<SomePojo> source = 
            FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();