apache-flink

Set a job name for a Flink job using the Table API


I want to set a job name for my Flink application written with the Table API, the same way I did with the Streaming API via env.execute(jobName).

I want to replace the auto-generated default job name with my own.

I can't find a way in the documentation to do this, except when submitting the job from a jar:

session mode:

bin/flink run -d -yD pipeline.name=MyPipelineName-v1.0 ...

application mode:

bin/flink run-application -d \
    -t yarn-application \
    -Dpipeline.name="JOB_NAME" ...
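If the name should apply as a default for every job submitted to the cluster, pipeline.name can presumably also be set in flink-conf.yaml (this is my assumption; per-job -D/-yD flags should override it):

```yaml
# conf/flink-conf.yaml — cluster-wide default
# (assumption: overridden by any per-job pipeline.name setting)
pipeline.name: MyPipelineName-v1.0
```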

Update:

In case someone faces the same situation: Table API pipelines can be attached to a DataStream API program (see the Flink docs on converting between Table API and DataStream), which lets us set the desired job name programmatically via env.execute(jobName).

Ex.:

    // Assumes: env is a StreamExecutionEnvironment, tEnv a StreamTableEnvironment
    // created from it, schema a Schema for the sink, and tA the Table to write.
    val statementSet = tEnv.createStatementSet() // StreamStatementSet

    val sinkDescriptor = TableDescriptor.forConnector("kafka")
        .option("topic", "topic_out")
        .option("properties.bootstrap.servers", "localhost:9092")
        .schema(schema)
        .format(FormatDescriptor.forFormat("avro").build())
        .build()

    tEnv.createTemporaryTable("OutputTable", sinkDescriptor)

    statementSet.addInsert(sinkDescriptor, tA)
    // attach the Table API pipeline to the DataStream program
    statementSet.attachAsDataStream()

    // now the job name can be set as usual
    env.execute(jobName)

Solution

  • Actually we can do it like this:

    val tEnv = StreamTableEnvironment.create(env)
    val tableConf = tEnv.getConfig.getConfiguration
    // must be set before the pipeline is submitted
    tableConf.setString("pipeline.name", "MyJobName")
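
On newer Flink releases (1.15+), TableConfig itself accepts string options, so the intermediate Configuration object should not be needed — a sketch, assuming an existing StreamExecutionEnvironment env as above:

```scala
// Assumes Flink 1.15+ and an existing StreamExecutionEnvironment `env`.
val tEnv = StreamTableEnvironment.create(env)
tEnv.getConfig.set("pipeline.name", "MyJobName") // set before submitting the pipeline
```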
    

    or using a SET statement in the SQL CLI:

    -- setting the job name
    SET 'pipeline.name' = 'MyJobName';
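
In the SQL CLI the option takes effect for the statements submitted afterwards; a hypothetical session (OutputTable and InputTable are placeholder names):

```sql
SET 'pipeline.name' = 'MyJobName';
-- the INSERT below is submitted as a job named "MyJobName"
INSERT INTO OutputTable SELECT * FROM InputTable;
```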