I want to use the pyiceberg library with Google Cloud Storage.
I have a catalog created in Google Cloud Storage using PySpark, and I would like to read these tables from there.
I found the documentation for creating a catalog object for GCS, but I don't understand how to connect to it or how to create a config object for Google Cloud.
I tried:
catalog = load_catalog(
    uri="gs://catalog",
    type="gcsfs"
)
but I get an error:
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
Cell In[4], line 1
----> 1 catalog = load_catalog(
2 name="gcsfs",
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/site-packages/pyiceberg/catalog/__init__.py:212, in load_catalog(name, **properties)
210 catalog_type = None
211 if provided_catalog_type and isinstance(provided_catalog_type, str):
--> 212 catalog_type = CatalogType[provided_catalog_type.upper()]
213 elif not provided_catalog_type:
214 catalog_type = infer_catalog_type(name, conf)
File ~/opt/anaconda3/envs/pyceberg/lib/python3.11/enum.py:792, in EnumType.__getitem__(cls, name)
788 def __getitem__(cls, name):
789 """
790 Return the member matching `name`.
791 """
--> 792 return cls._member_map_[name]
KeyError: 'GCSFS'
I installed the package pyiceberg[gcsfs].
In the PyIceberg GitHub repository I see:
AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
    CatalogType.REST: load_rest,
    CatalogType.HIVE: load_hive,
    CatalogType.GLUE: load_glue,
    CatalogType.DYNAMODB: load_dynamodb,
    CatalogType.SQL: load_sql,
}
PyIceberg is a Python library for working with Iceberg tables.
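As far as I can tell, the KeyError in the question happens because "gcsfs" is an install extra for GCS file access, not a catalog type: the traceback shows the type string being upper-cased and looked up in the CatalogType enum, which only knows the catalog kinds listed in AVAILABLE_CATALOGS. The sketch below reproduces that lookup with a stand-in enum (my own copy for illustration, not imported from pyiceberg; resolve_catalog_type is a hypothetical helper name):

```python
from enum import Enum


class CatalogType(Enum):
    # Stand-in enum mirroring the members listed in the question;
    # it is NOT imported from pyiceberg.
    REST = "rest"
    HIVE = "hive"
    GLUE = "glue"
    DYNAMODB = "dynamodb"
    SQL = "sql"


def resolve_catalog_type(provided: str) -> CatalogType:
    # Same lookup style as the line shown in the traceback:
    # upper-case the string and index the enum by member name.
    return CatalogType[provided.upper()]


def is_known_catalog_type(provided: str) -> bool:
    # Membership check that avoids the KeyError.
    return provided.upper() in CatalogType.__members__
```

With this stand-in, resolve_catalog_type("sql") succeeds while resolve_catalog_type("gcsfs") raises KeyError: 'GCSFS', which matches the error above. So the catalog type has to be one of the supported kinds, and GCS only enters through the warehouse location and the gcs.* properties.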
First, get an OAuth2 token using the service account file. I'm running this in Colab, so I needed to do it this way. You could do this differently if running in a container.
import google.auth
from google.auth.transport.requests import Request
from pyiceberg import catalog

def get_access_token(service_account_file, scopes):
    """
    Retrieves refreshed Google Cloud credentials from a service account key file.
    Args:
        service_account_file: Path to the service account JSON key file.
        scopes: List of OAuth scopes required for your application.
    Returns:
        The refreshed credentials object; the token string is in `.token`
        and its expiry time in `.expiry`.
    """
    # load_credentials_from_file returns a (credentials, project_id) tuple
    credentials, _project_id = google.auth.load_credentials_from_file(
        service_account_file, scopes=scopes)
    request = Request()
    credentials.refresh(request)  # Fetches a fresh access token
    return credentials

# Example usage
service_account_file = "/path-to-service-account-file.json"  # Replace with your path
scopes = ["https://www.googleapis.com/auth/cloud-platform"]  # Adjust scopes as needed
access_token = get_access_token(service_account_file, scopes)  # credentials object, not a bare string
Next, load the catalog using the OAuth2 credentials retrieved with the service account key.
I have omitted the datetime_to_unix_ms function to focus on the main task.
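Since datetime_to_unix_ms is not shown here, this is one possible sketch of it, not necessarily the version I used. It assumes that a naive datetime (which, as far as I know, is what google-auth reports for expiry) is in UTC, and it returns whole milliseconds since the Unix epoch:

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def datetime_to_unix_ms(dt: datetime) -> int:
    """Convert a datetime to integer milliseconds since the Unix epoch."""
    # Assumption: a naive datetime is already expressed in UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding.
    return (dt - _EPOCH) // timedelta(milliseconds=1)
```

For example, datetime_to_unix_ms(datetime(2024, 1, 1)) gives 1704067200000.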
Since you are just beginning, I suggest keeping your implementation light by using a SQL database as your catalog registry.
If you already have an EMR cluster up, you should look into using the Hive metastore instead.
For this example, we will use a SQLite database for our central registry. You can replace this with any of the SQL database options supported by the SQLAlchemy library.
REGISTRY_DATABASE_URI = "sqlite:///catalog.db"  # replace this with your database URI
catalog_inst = catalog.load_catalog(
    "default",
    **{
        "uri": REGISTRY_DATABASE_URI,
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "gcs.oauth2.token-expires-at": datetime_to_unix_ms(access_token.expiry),
        "gcs.project-id": "project-id",  # replace with your gcp project id
        "gcs.oauth2.token": access_token.token,
        "gcs.default-bucket-location": "gs://bucket/",  # replace with your gcs bucket
        "warehouse": "gs://bucket/"  # replace with your gcs bucket
    }
)
Finally, we create an example table with some data using PyArrow:
import pyarrow as pa
from pyiceberg.exceptions import NoSuchTableError

catalog_inst.create_namespace("default")  # Replace this with your namespace

# Define the schema for the book table
schema = pa.schema([
    ('title', pa.string())
])

# Drop any previous copy of the table; on a first run there is nothing to drop
try:
    catalog_inst.drop_table("default.books")  # Replace this with your table
except NoSuchTableError:
    pass
table = catalog_inst.create_table("default.books", schema=schema)

# Create some sample data
titles = ["The Lord of the Rings", "Pride and Prejudice", "Moby Dick"]

# Create Arrow arrays from the data
title_array = pa.array(titles, type=pa.string())
table_data = pa.Table.from_arrays([title_array], names=schema.names)
table.append(table_data)
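Since the question was about reading tables, here is a minimal sketch of reading the data back. It relies on the catalog's load_table method and the table's scan().to_arrow() chain; the helper name read_table_as_arrow and its defaults are my own, and I have not run it against your bucket:

```python
def read_table_as_arrow(catalog, identifier="default.books", limit=None):
    """Load an Iceberg table from the catalog and return its rows as a PyArrow table."""
    # Look the table up in the catalog registry.
    table = catalog.load_table(identifier)
    # Optionally cap the number of rows returned by the scan.
    scan = table.scan(limit=limit) if limit is not None else table.scan()
    # Materialize the scan result as a pyarrow.Table.
    return scan.to_arrow()
```

Usage would look like read_table_as_arrow(catalog_inst, "default.books"), followed by .to_pandas() on the result if you prefer a DataFrame. Note that this only sees tables registered in this SQL catalog; tables written by PySpark under a different catalog would need to be registered or loaded through that catalog first.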