python, google-bigquery, google-bigquery-storage-api

How to use nested proto.Message with BigQuery Storage API Writer python client?


The snippet from https://github.com/googleapis/python-bigquery-storage/issues/398, which uses the proto-plus package to define protobuf messages in Python, is very helpful and works well as it is, but it does not work when the message is nested.
The adapted code below, when calling await bq_write_client.append_rows(iter([append_row_request])) with a nested message, throws the error: google.api_core.exceptions.InvalidArgument: 400 Invalid proto schema: BqMessage.proto: Message.nested: "._default_package.Team" is not defined.

P.S. I know that the google-cloud-bigquery-storage library works with nested messages in general, because the official snippet https://github.com/googleapis/python-bigquery-storage/blob/main/samples/snippets/append_rows_proto2.py works and uses a nested message. But it defines the message in a separate .proto file that needs a compilation step, which is not as practical as defining the message directly in Python.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import asyncio

import proto
from google.oauth2.service_account import Credentials
from google.protobuf.descriptor_pb2 import DescriptorProto
from google.cloud.bigquery_storage_v1beta2.types.storage import AppendRowsRequest
from google.cloud.bigquery_storage_v1beta2.types.protobuf import ProtoSchema, ProtoRows
from google.cloud.bigquery_storage_v1beta2.services.big_query_write import BigQueryWriteAsyncClient

class Team(proto.Message):
    name = proto.Field(proto.STRING, number=1)

class UserSchema(proto.Message):
    username = proto.Field(proto.STRING, number=1)
    email = proto.Field(proto.STRING, number=2)
    nested = proto.Field(Team, number=3)

async def main():
    write_stream_path = BigQueryWriteAsyncClient.write_stream_path(
        "yolocommon", "test", "t_test_data", "_default")

    credentials = Credentials.from_service_account_file(filename="bigquery_config_file.json")
    bq_write_client = BigQueryWriteAsyncClient(credentials=credentials)

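    # Copy the proto-plus message descriptor into a self-contained
    # DescriptorProto to use as the writer schema.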
    proto_descriptor = DescriptorProto()
    UserSchema.pb().DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema = ProtoSchema(proto_descriptor=proto_descriptor)

    serialized_rows = []
    data = [
        {
            "username": "Jack",
            "email": "jack@google.com",
            "nested": {
                "name": "Jack Jack"
            }
        },
        {
            "username": "mary",
            "email": "mary@google.com",
            "nested": {
                "name": "Mary Mary"
            }
        }
    ]
    for item in data:
        instance = UserSchema.from_json(payload=json.dumps(item))
        serialized_rows.append(UserSchema.serialize(instance))

    proto_data = AppendRowsRequest.ProtoData(
        rows=ProtoRows(serialized_rows=serialized_rows),
        writer_schema=proto_schema
    )

    append_row_request = AppendRowsRequest(
        write_stream=write_stream_path,
        proto_rows=proto_data
    )

    result = await bq_write_client.append_rows(iter([append_row_request]))
    async for item in result:
        print(item)


if __name__ == "__main__":
    asyncio.run(main())

UPDATE: From ProtoSchema's documentation:

Descriptor for input message. The provided descriptor must be self contained, such that data rows sent can be fully decoded using only the single descriptor. For data rows that are compositions of multiple independent messages, this means the descriptor may need to be transformed to only use nested types: https://developers.google.com/protocol-buffers/docs/proto#nested

So the right way to write the message definition is:

class UserSchema(proto.Message):
    class Team(proto.Message):
        name = proto.Field(proto.STRING, number=1)

    username = proto.Field(proto.STRING, number=1)
    email = proto.Field(proto.STRING, number=2)
    nested = proto.Field(Team, number=3)

But it still fails with essentially the same error: google.api_core.exceptions.InvalidArgument: 400 Invalid proto schema: BqMessage.proto: Message.nested: "._default_package.UserSchema.Team" is not defined.

UPDATE2: The root of the issue is that proto-plus substitutes _default_package as the package name when the package would otherwise be empty, because an empty package triggers a different error: https://github.com/googleapis/proto-plus-python/blob/main/proto/_package_info.py#L40

The comment at that line reads:

    # TODO: Revert to empty string as a package value after protobuf fix.
    # When package is empty, upb based protobuf fails with an
    # "TypeError: Couldn't build proto file into descriptor pool:
    # invalid name: empty part ()' means" during an attempt to add to
    # descriptor pool.
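
The prefix is easy to observe by dumping the copied descriptor. Below is a minimal inspection sketch (my own addition, reusing the nested UserSchema definition from the update above and run as a standalone script):

import proto
from google.protobuf.descriptor_pb2 import DescriptorProto

class UserSchema(proto.Message):
    class Team(proto.Message):
        name = proto.Field(proto.STRING, number=1)

    username = proto.Field(proto.STRING, number=1)
    email = proto.Field(proto.STRING, number=2)
    nested = proto.Field(Team, number=3)

proto_descriptor = DescriptorProto()
UserSchema.pb().DESCRIPTOR.CopyToProto(proto_descriptor)

for field in proto_descriptor.field:
    # Scalar fields carry an empty type_name; the message field prints a fully
    # qualified reference like "._default_package.UserSchema.Team". CopyToProto
    # copies only the message itself, so the synthetic package definition never
    # travels with the descriptor and the server cannot resolve the name.
    print(field.name, repr(field.type_name))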

Apparently, at the moment it is not possible to use a proto.Message to represent a BigQuery table if it has a nested field (STRUCT).


Solution

  • protobuf has since been fixed, so fork the proto-plus project and change the line at https://github.com/googleapis/proto-plus-python/blob/main/proto/_package_info.py#L40

    to

        package = getattr(
            proto_module, "package", module_name if module_name else ""
        )
    

    And it will work.
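
If forking the library is not an option, an alternative workaround follows from the same diagnosis. Below is an untested sketch of my own (build_self_contained_schema is a hypothetical helper, not part of any library): rewrite the type_name references in the copied DescriptorProto so they look exactly as they would with an empty package:

from google.protobuf.descriptor_pb2 import DescriptorProto
from google.cloud.bigquery_storage_v1beta2.types.protobuf import ProtoSchema

def build_self_contained_schema(message_class) -> ProtoSchema:
    # Hypothetical helper: strips the synthetic "_default_package" prefix so
    # that "._default_package.UserSchema.Team" becomes ".UserSchema.Team",
    # i.e. the reference the descriptor would have contained had the package
    # been empty. If the proto-plus fix above is applied, the rewrite branch
    # simply never matches and the helper is harmless.
    proto_descriptor = DescriptorProto()
    message_class.pb().DESCRIPTOR.CopyToProto(proto_descriptor)
    for field in proto_descriptor.field:
        if field.type_name.startswith("._default_package."):
            field.type_name = "." + field.type_name[len("._default_package."):]
    return ProtoSchema(proto_descriptor=proto_descriptor)

In main() above, proto_schema = build_self_contained_schema(UserSchema) would then replace the proto_descriptor and proto_schema lines. Note that this sketch only rewrites top-level fields; deeper nesting would need the same rewrite applied recursively to the messages in proto_descriptor.nested_type.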