
How to process a single GCS-stored file on Document AI with the Python client?

I have been testing out the Google Document AI Python client, but I couldn't get the process_document() function to work when trying to process a single document stored on Google Cloud Storage.

What I currently have:

Other options I've tried:

Which brings me to the question, how do you process a single GCS-stored file with the Python client?

I have tried modifying the quick start example by replacing the class RawDocument with other classes that take a GCS URI as input, but it didn't work:

Here is a code snippet that will raise this error:

from google.api_core.client_options import ClientOptions
from google.cloud import documentai

# Regional endpoint; it has to match the location given to processor_path().
opts = ClientOptions(api_endpoint="us-documentai.googleapis.com")
client = documentai.DocumentProcessorServiceClient(client_options=opts)
name = client.processor_path("my_project_id", "us", "my_processor_id")

# Reference the file by its Cloud Storage URI instead of sending raw bytes.
gcs_document = documentai.Document(
    uri="gs://my_image.jpeg",
    mime_type="image/jpeg",
)

# Pass the URI-only Document as the inline document of a synchronous request.
request = documentai.ProcessRequest(
    name=name,
    inline_document=gcs_document,
)

# 400 Only content payload is supported for Sync Process.
result = client.process_document(request=request)


  • You cannot use Online Processing process_document() for files in Google Cloud Storage without downloading them locally.

    Update: Online Processing functionality has been updated to allow processing of a single file in Google Cloud Storage. You can add a GCS document link using the gcs_document parameter in the processing request.

    You can use Batch Processing batch_process_documents() to process a single document from Google Cloud Storage. It functions exactly the same as processing multiple documents with batch processing, but you just input a single document instead of a prefix or multiple documents. The input and output are both in Google Cloud Storage, and the processing is asynchronous.

    The Python code sample in Send a processing request > Batch processing shows exactly what you are looking for. Look for the comment "This line is where the single document is being added" in the code below.

    import re
    from typing import Optional

    from google.api_core.client_options import ClientOptions
    from google.api_core.exceptions import InternalServerError
    from google.api_core.exceptions import RetryError
    from google.cloud import documentai  # type: ignore
    from google.cloud import storage
    # TODO(developer): Uncomment these variables before running the sample.
    # project_id = "YOUR_PROJECT_ID"
    # location = "YOUR_PROCESSOR_LOCATION" # Format is "us" or "eu"
    # processor_id = "YOUR_PROCESSOR_ID" # Create processor before running sample
    # gcs_output_uri = "YOUR_OUTPUT_URI" # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
    # processor_version_id = "YOUR_PROCESSOR_VERSION_ID" # Optional. Example: pretrained-ocr-v1.0-2020-09-23
    # TODO(developer): You must specify either `gcs_input_uri` and `mime_type` or `gcs_input_prefix`
    # gcs_input_uri = "YOUR_INPUT_URI" # Format: gs://bucket/directory/file.pdf
    # input_mime_type = "application/pdf"
    # gcs_input_prefix = "YOUR_INPUT_URI_PREFIX" # Format: gs://bucket/directory/
    # field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.
    def batch_process_documents(
        project_id: str,
        location: str,
        processor_id: str,
        gcs_output_uri: str,
        processor_version_id: Optional[str] = None,
        gcs_input_uri: Optional[str] = None,
        input_mime_type: Optional[str] = None,
        gcs_input_prefix: Optional[str] = None,
        field_mask: Optional[str] = None,
        timeout: int = 400,
    ) -> None:
        """Batch-process Cloud Storage documents with Document AI and print the text.

        Builds a batch request for either one file (``gcs_input_uri`` plus
        ``input_mime_type``) or every file under ``gcs_input_prefix``, waits for
        the long-running operation, then downloads each JSON result written
        under ``gcs_output_uri`` and prints its text.

        Args:
            project_id: Google Cloud project that owns the processor.
            location: Processor location, e.g. "us" or "eu".
            processor_id: ID of an existing processor.
            gcs_output_uri: Output directory URI (``gs://bucket/dir/``).
            processor_version_id: Optional processor version to use instead of
                the processor itself.
            gcs_input_uri: URI of a single input file. When set, it takes
                precedence over ``gcs_input_prefix``.
            input_mime_type: MIME type of ``gcs_input_uri``.
            gcs_input_prefix: URI prefix of a directory to process; used only
                when ``gcs_input_uri`` is not given.
            field_mask: Optional field mask passed to the GCS output config.
            timeout: Seconds to wait for the operation to finish.

        Raises:
            ValueError: If the operation metadata reports a state other than
                SUCCEEDED.
        """
        # You must set the `api_endpoint` if you use a location other than "us".
        opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
        client = documentai.DocumentProcessorServiceClient(client_options=opts)

        if gcs_input_uri:
            # Specify specific GCS URIs to process individual documents
            gcs_document = documentai.GcsDocument(
                gcs_uri=gcs_input_uri, mime_type=input_mime_type
            )
            # This line is where the single document is being added
            gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
            input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
        else:
            # Specify a GCS URI Prefix to process an entire directory
            gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_prefix)
            input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

        # Cloud Storage URI for the Output Directory
        gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
            gcs_uri=gcs_output_uri, field_mask=field_mask
        )

        # Where to write results
        output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

        if processor_version_id:
            # The full resource name of the processor version, e.g.:
            # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
            name = client.processor_version_path(
                project_id, location, processor_id, processor_version_id
            )
        else:
            # The full resource name of the processor, e.g.:
            # projects/{project_id}/locations/{location}/processors/{processor_id}
            name = client.processor_path(project_id, location, processor_id)

        request = documentai.BatchProcessRequest(
            name=name,
            input_documents=input_config,
            document_output_config=output_config,
        )

        # BatchProcess returns a Long Running Operation (LRO)
        operation = client.batch_process_documents(request)

        # Continually polls the operation until it is complete.
        # This could take some time for larger files
        # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
        try:
            print(f"Waiting for operation {operation.operation.name} to complete...")
            operation.result(timeout=timeout)
        # Catch exception when operation doesn't finish before timeout
        except (RetryError, InternalServerError) as e:
            print(e.message)

        # NOTE: Can also use callbacks for asynchronous processing
        #
        # def my_callback(future):
        #   result = future.result()
        #
        # operation.add_done_callback(my_callback)

        # Once the operation is complete,
        # get output document information from operation metadata
        metadata = documentai.BatchProcessMetadata(operation.metadata)

        if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
            raise ValueError(f"Batch Process Failed: {metadata.state_message}")

        storage_client = storage.Client()

        print("Output files:")
        # One process per Input Document
        for process in metadata.individual_process_statuses:
            # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
            # The Cloud Storage API requires the bucket name and URI prefix separately
            matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
            if not matches:
                print(
                    "Could not parse output GCS destination:",
                    process.output_gcs_destination,
                )
                continue

            output_bucket, output_prefix = matches.groups()

            # Get List of Document Objects from the Output Bucket
            output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

            # Document AI may output multiple JSON files per source file
            for blob in output_blobs:
                # Document AI should only output JSON files to GCS
                if blob.content_type != "application/json":
                    print(
                        f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                    )
                    continue

                # Download JSON File as bytes object and convert to Document Object
                print(f"Fetching {blob.name}")
                document = documentai.Document.from_json(
                    blob.download_as_bytes(), ignore_unknown_fields=True
                )

                # For a full list of Document object attributes, please reference this page:
                # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document

                # Read the text recognition output from the processor
                print("The document contains the following text:")
                print(document.text)