I am using the WordCountProg example from the tutorial at https://www.tutorialspoint.com/apache_flink/apache_flink_creating_application.htm. The code is as follows:
WordCountProg.java
package main.java.spendreport;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCountProg {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(params.get("input"));

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
This example takes a text file as input, counts how many times each word appears in the document, and writes the results to an output file.
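For example (illustrative input, not from the tutorial), a file containing the single line

hello world hello

would produce a results file along the lines of

hello 2
world 1

since writeAsCsv is called with "\n" as the record delimiter and " " as the field delimiter.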
I am building my job image using the following Dockerfile:
Dockerfile
FROM flink:1.13.0-scala_2.11
WORKDIR /opt/flink/usrlib
# Create Directory for Input/Output
RUN mkdir /opt/flink/resources
COPY target/wordcount-0.0.1-SNAPSHOT.jar /opt/flink/usrlib/wordcount.jar
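For reference, since the yaml below uses imagePullPolicy: Never, I build the image against minikube's Docker daemon, roughly like this (the mvn step and the eval line reflect my local setup):

# make the local Docker CLI talk to minikube's Docker daemon,
# so the image is visible to the cluster without a registry
eval $(minikube docker-env)
# build the jar that the Dockerfile copies, then build the image
mvn clean package
docker build -t docker/wordcount:latest .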
Then the yaml for my job looks as follows:
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: docker/wordcount:latest
          imagePullPolicy: Never
          # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          args: ["standalone-job", "--job-classname", "main.java.spendreport.WordCountProg", "-input", "/opt/flink/resources/READ.txt", "-output", "/opt/flink/resources/results.txt"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: job-artifacts-volume
              mountPath: /opt/flink/resources
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            # directory location on host
            path: /Users/my-user/Documents/semafor/apache_flink/PV
The goal is to mount /Users/my-user/Documents/semafor/apache_flink/PV, which contains a READ.txt file, into the pod so that it serves as input to the job. But when the job tries to execute, I get the following error:
java.io.FileNotFoundException: File /opt/flink/resources/READ.txt does not exist or the user running Flink ('flink') has insufficient permissions to access it.
I have tried running:
sudo chown -R 9999:9999 /Users/my-user/Documents/semafor/apache_flink/PV
I also ran chmod 777 on the directory, but I get the same error.
I also tried copying the jar into the same directory as READ.txt (/Users/my-user/Documents/semafor/apache_flink/PV) and mounting that to /opt/flink/usrlib instead, but then I got:
org.apache.flink.util.FlinkException: Could not find the provided job class (main.java.spendreport.WordCountProg) in the user lib directory (/opt/flink/usrlib).
I am not that experienced with Kubernetes or Flink, so I'm not sure whether I'm mounting incorrectly or doing something else wrong. If you have any suggestions, please let me know. Thanks in advance.
If you are using minikube, you first need to mount the volume into the minikube VM:
minikube mount /Users/my-user/Documents/semafor/apache_flink/PV:/tmp/PV
Then use /tmp/PV as the hostPath in the volumes section, because hostPath refers to the filesystem of the Kubernetes node (the minikube VM), not your Mac's.
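For example, the job-artifacts-volume entry would then look like this (a sketch; the rest of the Job spec stays unchanged):

  volumes:
    - name: job-artifacts-volume
      hostPath:
        # path inside the minikube VM, created by `minikube mount`
        path: /tmp/PV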
Refer to this thread: Minikube volume write permissions?
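To verify the mount from inside the cluster, you can exec into the job pod (the pod name below is a placeholder) and list the mounted directory:

kubectl get pods
kubectl exec -it <flink-jobmanager-pod> -- ls -la /opt/flink/resources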