python  google-cloud-platform  apache-beam  google-natural-language

How can I handle an exception using apache beam Pipeline in Python?


I am getting an error in one of the iterations of the pipeline, because the Google NLP API returns an error when the language identified for the text isn't supported. I am having trouble handling this exception within the pipeline while still making use of the efficient batch requests of Apache Beam.



# Ask the Natural Language API for entities and document sentiment;
# syntax extraction is explicitly turned off.
features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_syntax=False
)

# Pipeline: messages -> nlp.Document -> AnnotateText -> parse -> text file.
p = beam.Pipeline()
(p 
 # One input element per item of message_log_df['message'].
 | beam.Create(message_log_df['message'])
 # Wrap each message in a plain-text Document for the NLP transform.
 | beam.Map(lambda x : nlp.Document(x, type='PLAIN_TEXT'))
 # Calls the API per element; this is the step that raises InvalidArgument
 # in the traceback below and aborts the whole run.
 | nlp.AnnotateText(features)
 # parse_nlp_result is defined elsewhere (not shown here).
 | beam.Map(parse_nlp_result)
 # Write everything to a single output shard in the bucket.
 | beam.io.WriteToText('gs://{}/all.txt'.format(BUCKET_NAME), num_shards=1)
)
# Start the pipeline, then wait on the returned result object.
result = p.run()
result.wait_until_finish()

Exception I need to handle:

_InactiveRpcError                         Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     66         try:
---> 67             return callable_(*args, **kwargs)
     68         except grpc.RpcError as exc:

/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    945                                       wait_for_ready, compression)
--> 946         return _end_unary_response_blocking(state, call, False, None)
    947 

/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
    848     else:
--> 849         raise _InactiveRpcError(state)
    850 

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "The language nl is not supported for entity analysis."
    debug_error_string = "{"created":"@1634485423.983876222","description":"Error received from peer ipv4:172.217.212.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"The language nl is not supported for entity analysis.","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

InvalidArgument                           Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
    136   def process(self, element):
--> 137     response = self.client.annotate_text(
    138         document=Document.to_dict(element),

/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
    575         )
--> 576         return self._inner_api_calls["annotate_text"](
    577             request, retry=retry, timeout=timeout, metadata=metadata

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
    285             )
--> 286             return retry_target(
    287                 target,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
    188         try:
--> 189             return target()
    190 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     68         except grpc.RpcError as exc:
---> 69             six.raise_from(exceptions.from_grpc_error(exc), exc)
     70 

/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)

InvalidArgument: 400 The language nl is not supported for entity analysis.

During handling of the above exception, another exception occurred:

InvalidArgument                           Traceback (most recent call last)
<ipython-input-121-8497e34517c1> in <module>
      7  | beam.io.WriteToText('gs://{}/whatsapp-all.txt'.format(BUCKET_NAME), num_shards=1)
      8 )
----> 9 result = p.run()
     10 result.wait_until_finish()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    563         finally:
    564           shutil.rmtree(tmpdir)
--> 565       return self.runner.run_pipeline(self, self._options)
    566     finally:
    567       shutil.rmtree(self.local_tempdir, ignore_errors=True)

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    193         options.view_as(pipeline_options.ProfilingOptions))
    194 
--> 195     self._latest_run_result = self.run_via_runner_api(
    196         pipeline.to_runner_api(default_environment=self._default_environment))
    197     return self._latest_run_result

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    204     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    205     #   the teststream (if any), and all the stages).
--> 206     return self.run_stages(stage_context, stages)
    207 
    208   @contextlib.contextmanager

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    382           )
    383 
--> 384           stage_results = self._run_stage(
    385               runner_execution_context, bundle_context_manager)
    386 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    644     while True:
    645       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 646           self._run_bundle(
    647               runner_execution_context,
    648               bundle_context_manager,

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    767         expected_timer_output)
    768 
--> 769     result, splits = bundle_manager.process_bundle(
    770         data_input, data_output, input_timers, expected_timer_output)
    771     # Now we collect all the deferred inputs remaining from bundle execution.

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1078             process_bundle_descriptor.id,
   1079             cache_tokens=[next(self._cache_token_generator)]))
-> 1080     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1081 
   1082     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    376       self._uid_counter += 1
    377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
    379     return ControlFuture(request.instruction_id, response)
    380 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    600     if request_type:
    601       # E.g. if register is set, this will call self.register(request.register))
--> 602       return getattr(self, request_type)(
    603           getattr(request, request_type), request.instruction_id)
    604     else:

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    638         with self.maybe_profile(instruction_id):
    639           delayed_applications, requests_finalization = (
--> 640               bundle_processor.process_bundle(instruction_id))
    641           monitoring_infos = bundle_processor.monitoring_infos()
    642           monitoring_infos.extend(self.state_cache_metrics_fn())

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    994                   element.timer_family_id, timer_data)
    995           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 996             input_op_by_transform_id[element.transform_id].process_encoded(
    997                 element.data)
    998 

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    220       decoded_value = self.windowed_coder_impl.decode_from_stream(
    221           input_stream, True)
