rustgoogle-cloud-storagegrpcrust-tonic

Facing issues with the WriteObjectRequest for Google Cloud Storage using gRPC in Rust


I’m trying to perform a simple upload to Google Cloud Storage using gRPC and the generated protos, but I’m running into an issue with the WriteObjectRequest. The error I’m getting is:

An x-goog-request-params request metadata property must be provided for this request.

The strange part is that it doesn’t even seem to check whether x-goog-request-params is present in the request metadata. However, when I use the same set of headers for a ReadObjectRequest, it correctly identifies the x-goog-request-params and processes the request successfully. I’m able to verify the content for ReadObjectRequest without any problems.

use gcloud_sdk::google::storage::v2::{storage_client::StorageClient, write_object_request, ChecksummedData, Object, ReadObjectRequest, WriteObjectRequest, WriteObjectSpec};
use tonic::{metadata::MetadataValue, transport::Channel, Request};
use futures::stream;
use hyper_rustls;
use tokio_stream::StreamExt;
use tonic::transport::ClientTlsConfig;

const BUCKET_NAME: &str = "test-bucket";
const GOOGLE_AUTH_TOKEN: &str = "token-xxxx";

struct GcsClient {
    client: StorageClient<Channel>,
    bucket: String,
}

impl GcsClient {
    async fn new(bucket: String) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Channel::from_static("https://storage.googleapis.com")
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(30))
            .tcp_nodelay(true)
            .http2_adaptive_window(true)
            .http2_keep_alive_interval(std::time::Duration::from_secs(30))
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .connect()
            .await?;

        Ok(Self {
            client: StorageClient::new(channel),
            bucket,
        })
    }

    fn get_formatted_bucket(&self) -> String {
        format!("projects/_/buckets/{}", self.bucket)
    }

    async fn get_token() -> Result<String, Box<dyn std::error::Error>> {
        Ok(GOOGLE_AUTH_TOKEN.to_string())
    }

    async fn auth_request<T>(&self, request: T) -> Request<T> {
        let token = Self::get_token().await.expect("Failed to get authentication token");
        let mut request = Request::new(request);

        // Authorization header
        request.metadata_mut().insert(
            "authorization",
            MetadataValue::try_from(&format!("Bearer {}", token)).unwrap(),
        );

        let formatted_bucket = self.get_formatted_bucket();
        let encoded_bucket = urlencoding::encode(&formatted_bucket);

        // Adding required x-goog-request-params based on request type
        let params = if std::any::type_name::<T>().contains("WriteObjectRequest") {
            format!("write_object_spec.resource.bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("ReadObjectRequest") {
            format!("bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("StartResumableWriteRequest") {
            format!("write_object_spec.resource.bucket={}", encoded_bucket)
        } else if std::any::type_name::<T>().contains("QueryWriteStatusRequest") {
            format!("upload_id={}", encoded_bucket)
        } else {
            format!("bucket={}", encoded_bucket)
        };

        request.metadata_mut().insert(
            "x-goog-request-params",
            MetadataValue::try_from(&params).unwrap(),
        );

        request
    }

    async fn simple_upload(&mut self, object_name: &str, data: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
        // Create write specification
        let write_spec = WriteObjectSpec {
            resource: Some(Object {
                name: object_name.to_string(),
                bucket: self.get_formatted_bucket(),
                ..Default::default()
            }),
            ..Default::default()
        };

        // Create a single request with both spec and data
        let write_request = WriteObjectRequest {
            first_message: Some(write_object_request::FirstMessage::WriteObjectSpec(write_spec)),
            write_offset: 0,
            data: Some(write_object_request::Data::ChecksummedData(ChecksummedData {
                content: data,
                crc32c: None,
            })),
            finish_write: true,
            ..Default::default()
        };

        // Create authenticated request and convert to stream
        let auth_request = self.auth_request(write_request).await;

        // Print the request metadata for debugging
        println!("Request metadata: {:#?}", auth_request.metadata());
        // Output from above:
        // Request metadata: MetadataMap {
        //     headers: {
        //         "authorization": "Bearer token-xxxx",
        //         "x-goog-request-params": "write_object_spec.resource.bucket=projects%2F_%2Fbuckets%2Ftest-bucket",
        //     },
        // }


        // Send the request
        match self.client.write_object(stream::iter(vec![auth_request.into_inner()])).await {
            Ok(response) => {
                println!("Upload successful! Response: {:?}", response);
                Ok(())
            }
            Err(e) => {
                println!("Error details: {:#?}", e);
                // Output from above:
                // Error details: Status {
                //     code: InvalidArgument,
                //     message: "An x-goog-request-params request metadata property must be provided for this request.",
                //     details: b"\x08\x03\x12UAn x-goog-request-params request metadata property must be provided for this request.\x1ah\n(type.googleapis.com/google.rpc.ErrorInfo\x12<\n\"GRPC_INVALID_X_GOOG_REQUEST_PARAMS\x12\x16storage.googleapis.com",
                //     metadata: MetadataMap {
                //         headers: {
                //             "grpc-server-stats-bin": "AAAzYToAAAAAAA",
                //             "google.rpc.errorinfo-bin": "CiJHUlBDX0lOVkFMSURfWF9HT09HX1JFUVVFU1RfUEFSQU1TEhZzdG9yYWdlLmdvb2dsZWFwaXMuY29t",
                //             "endpoint-load-metrics-bin": "MbFRfe/oHjJASbtUC3gdlq0/",
                //             "content-type": "application/grpc",
                //             "grpc-accept-encoding": "identity, deflate, gzip",
                //             "content-length": "0",
                //             "date": "Tue, 07 Jan 2025 21:58:42 GMT",
                //             "alt-svc": "h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
                //         },
                //     },
                //     source: None,
                // }

                Err(Box::new(e))
            }
        }
    }

    async fn download_object(&mut self, object_name: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };


        let request = self.auth_request(request).await;
        println!("Request metadata: {:#?}", request.metadata());
        // Output from above:
        // Request metadata: MetadataMap {
        //     headers: {
        //         "authorization": "Bearer token-xxxx",
        //         "x-goog-request-params": "bucket=projects%2F_%2Fbuckets%2Ftest-bucket",
        //     },
        // }

        let response = self.client.read_object(request).await?;
        let mut stream = response.into_inner();
        let mut content = Vec::new();

        while let Some(chunk) = StreamExt::next(&mut stream).await {
            let chunk = chunk?;
            if let Some(data) = chunk.checksummed_data {
                content.extend(data.content);
            }
        }

        Ok(content)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bucket = BUCKET_NAME.to_string();
    let mut client = GcsClient::new(bucket).await?;

    // Test small file upload
    println!("\n=== Testing Small File Upload ===");
    let small_content = b"Hello, GCS! This is a test.".to_vec();
    let small_object_name = "test-small-file.txt";

    println!("Uploading small file: {}", small_object_name);
    client.simple_upload(small_object_name, small_content.clone()).await?;
    println!("Small file upload complete!");
    // The above output is failing with the error specified in the function.

    // Download and verify small file
    println!("\n=== Downloading Small File ===");
    let downloaded = client.download_object(small_object_name).await?;
    println!("Downloaded content: {}", String::from_utf8_lossy(&downloaded));
    println!("Content verification: {}", downloaded == small_content);
    // Output from above:
    // Downloaded content: Hello, GCS! This is a test.
    // Content verification: true

    Ok(())
}

What can I try next?


Solution

  • It seems that the error you're encountering is expected behavior. The WriteObject method is a stream of requests but is not a stream of replies: there's only a single reply at the end.

    Also stated here

    When a ClientStream is opened, there is no guarantee of a Request message which the BucketName can be sourced from. For a unary, or server stream a Request Message must be provided before the request is initiated.

    Google Cloud Storage requires x-goog-request-params on every stream message (for WriteObject): Unlike ReadObject, the Google Cloud Storage gRPC seems to strictly require the x-goog-request-params metadata header for every single WriteObjectRequest in the stream.