I have a requirement to read files continuously from a specific path.
That means the Flink job should continuously poll the specified location and read each file that arrives there at certain intervals.
Example: the location on my Windows machine, C:/inputfiles, gets file_1.txt at 2:00 PM, file_2.txt at 2:30 PM, and file_3.txt at 3:00 PM.
I experimented with the code below.
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileProcessingTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 ms (very aggressive, but fine for a local test).
        env.enableCheckpointing(10);

        String localFsURI = "D:\\FLink\\2021_01_01\\";

        TextInputFormat format = new TextInputFormat(new Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // Monitor the directory continuously, re-scanning it every 100 ms.
        DataStream<String> inputStream =
                env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

        SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
        soso.print();
        soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);

        env.execute("read and write");
    }
}
To test this on a Flink cluster, I brought a cluster up with Flink 1.9.2 and was able to achieve my goal of continuously reading files that arrive at intervals.
Note: Flink 1.9.2 can bring up a cluster on a Windows machine.
But now I have to upgrade from Flink 1.9.2 to 1.12, and we used Docker to bring the cluster up on 1.12 (unlike with 1.9.2).
I changed the file location from the Windows path to the corresponding path inside the Docker container, but the same program does not work there.
Moreover, accessing the files is not the problem: if I put the files in place before starting the job, the job reads them correctly, but if I add a new file at runtime it does not read the newly added file.
I need help finding a solution.
Thanks in advance.
Try changing the directory scan interval in the sample code from 100 ms to Duration.ofSeconds(50).toMillis(), and check out StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).
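A minimal sketch of the job with both of those suggestions applied, assuming Flink 1.12 on the Docker cluster (the input path below is a placeholder for wherever the directory is mounted inside the container):

import java.time.Duration;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileProcessingSuggestion {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC lets Flink choose between BATCH and STREAMING; with an
        // unbounded source such as PROCESS_CONTINUOUSLY it resolves to STREAMING.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.enableCheckpointing(10);

        // Placeholder: the directory mounted into the container.
        String inputDir = "/data/input";

        TextInputFormat format = new TextInputFormat(new Path(inputDir));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // Re-scan the directory every 50 seconds instead of every 100 ms.
        DataStream<String> input = env.readFile(
                format,
                inputDir,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                Duration.ofSeconds(50).toMillis());

        input.map(String::toUpperCase).print();

        env.execute("read and write");
    }
}

If that still doesn't pick up new files, note that Flink 1.12 also introduced the new FileSource connector, whose builder has a monitorContinuously(...) option; it is the intended successor to readFile for this kind of directory watching.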