I'm trying to set up Flink SQL with the Apache Iceberg catalog and Hive Metastore, but I'm having no luck. Below are the steps I've taken on a clean Flink 1.18.1 installation, and the error that I get.
Run the Hive Metastore:
docker run --rm --detach --name hms-standalone \
--publish 9083:9083 \
ghcr.io/recap-build/hive-metastore-standalone:latest
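(An optional sanity check, not in my original steps: confirm the Metastore's Thrift port is reachable before going further.)
nc -z localhost 9083 && echo "HMS is listening"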
Run MinIO using Docker:
docker run --rm --detach --name minio \
-p 9001:9001 -p 9000:9000 \
-e "MINIO_ROOT_USER=admin" \
-e "MINIO_ROOT_PASSWORD=password" \
minio/minio server /data --console-address ":9001"
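(Likewise optional: MinIO exposes a liveness endpoint that can be probed to confirm it's up.)
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:9000/minio/health/live
# expect 200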
Provision a bucket:
docker exec minio \
mc config host add minio http://localhost:9000 admin password
docker exec minio \
mc mb minio/warehouse
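(Optional check: list the buckets through the same mc alias to confirm warehouse was created.)
docker exec minio mc ls minio/
# expect a line ending in warehouse/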
Add the required MinIO configuration to ./conf/flink-conf.yaml:
cat >> ./conf/flink-conf.yaml <<EOF
fs.s3a.access.key: admin
fs.s3a.secret.key: password
fs.s3a.endpoint: http://localhost:9000
fs.s3a.path.style.access: true
EOF
Install Flink's S3 plugin:
mkdir ./plugins/s3-fs-hadoop
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/
Install Flink's Hive connector:
mkdir -p ./lib/hive
curl -s https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.18.1/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar -o ./lib/hive/flink-sql-connector-hive-3.1.3_2.12-1.18.1.jar
Install the dependencies for Iceberg:
mkdir ./lib/iceberg
curl https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.4.3/iceberg-flink-runtime-1.17-1.4.3.jar -o ./lib/iceberg/iceberg-flink-runtime-1.17-1.4.3.jar
mkdir -p ./lib/aws
curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar -o ./lib/aws/hadoop-aws-3.3.6.jar
Set the Hadoop classpath:
export HADOOP_CLASSPATH=$(~/hadoop/hadoop-3.3.2/bin/hadoop classpath)
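(Not part of the original steps, but a quick way to inspect what actually ended up on the Hadoop classpath. Note the entries are mostly wildcard globs, so hadoop-aws is only included if one of the listed directories holds it; in a stock Hadoop 3.x layout it ships under share/hadoop/tools/lib.)
echo "$HADOOP_CLASSPATH" | tr ':' '\n'
# check where the stock distribution keeps hadoop-aws:
ls ~/hadoop/hadoop-3.3.2/share/hadoop/tools/lib/ | grep aws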
Launch the SQL Client:
./bin/sql-client.sh
Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
> 'type' = 'iceberg',
> 'client.assume-role.region' = 'us-east-1',
> 'warehouse' = 's3a://warehouse',
> 's3.endpoint' = 'http://localhost:9000',
> 's3.path-style-access' = 'true',
> 'catalog-type'='hive',
> 'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.
Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
Flink SQL>
Full stack trace:
Caused by: org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to execute the operation b685c995-3280-4a9e-b6c0-18ab9369d790.
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:414)
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:267)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: org.apache.flink.table.api.TableException: Could not execute CREATE DATABASE: (catalogDatabase: [{}], catalogName: [c_iceberg_hive], databaseName: [db_rmoff], ignoreIfExists: [
    at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:90)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1092)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.callOperation(OperationExecutor.java:556)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:444)
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:207)
    at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
    at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
    ... 7 more
Caused by: java.lang.RuntimeException: Failed to create namespace db_rmoff in Hive Metastore
    at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
    at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:222)
    at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:213)
    at org.apache.flink.table.catalog.CatalogManager.createDatabase(CatalogManager.java:1381)
    at org.apache.flink.table.operations.ddl.CreateDatabaseOperation.execute(CreateDatabaseOperation.java:84)
    ... 14 more
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
    at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
    at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
    at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
    ... 18 more
Verify that hadoop-aws is on the classpath:
❯ ps -ef|grep sql-client|grep hadoop-aws
501 51499 45632 0 7:38pm ttys007 0:06.81 /Users/rmoff/.sdkman/candidates/java/current/bin/java -XX:+IgnoreUnrecognizedVMOptions --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED -Dlog.file=/Users/rmoff/flink/flink-1.18.1/log/flink-rmoff-sql-client-asgard08.log -Dlog4j.configuration=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlog4j.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/Users/rmoff/flink/flink-1.18.1/conf/logback.xml -classpath /Users/rmoff/flink/flink-1.18.1/lib/aws/hadoop-aws-3.3.6.jar:/Users/rmoff/flink/flink-1.18.1/lib/flink-cep-1.18.1.jar:/Users/rmoff/flink/flink-1.18.1/l[…]
Confirm that the JAR holds the S3AFileSystem class:
❯ jar tvf lib/aws/hadoop-aws-3.3.6.jar|grep -i filesystem.class
157923 Sun Jun 18 08:56:00 BST 2023 org/apache/hadoop/fs/s3a/S3AFileSystem.class
3821 Sun Jun 18 08:56:02 BST 2023 org/apache/hadoop/fs/s3native/NativeS3FileSystem.class
I get the same error if I strip the CREATE CATALOG back to bare-bones too:
Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
> 'type' = 'iceberg',
> 'warehouse' = 's3a://warehouse',
> 'catalog-type'='hive',
> 'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG c_iceberg_hive2;
[INFO] Execute statement succeed.
Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
Java version:
❯ java --version
openjdk 11.0.21 2023-10-17
OpenJDK Runtime Environment Temurin-11.0.21+9 (build 11.0.21+9)
OpenJDK 64-Bit Server VM Temurin-11.0.21+9 (build 11.0.21+9, mixed mode)
Edit 01
Other things that I've tried:

- Putting the JARs directly in ./lib instead of in subfolders
- Removing the s3-fs-hadoop plugin
- Adding iceberg-aws-bundle-1.4.3.jar and aws-java-sdk-bundle-1.12.648.jar (separately, and together)

More diagnostics:
If I add the three SQL statements (CREATE CATALOG / USE CATALOG / CREATE DATABASE) to a file and launch the SQL Client with verbose class logging:
JVM_ARGS=-verbose:class ./bin/sql-client.sh -f ../iceberg.sql > iceberg.log
I get this output, showing that the hadoop-aws JAR just isn't picked up, even though it's in the classpath.
If I add Flink's s3-fs-hadoop plugin back in, we can see it being picked up (log), but still get the same failure.
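In case it's useful, the check I'm doing against the verbose-class output is just a grep for the class-load line (JDK 11 prints one [class,load] line per loaded class):
grep 'org.apache.hadoop.fs.s3a.S3AFileSystem' iceberg.log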
Edit 02
If I switch from s3a to s3, I get a different error ¯\_(ツ)_/¯
Flink SQL> CREATE CATALOG c_iceberg_hive WITH (
> 'type' = 'iceberg',
> 'client.assume-role.region' = 'us-east-1',
> 'warehouse' = 's3://warehouse',
> 's3.endpoint' = 'http://localhost:9000',
> 's3.path-style-access' = 'true',
> 'catalog-type'='hive',
> 'uri'='thrift://localhost:9083');
[INFO] Execute statement succeed.
Flink SQL> USE CATALOG c_iceberg_hive;
[INFO] Execute statement succeed.
Flink SQL> CREATE DATABASE db_rmoff;
[ERROR] Could not execute SQL statement. Reason:
MetaException(message:Got exception: org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3")
If I add in io-impl, I get yet another different error, which again seems (to my limited understanding) to suggest that the hadoop-aws JAR isn't being picked up:
Flink SQL> CREATE CATALOG c_iceberg_hive2 WITH (
> 'type' = 'iceberg',
> 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
> 'client.assume-role.region' = 'us-east-1',
> 'warehouse' = 's3://warehouse',
> 's3.endpoint' = 'http://localhost:9000',
> 's3.path-style-access' = 'true',
> 'catalog-type'='hive',
> 'uri'='thrift://localhost:9083');
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.model.S3Exception
The error you observe originates from the Hive Metastore server, not from Flink:
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
    at com.sun.proxy.$Proxy35.createDatabase(Unknown Source)
    at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
This indicates that the error was received over the Hive Thrift API.
The Docker image used here to run Hive does not include hadoop-aws. You need to add it to the image yourself, or use another Hive image that contains the required dependencies.
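For the first option, a minimal sketch, assuming you stay with this image: download the JARs locally and mount them into the container. The in-container path /opt/hive-metastore/lib/ is an assumption on my part; inspect the image's filesystem to find where it actually keeps its Hive/Hadoop JARs.

# fetch the missing dependencies locally
curl -sO https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar
curl -sO https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.648/aws-java-sdk-bundle-1.12.648.jar

# re-run the Metastore with the JARs mounted onto its classpath
# NOTE: /opt/hive-metastore/lib/ is an assumed location -- verify it for this image
docker run --rm --detach --name hms-standalone \
  --publish 9083:9083 \
  --volume "$PWD"/hadoop-aws-3.3.6.jar:/opt/hive-metastore/lib/hadoop-aws-3.3.6.jar \
  --volume "$PWD"/aws-java-sdk-bundle-1.12.648.jar:/opt/hive-metastore/lib/aws-java-sdk-bundle-1.12.648.jar \
  ghcr.io/recap-build/hive-metastore-standalone:latest

Bear in mind that the Metastore will also need the fs.s3a.* endpoint and credential settings on its side (in its own hive-site.xml or core-site.xml), because it is the Metastore process, not Flink, that resolves s3a://warehouse when it creates the database.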