I am attempting to create an Iceberg table on S3 using the AWS Glue Catalog and the PyIceberg library. My goal is to define a schema and a partition spec, then create the table with PyIceberg. However, despite multiple attempts, I haven't been able to get this working: every run fails with an error about an empty path component in the metadata path.
Here's a simplified version of the code I'm using:
import boto3
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import TimestampType, DoubleType, StringType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, MonthTransform, DayTransform
def create_iceberg_table():
    # Replace with your S3 bucket and table names
    s3_bucket = "my-bucket-name"
    table_name = "my-table-name"
    database_name = "iceberg_catalog"

    # Define the table schema
    schema = Schema(
        NestedField(field_id=1, name="field1", field_type=DoubleType(), required=False),
        NestedField(field_id=2, name="field2", field_type=StringType(), required=False),
        # ... more fields ...
    )

    # Define the partitioning specification with transformations
    partition_spec = PartitionSpec(
        PartitionField(field_id=3, source_id=3, transform=YearTransform(), name="year"),
        PartitionField(field_id=3, source_id=3, transform=MonthTransform(), name="month"),
        # ... more partition fields ...
    )

    # Create the Glue client
    glue_client = boto3.client("glue")

    # Specify the catalog URI where Glue should store the metadata
    catalog_uri = f"s3://{s3_bucket}/catalog"

    # Load the Glue catalog for the specified database
    catalog = load_catalog("test", client=glue_client, uri=catalog_uri, type="GLUE")

    # Create the Iceberg table in the Glue Catalog
    catalog.create_table(
        identifier=f"{database_name}.{table_name}",
        schema=schema,
        partition_spec=partition_spec,
        location=f"s3://{s3_bucket}/{table_name}/"
    )

    print("Iceberg table created successfully!")


if __name__ == "__main__":
    create_iceberg_table()
My understanding is that the PyIceberg library interacts with the Glue Catalog to manage metadata, schema, and partitions, but I seem to be missing a crucial step or misconfiguring something.
How can I properly generate an Iceberg Table on S3 using the Glue Catalog and PyIceberg?
Traceback:
Traceback (most recent call last):
  File "/home/workspaceuser/app/create_iceberg_tbl.py", line 72, in <module>
    create_iceberg_table()
  File "/home/workspaceuser/app/create_iceberg_tbl.py", line 62, in create_iceberg_table
    catalog.create_table(
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/catalog/glue.py", line 220, in create_table
    self._write_metadata(metadata, io, metadata_location)
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/catalog/__init__.py", line 544, in _write_metadata
    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/serializers.py", line 71, in table_metadata
    with output_file.create(overwrite=overwrite) as output_stream:
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 256, in create
    if not overwrite and self.exists() is True:
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 200, in exists
    self._file_info()  # raises FileNotFoundError if it does not exist
  File "/home/workspaceuser/layers/paketo-buildpacks_cpython/cpython/lib/python3.8/site-packages/pyiceberg/io/pyarrow.py", line 182, in _file_info
    file_info = self._filesystem.get_file_info(self._path)
  File "pyarrow/_fs.pyx", line 571, in pyarrow._fs.FileSystem.get_file_info
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: Empty path component in path ua-weather-data/hourly_forecasts//metadata/00000-232e3e60-1c1a-4eb8-959e-6940b563acd4.metadata.json
I came across a post on LinkedIn that had an example of how to accomplish this - thanks Dipankar Mazumdar!
I removed the boto3 client, instantiated the Glue catalog with the proper syntax, and built a properly formed catalog.create_table call. Note that the new location argument no longer ends in a trailing slash, which appears to be what produced the empty path component (the double slash before /metadata/) in the original error.
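For context, load_catalog("glue", **{"type": "glue"}) does not take a boto3 client; PyIceberg builds its own Glue and S3 clients from catalog properties, a ~/.pyiceberg.yaml file, or environment variables, with credentials resolved through the usual AWS chain. As a minimal sketch (the region value is a placeholder, and available property names can vary by PyIceberg version), the same catalog could be configured inline:

from pyiceberg.catalog import load_catalog

# Sketch: pass catalog properties inline instead of relying on ~/.pyiceberg.yaml.
# "us-east-1" is a placeholder region; credentials still come from the standard
# AWS chain (environment variables, shared config, instance profile).
catalog = load_catalog(
    "glue",
    **{
        "type": "glue",
        "s3.region": "us-east-1",
    },
)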
Here is the adjusted working code:
from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, StringType, TimestampType, NestedField
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import YearTransform, MonthTransform, DayTransform
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform
def create_iceberg_table():
    # Specify the Glue Catalog database name and URI
    glue_database_name = "iceberg_catalog"
    glue_catalog_uri = "s3://ua-weather-data/catalog"  # Replace with your Glue Catalog URI

    # Instantiate the Glue catalog
    catalog = load_catalog("glue", **{"type": "glue"})
    # catalog = load_catalog(catalog_impl="org.apache.iceberg.aws.glue.GlueCatalog", name=glue_database_name, uri=glue_catalog_uri)

    # Define the Iceberg schema
    schema = Schema(
        NestedField(field_id=1, name="cloudCover", field_type=DoubleType(), required=False),
        NestedField(field_id=2, name="dayOfWeek", field_type=StringType(), required=False),
        NestedField(field_id=3, name="dayOrNight", field_type=StringType(), required=False),
        NestedField(field_id=4, name="expirationTimeUtc", field_type=TimestampType(), required=False),
        NestedField(field_id=5, name="iconCode", field_type=DoubleType(), required=False),
        NestedField(field_id=6, name="iconCodeExtend", field_type=DoubleType(), required=False),
        NestedField(field_id=7, name="precipChance", field_type=DoubleType(), required=False),
        NestedField(field_id=8, name="precipType", field_type=StringType(), required=False),
        NestedField(field_id=9, name="pressureMeanSeaLevel", field_type=DoubleType(), required=False),
        NestedField(field_id=10, name="qpf", field_type=DoubleType(), required=False),
        NestedField(field_id=11, name="qpfSnow", field_type=DoubleType(), required=False),
        NestedField(field_id=12, name="relativeHumidity", field_type=DoubleType(), required=False),
        NestedField(field_id=13, name="temperature", field_type=DoubleType(), required=False),
        NestedField(field_id=14, name="temperatureFeelsLike", field_type=DoubleType(), required=False),
        NestedField(field_id=15, name="temperatureHeatIndex", field_type=DoubleType(), required=False),
        NestedField(field_id=16, name="temperatureWindChill", field_type=DoubleType(), required=False),
        NestedField(field_id=17, name="uvDescription", field_type=StringType(), required=False),
        NestedField(field_id=18, name="uvIndex", field_type=DoubleType(), required=False),
        NestedField(field_id=19, name="validTimeLocal", field_type=TimestampType(), required=True),
        NestedField(field_id=20, name="validTimeUtc", field_type=DoubleType(), required=False),
        NestedField(field_id=21, name="visibility", field_type=DoubleType(), required=False),
        NestedField(field_id=22, name="windDirection", field_type=DoubleType(), required=False),
        NestedField(field_id=23, name="windDirectionCardinal", field_type=StringType(), required=False),
        NestedField(field_id=24, name="windGust", field_type=DoubleType(), required=False),
        NestedField(field_id=25, name="windSpeed", field_type=DoubleType(), required=False),
        NestedField(field_id=26, name="wxPhraseLong", field_type=StringType(), required=False),
        NestedField(field_id=27, name="wxPhraseShort", field_type=StringType(), required=False),
        NestedField(field_id=28, name="wxSeverity", field_type=DoubleType(), required=False),
        NestedField(field_id=29, name="data_origin", field_type=StringType(), required=True)
    )

    # Define the partitioning specification with year, month, and day
    partition_spec = PartitionSpec(
        PartitionField(field_id=19, source_id=19, transform=YearTransform(), name="validTimeLocal_year"),
        PartitionField(field_id=19, source_id=19, transform=MonthTransform(), name="validTimeLocal_month"),
        PartitionField(field_id=19, source_id=19, transform=DayTransform(), name="validTimeLocal_day")
    )

    # Define the sorting order using the validTimeUtc field
    sort_order = SortOrder(SortField(source_id=20, transform=IdentityTransform()))

    # Create the Iceberg table using the Iceberg catalog
    table_name = "iceberg_catalog.hourly_forecasts"
    catalog.create_table(
        identifier=table_name,
        location="s3://ua-weather-data/catalog",
        schema=schema,
        partition_spec=partition_spec,
        sort_order=sort_order
    )

    print("Iceberg table created using AWS Glue Catalog.")


if __name__ == "__main__":
    create_iceberg_table()
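As a quick sanity check (a sketch, not part of the original run), the new table can be loaded back from the Glue Catalog and its metadata inspected:

from pyiceberg.catalog import load_catalog

# Load the newly created table back from the Glue Catalog and print its metadata.
catalog = load_catalog("glue", **{"type": "glue"})
table = catalog.load_table("iceberg_catalog.hourly_forecasts")

print(table.schema())    # the 29-field schema defined above
print(table.spec())      # year/month/day partition fields on validTimeLocal
print(table.location())  # the table's root location on S3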