I have a use case where I need to consume the files in a given S3 bucket. I want to ensure that the Flink job processes each line of a file only once, even if the job is restarted.
With a streaming source like Kafka, the checkpointing mechanism would handle this. Is there a way to get checkpointing for a job that consumes files from S3?
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
// Apache HttpClient 4.x
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class FlinkReadFileAndSendToAPI {
    public static void main(String[] args) throws Exception {
        // Set up the Flink execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the file from S3 (readTextFile takes the path as a String)
        DataSource<String> text = env.readTextFile("s3://my-bucket/my-file.txt");
        // Map each line to a tuple containing the file name and the line
        DataSet<Tuple2<String, String>> fileContent = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                return new Tuple2<String, String>("my-file.txt", line);
            }
        });
        // Send each line to the API endpoint. DataSet has no forEach, so the sender
        // runs as a map and a discarding sink is attached so the plan has an output.
        fileContent.map(new FileContentSender()).output(new DiscardingOutputFormat<>());
        // Execute the Flink job
        env.execute("Read File and Send to API");
    }

    private static class FileContentSender implements MapFunction<Tuple2<String, String>, Integer> {
        @Override
        public Integer map(Tuple2<String, String> fileContent) throws Exception {
            // Create the POST request
            HttpPost httpPost = new HttpPost("https://my-api-endpoint.com/api/file");
            // Set the request body (the values are not JSON-escaped here, so a line
            // containing quotes or backslashes produces an invalid body)
            HttpEntity entity = new StringEntity("{\"filename\": \"" + fileContent.f0 + "\", \"content\": \"" + fileContent.f1 + "\"}");
            httpPost.setEntity(entity);
            // try-with-resources closes the client and the response even if the request fails
            try (CloseableHttpClient httpClient = HttpClients.createDefault();
                 CloseableHttpResponse response = httpClient.execute(httpPost)) {
                // Check the response status code
                int status = response.getStatusLine().getStatusCode();
                if (status != 200) {
                    throw new Exception("API request failed with status code: " + status);
                }
                // Return the status rather than null, since null records can fail during serialization
                return status;
            }
        }
    }
}
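The DataSet API used in your job has no checkpointing. You should just use the FileSource from the DataStream API, where the source's read progress is included in checkpoints. It is documented at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
Something like: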
CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source =
FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
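One caveat about "only once": checkpointing restores the read position, but anything the job did between the last completed checkpoint and the failure is replayed, so an HTTP call made from inside the job can be repeated after a restart unless the endpoint is idempotent. The following is a toy, JDK-only sketch of that behaviour, not Flink code and not how FileSource is implemented; `OffsetResumeSketch`, `run`, `checkpointInterval` and `failAt` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetResumeSketch {

    /**
     * Reads lines starting at startOffset and records every line it "sends" in delivered.
     * The offset is "checkpointed" after every checkpointInterval lines.
     * If failAt is reached, a crash is simulated and the last checkpointed offset is returned;
     * otherwise the method returns lines.size() once everything has been read.
     */
    public static int run(List<String> lines, int startOffset, int checkpointInterval,
                          int failAt, List<String> delivered) {
        int checkpointed = startOffset;
        for (int i = startOffset; i < lines.size(); i++) {
            if (i == failAt) {
                // Simulated crash: progress made since the last checkpoint is lost
                return checkpointed;
            }
            // Side effect that cannot be rolled back, e.g. the HTTP POST
            delivered.add(lines.get(i));
            if ((i + 1 - startOffset) % checkpointInterval == 0) {
                checkpointed = i + 1;
            }
        }
        return lines.size();
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b", "c", "d", "e");
        List<String> delivered = new ArrayList<>();

        // First attempt: checkpoint every 2 lines, crash just before reading "d"
        int restored = run(lines, 0, 2, 3, delivered);
        // Restart from the restored offset, no crash this time
        run(lines, restored, 2, -1, delivered);

        System.out.println(restored + " " + delivered); // prints "2 [a, b, c, c, d, e]"
    }
}
```

Here "c" is sent twice because it was processed after the last checkpoint and before the crash. If the API cannot tolerate duplicates, sending a stable key per line (for example file name plus line offset) and deduplicating on the receiving side is one way to handle it.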