apache-flink flink-table-api

Read a csv file in Flink 1.15 using Table API


I'm doing a simple tutorial with Flink and Java using the Table API. What I want to do is really simple: read a CSV file from the local filesystem using a schema and print it out.

This is how I'm doing it (the code below is compiled from samples in the tutorial section of Flink's website):

package p1;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

public class CabAggregation {
    public static void main(String[] args) throws Exception {
        // Command-line parameters (not used yet in this example).
        ParameterTool params = ParameterTool.fromArgs(args);

        // Batch (bounded) execution, since the input is a finite CSV file.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Column layout of cabs.csv.
        final Schema schema = Schema.newBuilder()
                .column("cab_id", DataTypes.INT())
                .column("cab_plate", DataTypes.STRING())
                .column("cab_make", DataTypes.STRING())
                .column("cab_driver", DataTypes.STRING())
                .column("active_trip", DataTypes.STRING())
                .column("pickup_location", DataTypes.STRING())
                .column("target_location", DataTypes.STRING())
                .column("num_pass", DataTypes.INT())
                .build();

        // Register the file as a temporary table backed by the 'filesystem'
        // connector and decoded with the 'csv' format.
        tableEnv.createTemporaryTable("cabs",
                TableDescriptor
                        .forConnector("filesystem")
                        .schema(schema)
                        .option("path", "file:///Users/virtual/Downloads/cabs.csv")
                        .format(FormatDescriptor.forFormat("csv").build())
                        .build());

        // $("*") selects all columns; string expressions like select("*") are deprecated.
        Table result = tableEnv.from("cabs").select($("*"));
        result.execute().print();
    }
}

Running this fails with the following error:

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
print

Now, it seems evident that the 'filesystem' connector that my CSV table relies on is somehow not available as a factory identifier. I can't figure out why. I'm building the project with Maven.


Solution

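  • You need both of these dependencies: flink-connector-files provides the 'filesystem' connector factory, and flink-csv provides the 'csv' format. Have you added them?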

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
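The error only lists the factories Flink found on the runtime classpath, so it can help to confirm that the two dependencies above actually reach it (for example when running from a packaged jar). Flink discovers table factories through Java's ServiceLoader, which reads the service file META-INF/services/org.apache.flink.table.factories.Factory from every jar on the classpath. The class below is a minimal sketch of such a check, assuming it is run with the same classpath as the job; the class name ListTableFactories is hypothetical, and it uses only the JDK, not any Flink API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic: lists Flink table factories registered via service files.
public class ListTableFactories {

    // Service file that java.util.ServiceLoader reads when looking up
    // implementations of org.apache.flink.table.factories.Factory.
    public static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    // Returns one "<service file URL> -> <factory class>" entry per registered factory.
    public static List<String> registeredFactories(ClassLoader loader) throws IOException {
        List<String> entries = new ArrayList<>();
        Enumeration<URL> files = loader.getResources(SERVICE_FILE);
        while (files.hasMoreElements()) {
            URL file = files.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Drop '#' comments and blank lines, as the service file format allows.
                    int hash = line.indexOf('#');
                    String className = (hash >= 0 ? line.substring(0, hash) : line).trim();
                    if (!className.isEmpty()) {
                        entries.add(file + " -> " + className);
                    }
                }
            }
        }
        return entries;
    }

    public static void main(String[] args) throws IOException {
        List<String> factories =
                registeredFactories(Thread.currentThread().getContextClassLoader());
        if (factories.isEmpty()) {
            System.out.println("No Flink table factories are registered on this classpath.");
        }
        factories.forEach(System.out::println);
    }
}
```

If flink-connector-files and flink-csv are on the classpath, entries from their jars should appear in the output. If only the jars that provide blackhole, datagen and print show up, the dependencies are not reaching the runtime classpath.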