I want to set a job name for my Flink application written with the Table API, the same way I did with the Streaming API via env.execute(jobName).
I want to replace:
I can't find a way in the documentation other than setting it when running the job from a JAR:
session mode:
bin/flink run -d -yD pipeline.name=MyPipelineName-v1.0 ...
application mode:
bin/flink run-application -d \
-t yarn-application \
-Dpipeline.name="JOB_NAME" ...
Update:
In case someone faces the same situation: Table API pipelines can be attached to a DataStream API program (see the docs), so the whole job is submitted through env.execute(jobName) and gets the desired name programmatically.
Example:
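// Assumes env, tEnv, schema, tA (the Table to write) and jobName are defined elsewhere.
val statementSet = tEnv.createStatementSet()

val sinkDescriptor = TableDescriptor.forConnector("kafka")
  .option("topic", "topic_out")
  .option("properties.bootstrap.servers", "localhost:9092")
  .schema(schema)
  .format(FormatDescriptor.forFormat("avro").build())
  .build()
tEnv.createTemporaryTable("OutputTable", sinkDescriptor)
// Write tA into the sink described by sinkDescriptor
statementSet.addInsert(sinkDescriptor, tA)
// Attach the Table API pipeline to the DataStream topology instead of executing it on its own
statementSet.attachAsDataStream()
// The attached pipeline is submitted under this name
env.execute(jobName)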
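For anyone who wants to try the snippet above without a Kafka cluster, here is a minimal self-contained sketch of the same idea. It is only an illustration under a few assumptions: Flink 1.15 or later, the Java bridge classes called from Scala, and the built-in datagen and blackhole connectors standing in for the real source and sink. The object name AttachedTableJob, the table name InputTable and the single-column schema are made up for the example.

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

object AttachedTableJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Hypothetical one-column schema shared by source and sink
    val schema = Schema.newBuilder()
      .column("id", DataTypes.BIGINT())
      .build()

    // Bounded test source (datagen) in place of a real input
    tEnv.createTemporaryTable(
      "InputTable",
      TableDescriptor.forConnector("datagen")
        .schema(schema)
        .option("number-of-rows", "10")
        .build())

    // Sink that discards all rows (blackhole) in place of Kafka
    val sinkDescriptor = TableDescriptor.forConnector("blackhole")
      .schema(schema)
      .build()

    // createStatementSet() on a StreamTableEnvironment returns a StreamStatementSet,
    // which is the type that offers attachAsDataStream()
    val statementSet = tEnv.createStatementSet()
    statementSet.addInsert(sinkDescriptor, tEnv.from("InputTable"))
    statementSet.attachAsDataStream()

    // The Table API pipeline is now part of the DataStream job and takes this name
    env.execute("MyJobName")
  }
}
```

Note that after attachAsDataStream() nothing runs until env.execute(...) is called, so the name passed there is the one that should show up in the Flink UI.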
Actually, the name can also be set directly through the table configuration:
val tEnv = StreamTableEnvironment.create(env)
val tableConf = tEnv.getConfig.getConfiguration
tableConf.setString("pipeline.name", "MyJobName")
or using a SET statement in the SQL CLI:
-- setting the job name
SET 'pipeline.name' = 'MyJobName';
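To show where the programmatic setting fits in a pure Table API job, here is a rough end-to-end sketch. The assumptions are: Flink 1.15 or later, the Java bridge classes called from Scala, and the datagen and blackhole connectors as placeholder source and sink. The names NamedTableJob, src and dst are invented for the example. PipelineOptions.NAME is the typed option behind the "pipeline.name" key, so it should be equivalent to the setString call above.

```scala
import org.apache.flink.configuration.PipelineOptions
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

object NamedTableJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Set the job name before any INSERT is submitted
    tEnv.getConfig.getConfiguration.set(PipelineOptions.NAME, "MyJobName")

    // Hypothetical bounded source that generates 10 rows
    tEnv.executeSql(
      """CREATE TEMPORARY TABLE src (id BIGINT) WITH (
        |  'connector' = 'datagen',
        |  'number-of-rows' = '10'
        |)""".stripMargin)

    // Hypothetical sink that discards everything it receives
    tEnv.executeSql(
      """CREATE TEMPORARY TABLE dst (id BIGINT) WITH (
        |  'connector' = 'blackhole'
        |)""".stripMargin)

    // The INSERT is submitted as its own job; await() blocks until it finishes
    tEnv.executeSql("INSERT INTO dst SELECT id FROM src").await()
  }
}
```

The setting has to be in place before the INSERT is executed, since the name is read when the job is submitted.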