java apache-flink batch-processing flink-streaming flink-batch

Unable to read a (text) file in FileProcessingMode.PROCESS_CONTINUOUSLY mode


I have a requirement to read files continuously from a specific path.

That is, the Flink job should continuously poll the specified location and read any file that arrives there at certain intervals.

Example: my location on a Windows machine is C:/inputfiles, which gets file_1.txt at 2:00 PM, file_2.txt at 2:30 PM, and file_3.txt at 3:00 PM.

I experimented with the code below.

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileProcessingTest {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10);

        String localFsURI = "D:\\FLink\\2021_01_01\\";

        TextInputFormat format = new TextInputFormat(new Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        // Monitor the directory continuously, re-scanning it every 100 ms for new files.
        DataStream<String> inputStream =
                env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

        SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
        soso.print();
        soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);

        env.execute("read and write");
    }
}

Now, to test this on a Flink cluster, I brought a cluster up on Flink 1.9.2, and I was able to achieve my goal of continuously reading files as they arrived at intervals.

Note: Flink 1.9.2 can bring up a cluster on a Windows machine.

But now I have to upgrade Flink from 1.9.2 to 1.12, and we used Docker to bring the cluster up on 1.12 (unlike with 1.9.2).

I changed the file location from the Windows path to the corresponding Docker location, but the same program above does not work there.

Moreover, accessing the files is not the problem: if I put files in place before starting the job, the job reads them correctly, but if I add a new file at runtime, it does not pick up the newly added file.
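For reference, the only lines I changed for the Docker run were the input path. The container path below is a placeholder, and it assumes the host folder is mounted into both the JobManager and TaskManager containers:

// Placeholder container-side path: the host folder with the input files must be
// mounted at this location in both the JobManager and TaskManager containers;
// otherwise files dropped on the host are never visible to the job.
String localFsURI = "/data/inputfiles/";
TextInputFormat format = new TextInputFormat(new Path(localFsURI));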

I need help finding a solution.

Thanks in advance.


Solution

  • Try increasing the directory scan interval (the last argument to readFile, 100 ms in the sample code) to something like Duration.ofSeconds(50).toMillis(), and check out StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); a sketch combining both follows below.
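A minimal sketch of both suggestions, assuming Flink 1.12; the input path and job name are placeholders:

import java.time.Duration;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileProcessingSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Since Flink 1.12 the runtime mode decides whether the job runs as a
        // bounded batch job (which finishes after reading the existing files)
        // or as a streaming job; AUTOMATIC lets Flink pick based on the sources,
        // and a continuous file source should keep the job in streaming mode.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Placeholder path: the directory mounted into the containers.
        String inputDir = "/data/inputfiles/";
        TextInputFormat format = new TextInputFormat(new Path(inputDir));

        // Re-scan the directory every 50 seconds instead of every 100 ms.
        long scanInterval = Duration.ofSeconds(50).toMillis();

        DataStream<String> lines =
                env.readFile(format, inputDir, FileProcessingMode.PROCESS_CONTINUOUSLY, scanInterval);

        lines.map(String::toUpperCase).print();
        env.execute("continuous file read");
    }
}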