Tags: google-cloud-platform, google-cloud-storage, partitioning, google-cloud-pubsub

Google Pub/Sub Cloud Storage subscription: combine messages into the same Avro file


The problem is that no matter how I configure this Cloud Storage subscription, I keep seeing one Avro file for each message published to the Pub/Sub topic. I would like to change this: either combine messages into one Avro file, or keep appending to the same Avro file subject to the time/size constraints.

# create the Cloud Storage subscription
gcloud pubsub subscriptions create projects/my-project/subscriptions/my-subscription \
  --topic=projects/my-project/topics/my-topic \
  --cloud-storage-bucket=my-bucket \
  --cloud-storage-file-prefix=my-prefix/ \
  --cloud-storage-file-suffix=_my-suffix.avro \
  --cloud-storage-max-bytes=2GB \
  --cloud-storage-max-duration=1m \
  --cloud-storage-output-format=avro \
  --cloud-storage-write-metadata \
  --dead-letter-topic=projects/my-project/topics/my-dlt \
  --max-delivery-attempts=5 \
  --project=my-project
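
For what it's worth, the resulting configuration can be confirmed with describe, which prints the subscription's cloudStorageConfig and should echo the limits above:

# sanity check: print the subscription's Cloud Storage configuration
gcloud pubsub subscriptions describe \
  projects/my-project/subscriptions/my-subscription \
  --project=my-project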

Then I publish two messages to the Pub/Sub topic, along these lines (the payloads are arbitrary placeholders):
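
# publish two small test messages (placeholder payloads)
gcloud pubsub topics publish projects/my-project/topics/my-topic --message='test-1'
gcloud pubsub topics publish projects/my-project/topics/my-topic --message='test-2'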

Then I wait 1 minute (see --cloud-storage-max-duration=1m above).

Then I check the content of the GCS bucket:

$ gsutil ls gs://my-bucket/my-prefix/
gs://my-bucket/my-prefix/2024-06-14T14:22:45+00:00_6db102_my-suffix.avro
gs://my-bucket/my-prefix/2024-06-14T14:22:46+00:00_0bc008_my-suffix.avro

But here I am trying to combine Pub/Sub messages into one Avro file. I expected --cloud-storage-max-bytes=2GB and --cloud-storage-max-duration=1m to do this batching, but they do not.

I also tried to remove the date-time sensitivity of the filenames with --cloud-storage-file-datetime-format=YYYY-MM-DD and --cloud-storage-file-datetime-format=YYYY-MM-DD_hh, but creating the Cloud Storage subscription then fails because the flag requires a full datetime format.
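
For reference, a full pattern such as the documented default YYYY-MM-DDThh:mm:ssZ does pass validation. A hedged example (I believe the flag is also accepted on update, though I have only verified it on create):

gcloud pubsub subscriptions update projects/my-project/subscriptions/my-subscription \
  --cloud-storage-file-datetime-format=YYYY-MM-DDThh:mm:ssZ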

See the docs describing the file-name options: https://cloud.google.com/pubsub/docs/create-cloudstorage-subscription#file_names

I would like to have a GCS bucket structure like:

gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_1>_my-suffix.avro
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_2>_my-suffix.avro
...
gs://my-bucket/my-prefix/YYYY-MM-DD_<uuid_N>_my-suffix.avro

Or perhaps having ./YYYY-MM-DD/ as a sub-folder.

Ideally I would like to avoid Dataflow or Dataproc (Spark). It looks like a Cloud Storage subscription could do a good enough job of flushing data into GCS.

Is this possible at all? How can I do this?


Solution

  • Unfortunately, due to the way Pub/Sub distributes work across backends, it is expected behavior that your messages are spread across multiple files. This is discussed in the troubleshooting documentation:

    your Cloud Storage subscription might be handled by multiple Pub/Sub backends. Each backend writes to a separate Cloud Storage file, so you might observe your Cloud Storage subscription creating more files than expected, especially for low-throughput workloads.

    If you need your Cloud Storage subscription to place messages in fewer concurrent files, you can file a feature request describing your use case and it will be considered.

    In the meantime, if you need your messages to go into the same file, you may need to set up a separate process to consolidate files after they are written. One way to do this is by running a periodic process that composes Cloud Storage objects.

    gcloud storage objects compose \
      gs://BUCKET_NAME/SOURCE_OBJECT_1 \
      gs://BUCKET_NAME/SOURCE_OBJECT_2 \
      gs://BUCKET_NAME/COMPOSITE_OBJECT_NAME
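
    For example, the two fragments from the listing above could be merged like this (the combined object name is made up; compose accepts at most 32 source objects per call):

    # merge the two one-message fragments into a single object
    gcloud storage objects compose \
      "gs://my-bucket/my-prefix/2024-06-14T14:22:45+00:00_6db102_my-suffix.avro" \
      "gs://my-bucket/my-prefix/2024-06-14T14:22:46+00:00_0bc008_my-suffix.avro" \
      gs://my-bucket/my-prefix/2024-06-14_combined_my-suffix.avro

    One caveat: compose concatenates the raw bytes of the sources, and each Avro fragment carries its own file header, so the composed object is not a single well-formed Avro container file. Readers would have to handle the embedded headers, or the fragments would need to be rewritten with an Avro library rather than composed.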