We are experiencing occasional KeyError exceptions when processing a large volume of images using Google Workflows. Our workflow is designed to process images stored in Google Cloud Storage. It is triggered by Eventarc events and utilizes Google Cloud Run services for image processing tasks. Under normal conditions, the workflow operates smoothly, handling the processing load effectively. However, when the number of images to be processed simultaneously increases significantly, we occasionally encounter the following error:
KeyError: key not found: code
Despite this error, the Cloud Run services involved in the workflow do not report any issues, and the images are still processed successfully. With no correlating errors on the Cloud Run side and successful processing results, it is difficult to pinpoint the root cause of the KeyError exception.
Has anyone else encountered the same issue? Any insights or suggestions would be greatly appreciated.
We also verified that no resources were exhausted, but we were still unable to determine the root cause of the issue.
EDIT: added the workflow definition below.
{
"main": {
"params": ["event"],
"steps": [
{
  "step_init": {
    "assign": [
      { "flowResults": {} },
      { "errors": [] }
    ],
    "next": "step_checkEventPayload"
  }
},
{
  "step_checkEventPayload": {
    "switch": [
      {
        "condition": "${(map.get(event, [\"data\", \"protoPayload\", \"resourceName\"]) != null) and (map.get(event, [\"data\", \"resource\", \"labels\", \"bucket_name\"]) != null)}",
        "assign": [
          { "resourceName": "${event.data.protoPayload.resourceName}" },
          { "bucketName": "${event.data.resource.labels.bucket_name}" }
        ],
        "next": "step_filterProcessedImage"
      }
    ],
    "next": "end"
  }
},
{
  "step_filterProcessedImage": {
    "try": {
      "call": "http.post",
      "args": {
        "url": "A",
        "body": {
          "resourceName": "${resourceName}",
          "bucketName": "${bucketName}"
        },
        "auth": { "type": "OIDC" }
      },
      "result": "filterProcessedImageResponse"
    },
    "retry": {
      "predicate": "${retryPredicate}",
      "max_retries": 5,
      "backoff": {
        "initial_delay": 2,
        "max_delay": 60,
        "multiplier": 2
      }
    },
    "except": {
      "as": "filterProcessedImageException",
      "steps": [
        {
          "step_returnOnfilterProcessedImageException": {
            "switch": [
              {
                "condition": "${default(map.get(filterProcessedImageException, \"code\"), 0) == 409}",
                "return": "${\"Image \" + resourceName + \" already processed\"}"
              }
            ],
            "next": "step_assignFilterProcessedImageException"
          }
        },
        {
          "step_assignFilterProcessedImageException": {
            "assign": [
              { "filterProcessedImageResponse": "${filterProcessedImageException}" },
              {
                "error": {
                  "service": "filter-processed-image-service",
                  "exception": "${filterProcessedImageException}"
                }
              },
              { "errors": "${list.concat(errors, error)}" }
            ]
          }
        }
      ]
    },
    "next": "step_assignFilterProcessedImageResult"
  }
},
{
  "step_assignFilterProcessedImageResult": {
    "assign": [
      { "flowResults[\"filterProcessedImageResponse\"]": "${filterProcessedImageResponse}" }
    ],
    "next": "step_checkResultsBeforeCreateCopyImages"
  }
},
{
  "step_checkResultsBeforeCreateCopyImages": {
    "switch": [
      { "condition": "${len(errors) != 0}", "next": "step_logErrors" }
    ],
    "next": "step_createCopyImages"
  }
},
{
  "step_createCopyImages": {
    "parallel": {
      "shared": ["flowResults", "errors"],
      "branches": [
        {
          "branch_createIntranetCopyImage": {
            "steps": [
              {
                "step_createIntranetCopyImage": {
                  "try": {
                    "call": "http.post",
                    "args": {
                      "url": "B",
                      "body": {
                        "resourceName": "${resourceName}",
                        "bucketName": "${bucketName}"
                      },
                      "auth": { "type": "OIDC" }
                    },
                    "result": "createIntranetCopyImageResponse"
                  },
                  "retry": {
                    "predicate": "${retryPredicate}",
                    "max_retries": 5,
                    "backoff": { "initial_delay": 2, "max_delay": 60, "multiplier": 2 }
                  },
                  "except": {
                    "as": "createIntranetCopyImageException",
                    "steps": [
                      {
                        "step_assignCreateIntranetCopyImageException": {
                          "assign": [
                            { "createIntranetCopyImageResponse": "${createIntranetCopyImageException}" },
                            {
                              "error": {
                                "service": "create-intranet-copy-image-service",
                                "exception": "${createIntranetCopyImageException}"
                              }
                            },
                            { "errors": "${list.concat(errors, error)}" }
                          ]
                        }
                      }
                    ]
                  },
                  "next": "step_assignCreateIntranetCopyImageResult"
                }
              },
              {
                "step_assignCreateIntranetCopyImageResult": {
                  "assign": [
                    { "flowResults[\"createIntranetCopyImageResponse\"]": "${createIntranetCopyImageResponse}" }
                  ]
                }
              }
            ]
          }
        },
        {
          "branch_createWebCopyImage": {
            "steps": [
              {
                "step_createWebCopyImage": {
                  "try": {
                    "call": "http.post",
                    "args": {
                      "url": "C",
                      "body": {
                        "resourceName": "${resourceName}",
                        "bucketName": "${bucketName}"
                      },
                      "auth": { "type": "OIDC" }
                    },
                    "result": "createWebCopyImageResponse"
                  },
                  "retry": {
                    "predicate": "${retryPredicate}",
                    "max_retries": 5,
                    "backoff": { "initial_delay": 2, "max_delay": 60, "multiplier": 2 }
                  },
                  "except": {
                    "as": "createWebCopyImageException",
                    "steps": [
                      {
                        "step_assignCreateWebCopyImageException": {
                          "assign": [
                            { "createWebCopyImageResponse": "${createWebCopyImageException}" },
                            {
                              "error": {
                                "service": "create-web-copy-image-service",
                                "exception": "${createWebCopyImageException}"
                              }
                            },
                            { "errors": "${list.concat(errors, error)}" }
                          ]
                        }
                      }
                    ]
                  },
                  "next": "step_assignCreateWebCopyImageResult"
                }
              },
              {
                "step_assignCreateWebCopyImageResult": {
                  "assign": [
                    { "flowResults[\"createWebCopyImageResponse\"]": "${createWebCopyImageResponse}" }
                  ]
                }
              }
            ]
          }
        }
      ]
    },
    "next": "step_checkResultsAfterCreateCopyImages"
  }
},
{
  "step_checkResultsAfterCreateCopyImages": {
    "switch": [
      { "condition": "${len(errors) != 0}", "next": "step_logErrors" }
    ],
    "next": "step_detectText"
  }
},
{
  "step_detectText": {
    "parallel": {
      "shared": ["flowResults", "errors"],
      "branches": [
        {
          "branch_createSinglePageTxt": {
            "steps": [
              {
                "step_createSinglePageTxt": {
                  "try": {
                    "call": "http.post",
                    "args": {
                      "url": "D",
                      "body": {
                        "resourceName": "${resourceName}",
                        "bucketName": "${bucketName}"
                      },
                      "auth": { "type": "OIDC" }
                    },
                    "result": "createSinglePageTxtResponse"
                  },
                  "retry": {
                    "predicate": "${retryPredicate}",
                    "max_retries": 5,
                    "backoff": { "initial_delay": 2, "max_delay": 60, "multiplier": 2 }
                  },
                  "except": {
                    "as": "createSinglePageTxtException",
                    "steps": [
                      {
                        "step_assignCreateSinglePageTxtException": {
                          "assign": [
                            { "createSinglePageTxtResponse": "${createSinglePageTxtException}" },
                            {
                              "error": {
                                "service": "create-single-page-txt-service",
                                "exception": "${createSinglePageTxtException}"
                              }
                            },
                            { "errors": "${list.concat(errors, error)}" }
                          ]
                        }
                      }
                    ]
                  },
                  "next": "step_assignCreateSinglePageTxtResult"
                }
              },
              {
                "step_assignCreateSinglePageTxtResult": {
                  "assign": [
                    { "flowResults[\"createSinglePageTxtResponse\"]": "${createSinglePageTxtResponse}" }
                  ]
                }
              }
            ]
          }
        },
        {
          "branch_createSinglePageSearchablePdf": {
            "steps": [
              {
                "step_createSinglePageSearchablePdf": {
                  "try": {
                    "call": "http.post",
                    "args": {
                      "url": "F",
                      "body": {
                        "resourceName": "${resourceName}",
                        "bucketName": "${bucketName}"
                      },
                      "auth": { "type": "OIDC" }
                    },
                    "result": "createSinglePageSearchablePdfResponse"
                  },
                  "retry": {
                    "predicate": "${retryPredicate}",
                    "max_retries": 5,
                    "backoff": { "initial_delay": 2, "max_delay": 128, "multiplier": 4 }
                  },
                  "except": {
                    "as": "createSinglePageSearchablePdfException",
                    "steps": [
                      {
                        "step_assignCreateSinglePageSearchablePdfException": {
                          "assign": [
                            { "createSinglePageSearchablePdfResponse": "${createSinglePageSearchablePdfException}" },
                            {
                              "error": {
                                "service": "create-single-page-searchable-pdf-service",
                                "exception": "${createSinglePageSearchablePdfException}"
                              }
                            },
                            { "errors": "${list.concat(errors, error)}" }
                          ]
                        }
                      }
                    ]
                  },
                  "next": "step_assignCreateSinglePageSearchablePdfResult"
                }
              },
              {
                "step_assignCreateSinglePageSearchablePdfResult": {
                  "assign": [
                    { "flowResults[\"createSinglePageSearchablePdfResponse\"]": "${createSinglePageSearchablePdfResponse}" }
                  ]
                }
              }
            ]
          }
        }
      ]
    },
    "next": "step_checkResultsBeforeUpdateProcessingMetadata"
  }
},
{
  "step_checkResultsBeforeUpdateProcessingMetadata": {
    "switch": [
      { "condition": "${len(errors) != 0}", "next": "step_logErrors" }
    ],
    "next": "step_updateProcessingMetadata"
  }
},
{
  "step_updateProcessingMetadata": {
    "try": {
      "call": "http.post",
      "args": {
        "url": "G",
        "body": {
          "resourceName": "${resourceName}",
          "bucketName": "${bucketName}"
        },
        "auth": { "type": "OIDC" }
      },
      "result": "updateProcessingMetadataResponse"
    },
    "retry": {
      "predicate": "${retryPredicate}",
      "max_retries": 5,
      "backoff": { "initial_delay": 2, "max_delay": 60, "multiplier": 2 }
    },
    "except": {
      "as": "updateProcessingMetadataException",
      "steps": [
        {
          "step_assignUpdateProcessingMetadataException": {
            "assign": [
              { "updateProcessingMetadataResponse": "${updateProcessingMetadataException}" },
              {
                "error": {
                  "service": "update-processing-metadata-service",
                  "exception": "${updateProcessingMetadataException}"
                }
              },
              { "errors": "${list.concat(errors, error)}" }
            ]
          }
        }
      ]
    },
    "next": "step_assignUpdateProcessingMetadataResult"
  }
},
{
  "step_assignUpdateProcessingMetadataResult": {
    "assign": [
      { "flowResults[\"updateProcessingMetadataResponse\"]": "${updateProcessingMetadataResponse}" }
    ],
    "next": "step_checkResultsAfterUpdateProcessingMetadata"
  }
},
{
  "step_checkResultsAfterUpdateProcessingMetadata": {
    "switch": [
      { "condition": "${len(errors) != 0}", "next": "step_logErrors" }
    ],
    "next": "step_returnFlowResults"
  }
},
{
  "step_logErrors": {
    "try": {
      "call": "http.post",
      "args": {
        "url": "H",
        "body": {
          "resourceName": "${resourceName}",
          "bucketName": "${bucketName}",
          "errors": "${errors}"
        },
        "auth": { "type": "OIDC" }
      },
      "result": "logErrorsResponse"
    },
    "retry": {
      "predicate": "${retryPredicate}",
      "max_retries": 5,
      "backoff": { "initial_delay": 2, "max_delay": 60, "multiplier": 2 }
    },
    "except": {
      "as": "logErrorsException",
      "steps": [
        {
          "step_assignLogErrorsException": {
            "assign": [
              { "logErrorsResponse": "${logErrorsException}" }
            ]
          }
        }
      ]
    },
    "next": "step_returnFlowResults"
  }
},
{
  "step_returnFlowResults": { "return": "${flowResults}" }
}
]
},
"retryPredicate": {
"params": ["error"],
"steps": [
{
"step_repeatWhen": {
"switch": [
{
"condition": "${(error.code != null) and (error.code == 500)}",
"return": true
},
{
"condition": "${(error.code != null) and (error.code == 503)}",
"return": true
},
{
"condition": "${(error.code != null) and (error.code == 429)}",
"return": true
}
]
}
},
{
"step_otherwise": {
"return": false
}
}
]
}
}
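As you figured out, the problem was a "connection broken" error: such an error carries no `code` key, so the `error.code` lookup in your custom retry predicate raised the KeyError and the retry policy itself failed. Let me share my custom retry policy: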
# Retry predicate that reports every error as retryable. It never inspects
# the error map, so it cannot fail on a missing key; the number of attempts
# is bounded only by the max_retries value of the retry block that uses it.
custom_retry:
  params: [e]
  steps:
    - retry_always:
        return: true
It looks stupid, but it works :) Because I define max_retries in the retry block, it is fine for every failure to be retried (until max_retries is reached). I haven't found a more elegant solution than this.