google-workflows

Google Workflows - Unexpected KeyError exception


We are experiencing occasional KeyError exceptions when processing a large volume of images using Google Workflows. Our workflow is designed to process images stored in Google Cloud Storage. It is triggered by Eventarc events and utilizes Google Cloud Run services for image processing tasks. Under normal conditions, the workflow operates smoothly, handling the processing load effectively. However, when the number of images to be processed simultaneously increases significantly, we occasionally encounter the following error:

KeyError: key not found: code

Despite this error, the Cloud Run services involved in the workflow do not report any issues, and the images are still processed successfully. Because there are no correlating errors on the Cloud Run side and the processing itself succeeds, it is difficult to pinpoint the root cause of the KeyError exception.

Has anyone else encountered the same issue? Any insights or suggestions would be greatly appreciated.

We also checked that the resources were not exhausted and we were unable to determine the root cause of the issue.

Edit: added the workflow definition (JSON) below.

{
  "main": {
    "params": ["event"],
    "steps": [
      {
        "step_init": {
          "assign": [
            {
              "flowResults": {}
            },
            {
              "errors": []
            }
          ],
          "next": "step_checkEventPayload"
        }
      },
      {
        "step_checkEventPayload": {
          "switch": [
            {
              "condition": "${(event.data.protoPayload.resourceName != null) and (event.data.resource.labels.bucket_name != null)}",
              "assign": [
                {
                  "resourceName": "${event.data.protoPayload.resourceName}"
                },
                {
                  "bucketName": "${event.data.resource.labels.bucket_name}"
                }
              ],
              "next": "step_filterProcessedImage"
            }
          ],
          "next": "end"
        }
      },
      {
        "step_filterProcessedImage": {
          "try": {
            "call": "http.post",
            "args": {
              "url": "A",
              "body": {
                "resourceName": "${resourceName}",
                "bucketName": "${bucketName}"
              },
              "auth": {
                "type": "OIDC"
              }
            },
            "result": "filterProcessedImageResponse"
          },
          "retry": {
            "predicate": "${retryPredicate}",
            "max_retries": 5,
            "backoff": {
              "initial_delay": 2,
              "max_delay": 60,
              "multiplier": 2
            }
          },
          "except": {
            "as": "filterProcessedImageException",
            "steps": [
              {
                "step_returnOnfilterProcessedImageException": {
                  "switch": [
                    {
                      "condition": "${(map.get(filterProcessedImageException, \"code\") != null) and (map.get(filterProcessedImageException, \"code\") == 409)}",
                      "return": "${\"Image \" + resourceName + \" already processed\"}"
                    }
                  ],
                  "next": "step_assignFilterProcessedImageException"
                }
              },
              {
                "step_assignFilterProcessedImageException": {
                  "assign": [
                    {
                      "filterProcessedImageResponse": "${filterProcessedImageException}"
                    },
                    {
                      "error": {
                        "service": "filter-processed-image-service",
                        "exception": "${filterProcessedImageException}"
                      }
                    },
                    {
                      "errors": "${list.concat(errors, error)}"
                    }
                  ]
                }
              }
            ]
          },
          "next": "step_assignFilterProcessedImageResult"
        }
      },
      {
        "step_assignFilterProcessedImageResult": {
          "assign": [
            {
              "flowResults[\"filterProcessedImageResponse\"]": "${filterProcessedImageResponse}"
            }
          ],
          "next": "step_checkResultsBeforeCreateCopyImages"
        }
      },
      {
        "step_checkResultsBeforeCreateCopyImages": {
          "switch": [
            {
              "condition": "${len(errors) != 0}",
              "next": "step_logErrors"
            }
          ],
          "next": "step_createCopyImages"
        }
      },
      {
        "step_createCopyImages": {
          "parallel": {
            "shared": ["flowResults", "errors"],
            "branches": [
              {
                "branch_createIntranetCopyImage": {
                  "steps": [
                    {
                      "step_createIntranetCopyImage": {
                        "try": {
                          "call": "http.post",
                          "args": {
                            "url": "B",
                            "body": {
                              "resourceName": "${resourceName}",
                              "bucketName": "${bucketName}"
                            },
                            "auth": {
                              "type": "OIDC"
                            }
                          },
                          "result": "createIntranetCopyImageResponse"
                        },
                        "retry": {
                          "predicate": "${retryPredicate}",
                          "max_retries": 5,
                          "backoff": {
                            "initial_delay": 2,
                            "max_delay": 60,
                            "multiplier": 2
                          }
                        },
                        "except": {
                          "as": "createIntranetCopyImageException",
                          "steps": [
                            {
                              "step_assignCreateIntranetCopyImageException": {
                                "assign": [
                                  {
                                    "createIntranetCopyImageResponse": "${createIntranetCopyImageException}"
                                  },
                                  {
                                    "error": {
                                      "service": "create-intranet-copy-image-service",
                                      "exception": "${createIntranetCopyImageException}"
                                    }
                                  },
                                  {
                                    "errors": "${list.concat(errors, error)}"
                                  }
                                ]
                              }
                            }
                          ]
                        },
                        "next": "step_assignCreateIntranetCopyImageResult"
                      }
                    },
                    {
                      "step_assignCreateIntranetCopyImageResult": {
                        "assign": [
                          {
                            "flowResults[\"createIntranetCopyImageResponse\"]": "${createIntranetCopyImageResponse}"
                          }
                        ]
                      }
                    }
                  ]
                }
              },
              {
                "branch_createWebCopyImage": {
                  "steps": [
                    {
                      "step_createWebCopyImage": {
                        "try": {
                          "call": "http.post",
                          "args": {
                            "url": "C",
                            "body": {
                              "resourceName": "${resourceName}",
                              "bucketName": "${bucketName}"
                            },
                            "auth": {
                              "type": "OIDC"
                            }
                          },
                          "result": "createWebCopyImageResponse"
                        },
                        "retry": {
                          "predicate": "${retryPredicate}",
                          "max_retries": 5,
                          "backoff": {
                            "initial_delay": 2,
                            "max_delay": 60,
                            "multiplier": 2
                          }
                        },
                        "except": {
                          "as": "createWebCopyImageException",
                          "steps": [
                            {
                              "step_assignCreateWebCopyImageException": {
                                "assign": [
                                  {
                                    "createWebCopyImageResponse": "${createWebCopyImageException}"
                                  },
                                  {
                                    "error": {
                                      "service": "create-web-copy-image-service",
                                      "exception": "${createWebCopyImageException}"
                                    }
                                  },
                                  {
                                    "errors": "${list.concat(errors, error)}"
                                  }
                                ]
                              }
                            }
                          ]
                        },
                        "next": "step_assignCreateWebCopyImageResult"
                      }
                    },
                    {
                      "step_assignCreateWebCopyImageResult": {
                        "assign": [
                          {
                            "flowResults[\"createWebCopyImageResponse\"]": "${createWebCopyImageResponse}"
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          },
          "next": "step_checkResultsAfterCreateCopyImages"
        }
      },
      {
        "step_checkResultsAfterCreateCopyImages": {
          "switch": [
            {
              "condition": "${len(errors) != 0}",
              "next": "step_logErrors"
            }
          ],
          "next": "step_detectText"
        }
      },
      {
        "step_detectText": {
          "parallel": {
            "shared": ["flowResults", "errors"],
            "branches": [
              {
                "branch_createSinglePageTxt": {
                  "steps": [
                    {
                      "step_createSinglePageTxt": {
                        "try": {
                          "call": "http.post",
                          "args": {
                            "url": "D",
                            "body": {
                              "resourceName": "${resourceName}",
                              "bucketName": "${bucketName}"
                            },
                            "auth": {
                              "type": "OIDC"
                            }
                          },
                          "result": "createSinglePageTxtResponse"
                        },
                        "retry": {
                          "predicate": "${retryPredicate}",
                          "max_retries": 5,
                          "backoff": {
                            "initial_delay": 2,
                            "max_delay": 60,
                            "multiplier": 2
                          }
                        },
                        "except": {
                          "as": "createSinglePageTxtException",
                          "steps": [
                            {
                              "step_assignCreateSinglePageTxtException": {
                                "assign": [
                                  {
                                    "createSinglePageTxtResponse": "${createSinglePageTxtException}"
                                  },
                                  {
                                    "error": {
                                      "service": "create-single-page-txt-service",
                                      "exception": "${createSinglePageTxtException}"
                                    }
                                  },
                                  {
                                    "errors": "${list.concat(errors, error)}"
                                  }
                                ]
                              }
                            }
                          ]
                        },
                        "next": "step_assignCreateSinglePageTxtResult"
                      }
                    },
                    {
                      "step_assignCreateSinglePageTxtResult": {
                        "assign": [
                          {
                            "flowResults[\"createSinglePageTxtResponse\"]": "${createSinglePageTxtResponse}"
                          }
                        ]
                      }
                    }
                  ]
                }
              },
              {
                "branch_createSinglePageSearchablePdf": {
                  "steps": [
                    {
                      "step_createSinglePageSearchablePdf": {
                        "try": {
                          "call": "http.post",
                          "args": {
                            "url": "F",
                            "body": {
                              "resourceName": "${resourceName}",
                              "bucketName": "${bucketName}"
                            },
                            "auth": {
                              "type": "OIDC"
                            }
                          },
                          "result": "createSinglePageSearchablePdfResponse"
                        },
                        "retry": {
                          "predicate": "${retryPredicate}",
                          "max_retries": 5,
                          "backoff": {
                            "initial_delay": 2,
                            "max_delay": 128,
                            "multiplier": 4
                          }
                        },
                        "except": {
                          "as": "createSinglePageSearchablePdfException",
                          "steps": [
                            {
                              "step_assignCreateSinglePageSearchablePdfException": {
                                "assign": [
                                  {
                                    "createSinglePageSearchablePdfResponse": "${createSinglePageSearchablePdfException}"
                                  },
                                  {
                                    "error": {
                                      "service": "create-single-page-searchable-pdf-service",
                                      "exception": "${createSinglePageSearchablePdfException}"
                                    }
                                  },
                                  {
                                    "errors": "${list.concat(errors, error)}"
                                  }
                                ]
                              }
                            }
                          ]
                        },
                        "next": "step_assignCreateSinglePageSearchablePdfResult"
                      }
                    },
                    {
                      "step_assignCreateSinglePageSearchablePdfResult": {
                        "assign": [
                          {
                            "flowResults[\"createSinglePageSearchablePdfResponse\"]": "${createSinglePageSearchablePdfResponse}"
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          },
          "next": "step_checkResultsBeforeUpdateProcessingMetadata"
        }
      },
      {
        "step_checkResultsBeforeUpdateProcessingMetadata": {
          "switch": [
            {
              "condition": "${len(errors) != 0}",
              "next": "step_logErrors"
            }
          ],
          "next": "step_updateProcessingMetadata"
        }
      },
      {
        "step_updateProcessingMetadata": {
          "try": {
            "call": "http.post",
            "args": {
              "url": "G",
              "body": {
                "resourceName": "${resourceName}",
                "bucketName": "${bucketName}"
              },
              "auth": {
                "type": "OIDC"
              }
            },
            "result": "updateProcessingMetadataResponse"
          },
          "retry": {
            "predicate": "${retryPredicate}",
            "max_retries": 5,
            "backoff": {
              "initial_delay": 2,
              "max_delay": 60,
              "multiplier": 2
            }
          },
          "except": {
            "as": "updateProcessingMetadataException",
            "steps": [
              {
                "step_assignUpdateProcessingMetadataException": {
                  "assign": [
                    {
                      "updateProcessingMetadataResponse": "${updateProcessingMetadataException}"
                    },
                    {
                      "error": {
                        "service": "update-processing-metadata-service",
                        "exception": "${updateProcessingMetadataException}"
                      }
                    },
                    {
                      "errors": "${list.concat(errors, error)}"
                    }
                  ]
                }
              }
            ]
          },
          "next": "step_assignUpdateProcessingMetadataResult"
        }
      },
      {
        "step_assignUpdateProcessingMetadataResult": {
          "assign": [
            {
              "flowResults[\"updateProcessingMetadataResponse\"]": "${updateProcessingMetadataResponse}"
            }
          ],
          "next": "step_checkResultsAfterUpdateProcessingMetadata"
        }
      },
      {
        "step_checkResultsAfterUpdateProcessingMetadata": {
          "switch": [
            {
              "condition": "${len(errors) != 0}",
              "next": "step_logErrors"
            }
          ],
          "next": "step_returnFlowResults"
        }
      },
      {
        "step_logErrors": {
          "try": {
            "call": "http.post",
            "args": {
              "url": "H",
              "body": {
                "resourceName": "${resourceName}",
                "bucketName": "${bucketName}",
                "errors": "${errors}"
              },
              "auth": {
                "type": "OIDC"
              }
            },
            "result": "logErrorsResponse"
          },
          "retry": {
            "predicate": "${retryPredicate}",
            "max_retries": 5,
            "backoff": {
              "initial_delay": 2,
              "max_delay": 60,
              "multiplier": 2
            }
          },
          "except": {
            "as": "logErrorsException",
            "steps": [
              {
                "step_assignLogErrorsException": {
                  "assign": [
                    {
                      "logErrorsResponse": "${logErrorsException}"
                    }
                  ]
                }
              }
            ]
          },
          "next": "step_returnFlowResults"
        }
      },
      {
        "step_returnFlowResults": {
          "return": "${flowResults}"
        }
      }
    ]
  },
  "retryPredicate": {
    "params": ["error"],
    "steps": [
      {
        "step_repeatWhen": {
          "switch": [
            {
              "condition": "${(map.get(error, \"code\") != null) and (map.get(error, \"code\") == 500)}",
              "return": true
            },
            {
              "condition": "${(map.get(error, \"code\") != null) and (map.get(error, \"code\") == 503)}",
              "return": true
            },
            {
              "condition": "${(map.get(error, \"code\") != null) and (map.get(error, \"code\") == 429)}",
              "return": true
            }
          ]
        }
      },
      {
        "step_otherwise": {
          "return": false
        }
      }
    ]
  }
}


Solution

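  • As you figured out, the problem was a "connection broken" error. Such an error does not carry an HTTP `code` field, so evaluating `error.code` inside your custom retry predicate raised the KeyError and the retry policy failed. Let me share my custom retry policy: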

    # Custom retry predicate: reports every error as retryable.
    # The number of attempts is bounded by max_retries on the retry block that uses it.
    custom_retry:
      params:
        - e
      steps:
        - retry_always:
            # Never inspects the error, so it cannot fail on a missing field.
            return: true
    

    It looks stupid, but it works :) Because I define max_retries in the retry block, it is fine for the call to be retried every time it fails (until max_retries is reached). I haven't found a more elegant solution than this.