My pipeline fails on one of its elements because the Google Natural Language API returns an error when the language detected for the text isn't supported. I am having trouble handling this exception inside the pipeline while still taking advantage of Apache Beam's efficient batch requests.
# Ask the Natural Language API for entities and document-level sentiment only;
# syntax analysis is explicitly turned off.
features = nlp.types.AnnotateTextRequest.Features(
    extract_syntax=False,
    extract_entities=True,
    extract_document_sentiment=True,
)

p = beam.Pipeline()
(
    p
    # One element per message text from the DataFrame column.
    | beam.Create(message_log_df['message'])
    # Wrap every raw string in a Beam NLP Document of type PLAIN_TEXT.
    | beam.Map(lambda text: nlp.Document(text, type='PLAIN_TEXT'))
    # Call the Natural Language API for each Document.
    | nlp.AnnotateText(features)
    | beam.Map(parse_nlp_result)
    # A single output shard, so everything lands in one file.
    | beam.io.WriteToText(f'gs://{BUCKET_NAME}/all.txt', num_shards=1)
)

# Launch the pipeline and block until it has completed.
result = p.run()
result.wait_until_finish()
The exception I need to handle:
_InactiveRpcError Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
66 try:
---> 67 return callable_(*args, **kwargs)
68 except grpc.RpcError as exc:
/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
945 wait_for_ready, compression)
--> 946 return _end_unary_response_blocking(state, call, False, None)
947
/opt/conda/miniconda3/lib/python3.8/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
848 else:
--> 849 raise _InactiveRpcError(state)
850
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "The language nl is not supported for entity analysis."
debug_error_string = "{"created":"@1634485423.983876222","description":"Error received from peer ipv4:172.217.212.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"The language nl is not supported for entity analysis.","grpc_status":3}"
>
The above exception was the direct cause of the following exception:
InvalidArgument Traceback (most recent call last)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
136 def process(self, element):
--> 137 response = self.client.annotate_text(
138 document=Document.to_dict(element),
/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
575 )
--> 576 return self._inner_api_calls["annotate_text"](
577 request, retry=retry, timeout=timeout, metadata=metadata
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
144
--> 145 return wrapped_func(*args, **kwargs)
146
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
285 )
--> 286 return retry_target(
287 target,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
188 try:
--> 189 return target()
190
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
68 except grpc.RpcError as exc:
---> 69 six.raise_from(exceptions.from_grpc_error(exc), exc)
70
/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)
InvalidArgument: 400 The language nl is not supported for entity analysis.
During handling of the above exception, another exception occurred:
InvalidArgument Traceback (most recent call last)
<ipython-input-121-8497e34517c1> in <module>
7 | beam.io.WriteToText('gs://{}/whatsapp-all.txt'.format(BUCKET_NAME), num_shards=1)
8 )
----> 9 result = p.run()
10 result.wait_until_finish()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
563 finally:
564 shutil.rmtree(tmpdir)
--> 565 return self.runner.run_pipeline(self, self._options)
566 finally:
567 shutil.rmtree(self.local_tempdir, ignore_errors=True)
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
193 options.view_as(pipeline_options.ProfilingOptions))
194
--> 195 self._latest_run_result = self.run_via_runner_api(
196 pipeline.to_runner_api(default_environment=self._default_environment))
197 return self._latest_run_result
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
204 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
205 # the teststream (if any), and all the stages).
--> 206 return self.run_stages(stage_context, stages)
207
208 @contextlib.contextmanager
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
382 )
383
--> 384 stage_results = self._run_stage(
385 runner_execution_context, bundle_context_manager)
386
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
644 while True:
645 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 646 self._run_bundle(
647 runner_execution_context,
648 bundle_context_manager,
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
767 expected_timer_output)
768
--> 769 result, splits = bundle_manager.process_bundle(
770 data_input, data_output, input_timers, expected_timer_output)
771 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1078 process_bundle_descriptor.id,
1079 cache_tokens=[next(self._cache_token_generator)]))
-> 1080 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1081
1082 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
600 if request_type:
601 # E.g. if register is set, this will call self.register(request.register))
--> 602 return getattr(self, request_type)(
603 getattr(request, request_type), request.instruction_id)
604 else:
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
638 with self.maybe_profile(instruction_id):
639 delayed_applications, requests_finalization = (
--> 640 bundle_processor.process_bundle(instruction_id))
641 monitoring_infos = bundle_processor.monitoring_infos()
642 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
994 element.timer_family_id, timer_data)
995 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 996 input_op_by_transform_id[element.transform_id].process_encoded(
997 element.data)
998
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
220 decoded_value = self.windowed_coder_impl.decode_from_stream(
221 input_stream, True)
--> 222 self.output(decoded_value)
223
224 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/miniconda3/lib/python3.8/site-packages/apache_beam/ml/gcp/naturallanguageml.py in process(self, element)
135
136 def process(self, element):
--> 137 response = self.client.annotate_text(
138 document=Document.to_dict(element),
139 features=self.features,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/cloud/language_v1/gapic/language_service_client.py in annotate_text(self, document, features, encoding_type, retry, timeout, metadata)
574 document=document, features=features, encoding_type=encoding_type
575 )
--> 576 return self._inner_api_calls["annotate_text"](
577 request, retry=retry, timeout=timeout, metadata=metadata
578 )
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__(self, *args, **kwargs)
143 kwargs["metadata"] = metadata
144
--> 145 return wrapped_func(*args, **kwargs)
146
147
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func(*args, **kwargs)
284 self._initial, self._maximum, multiplier=self._multiplier
285 )
--> 286 return retry_target(
287 target,
288 self._predicate,
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/retry.py in retry_target(target, predicate, sleep_generator, deadline, on_error)
187 for sleep in sleep_generator:
188 try:
--> 189 return target()
190
191 # pylint: disable=broad-except
/opt/conda/miniconda3/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable(*args, **kwargs)
67 return callable_(*args, **kwargs)
68 except grpc.RpcError as exc:
---> 69 six.raise_from(exceptions.from_grpc_error(exc), exc)
70
71 return error_remapped_callable
/opt/conda/miniconda3/lib/python3.8/site-packages/six.py in raise_from(value, from_value)
InvalidArgument: 400 The language nl is not supported for entity analysis. [while running '[121]: AnnotateText(extract_entities: true
extract_document_sentiment: true
)/ParDo(_AnnotateTextFn)']
Tutorial I used as a reference: https://medium.com/google-cloud/calling-google-cloud-machine-learning-apis-from-batch-and-stream-etl-pipelines-9a789ac6f972
I found a workaround by reading how the GCP API calls are implemented in this file of the Apache Beam source code (apache_beam/ml/gcp/naturallanguageml.py). The abstraction has no built-in way to handle errors returned by the API, so a single failed request fails the whole pipeline.
A simple workaround is to override two key pieces: the _AnnotateTextFn DoFn, which makes the API calls, and the AnnotateText transform, which uses _AnnotateTextFn. The custom AnnotateText applies our own version of the DoFn, which handles the API exception:
class _AnnotateTextFn_Custom(nlp._AnnotateTextFn):
    """Error-tolerant variant of Beam's ``nlp._AnnotateTextFn``.

    Behaves like the parent DoFn, except when the ``annotate_text`` call raises
    one of ``handled_exceptions``. In that case the error is logged, an empty
    ``AnnotateTextResponse`` is yielded for that element, and the pipeline
    keeps running instead of failing. An example of such an error is
    ``InvalidArgument: 400 The language nl is not supported for entity
    analysis.``
    """

    def __init__(
        self,
        features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
        timeout,  # type: Optional[float]
        metadata=None,  # type: Optional[Sequence[Tuple[str, str]]]
        handled_exceptions=Exception,  # type: Union[type, Tuple[type, ...]]
    ):
        """Store the request options and the exception types to tolerate.

        Args:
          features: Natural Language operations to request, forwarded to the
            parent DoFn unchanged.
          timeout: Per-request timeout in seconds, forwarded to the parent.
          metadata: Extra request metadata, forwarded to the parent.
          handled_exceptions: Exception class, or tuple of classes, that is
            caught and turned into an empty response. Defaults to
            ``Exception``, which catches everything, including auth, quota and
            network errors. Pass a narrower type, such as
            ``google.api_core.exceptions.InvalidArgument``, if those other
            errors should still fail the pipeline.
        """
        # The original referenced the undefined name ``_AnnotateTextFn_Safe``,
        # which raised NameError as soon as the DoFn was constructed.
        super(_AnnotateTextFn_Custom, self).__init__(features, timeout, metadata)
        self.handled_exceptions = handled_exceptions

    def process(self, element):
        """Annotate a single Document, tolerating API errors.

        Yields the API's ``AnnotateTextResponse``. If the call raises one of
        ``self.handled_exceptions``, yields an empty ``AnnotateTextResponse``
        instead. The ``api_calls`` counter is incremented in both cases.
        """
        try:
            response = self.client.annotate_text(
                document=nlp.Document.to_dict(element),
                features=self.features,
                encoding_type=element.encoding,
                timeout=self.timeout,
                metadata=self.metadata)
        except self.handled_exceptions as err:
            # The import is local because this block cannot rely on a
            # module-level import; it only runs on the failure path.
            import logging
            # Log instead of silently swallowing, so that dropped annotations
            # stay visible in the worker logs.
            logging.warning(
                'annotate_text failed; emitting empty response: %s', err)
            response = nlp.types.AnnotateTextResponse()
        self.api_calls.inc()
        yield response
@beam.ptransform_fn
def AnnotateText(
    pcoll,  # type: beam.pvalue.PCollection
    features,  # type: Union[Mapping[str, bool], types.AnnotateTextRequest.Features]
    timeout=None,  # type: Optional[float]
    metadata=None  # type: Optional[Sequence[Tuple[str, str]]]
):
    """A :class:`~apache_beam.transforms.ptransform.PTransform`
    for annotating text using the Google Cloud Natural Language API:
    https://cloud.google.com/natural-language/docs.

    This mirrors Beam's ``nlp.AnnotateText`` but applies
    ``_AnnotateTextFn_Custom`` instead of Beam's ``_AnnotateTextFn``.

    Args:
      pcoll (:class:`~apache_beam.pvalue.PCollection`): An input PCollection of
        :class:`Document` objects.
      features (`Union[Mapping[str, bool], types.AnnotateTextRequest.Features]`):
        A dictionary of natural language operations to be performed on given
        text in the following format::

          {'extract_syntax': True, 'extract_entities': True}

      timeout (`Optional[float]`): The amount of time, in seconds, to wait
        for the request to complete. The timeout applies to each individual
        retry attempt.
      metadata (`Optional[Sequence[Tuple[str, str]]]`): Additional metadata
        that is provided to the method.

    Returns:
      The PCollection of responses yielded by ``_AnnotateTextFn_Custom`` for
      the input Documents.
    """
    # The custom DoFn is used here instead of Beam's _AnnotateTextFn; that
    # substitution is the only difference from nlp.AnnotateText.
    return pcoll | beam.ParDo(_AnnotateTextFn_Custom(features, timeout, metadata))
Then use this new AnnotateText in the pipeline in place of nlp.AnnotateText.