--> 222       self.output(decoded_value)
    223 
    224   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
    135 
    136   def process(self, element):
--> 137     response = self.client.annotate_text(
    138         document=Document.to_dict(element),
    139         features=self.features,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
    574             document=document, features=features, encoding_type=encoding_type
    575         )
--> 576         return self._inner_api_calls["annotate_text"](
    577             request, retry=retry, timeout=timeout, metadata=metadata
    578         )

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
    143             kwargs["metadata"] = metadata
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 
    147 

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
    284                 self._initial, self._maximum, multiplier=self._multiplier
    285             )
--> 286             return retry_target(
    287                 target,
    288                 self._predicate,

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
    187     for sleep in sleep_generator:
    188         try:
--> 189             return target()
    190 
    191         # pylint: disable=broad-except

/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
     67             return callable_(*args, **kwargs)
     68         except grpc.RpcError as exc:
---> 69             six.raise_from(exceptions.from_grpc_error(exc), exc)
     70 
     71     return error_remapped_callable

/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)

InvalidArgument: 400 The language nl is not supported for entity analysis. [while running '[121]: AnnotateText(extract_entities: true
extract_document_sentiment: true
)/ParDo(_AnnotateTextFn)']

Tutorial reference I used: https://medium.com/google-cloud/calling-google-cloud-machine-learning-apis-from-batch-and-stream-etl-pipelines-9a789ac6f972


Solution

  • I found a workaround after reading the part of the Apache Beam source code that makes the GCP API calls (apache_beam/ml/gcp/naturallanguageml.py). The AnnotateText abstraction has no feature for handling errors returned by the API, so a single failing request fails the whole pipeline.

    A simple workaround is to override two key pieces: the _AnnotateTextFn class, which makes the API calls, and the AnnotateText function, which uses _AnnotateTextFn (so that it uses our custom version, which handles the API exception, instead).

    class _AnnotateTextFn_Custom(nlp._AnnotateTextFn):
        """``nlp._AnnotateTextFn`` variant that does not fail on API errors.

        ``process`` makes the same ``annotate_text`` call, but when the call
        raises (for example ``InvalidArgument`` for an unsupported language)
        the error is logged and an empty ``AnnotateTextResponse`` is emitted
        instead, so one bad element no longer aborts the whole pipeline.
        """

        def __init__(
          self,
          features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
          timeout,  # type: Optional[float]
          metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
          ):
            """Forward all arguments unchanged to the parent constructor."""
            # Fixed: the original called super(_AnnotateTextFn_Safe, self), but
            # no class of that name exists, so construction raised NameError.
            super().__init__(features, timeout, metadata)

        def process(self, element):
            """Annotate one Document and yield the API response.

            Args:
              element: the Document to send to the Natural Language API.

            Yields:
              The API response, or an empty ``AnnotateTextResponse`` when the
              request raised an exception.
            """
            # Local import keeps this snippet self-contained.
            import logging

            try:
                response = self.client.annotate_text(
                    document=nlp.Document.to_dict(element),
                    features=self.features,
                    encoding_type=element.encoding,
                    timeout=self.timeout,
                    metadata=self.metadata)
            except Exception as e:
                # ****Handle the exception here****
                # Record the failure instead of dropping it silently, then fall
                # back to an empty response so downstream steps still get one
                # output per input element.
                logging.warning('annotate_text failed, emitting empty response: %s', e)
                response = nlp.types.AnnotateTextResponse()

            # Counted for failed requests too, as in the original snippet.
            self.api_calls.inc()

            yield response
    
    
    @beam.ptransform_fn
    def AnnotateText(
        pcoll,  # type: beam.pvalue.PCollection
        features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
        timeout=None,  # type: Optional[float]
        metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
    ):
      """Annotate text with the Google Cloud Natural Language API
      (https://cloud.google.com/natural-language/docs) using the
      error-tolerant :class:`_AnnotateTextFn_Custom` DoFn.

      Args:
        pcoll (:class:`~apache_beam.pvalue.PCollection`): Input PCollection of
          :class:`Document` objects.
        features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`):
          The natural language operations to perform on the text, e.g.::
          {'extract_syntax': True, 'extract_entities': True}
        timeout (`Optional[float]`): Seconds to wait for the request to
          complete; applies to each individual retry attempt.
        metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
          passed along to the API method.

      Returns:
        The result of applying ``beam.ParDo`` with the custom DoFn to ``pcoll``.
      """
      # ***Here, we use our custom object _AnnotateTextFn_Custom
      annotate_fn = _AnnotateTextFn_Custom(features, timeout, metadata)
      return pcoll | beam.ParDo(annotate_fn)
    

    Then we can use this new AnnotateText in the pipeline in place of nlp.AnnotateText.