I have a Python server and client in gRPC. After adding a ClientInterceptor, I can't receive messages from a unary_stream method on the client side. On the server side I can see that the stream is over and all messages have been sent. It's as if the client isn't aware that there are no messages left and that there's no need to wait. Adding wait_for_ready didn't help; the call just fails with the DEADLINE error. The stream_stream method works fine.

Calling response.code() or response.result() waits forever. Calling response.done() returns False, and response.running() returns True.

My interceptor:
    def _intercept_stream_call(self, continuation, client_call_details, request_or_iterator):
        for try_i in range(self.max_attempts):
            response = continuation(client_call_details, request_or_iterator)
            if isinstance(response, _MultiThreadedRendezvous):
                if response.code() == grpc.StatusCode.CANCELLED:
                    if try_i == (self.max_attempts - 1):
                        return response
                    else:
                        self.sleeping_policy.sleep(try_i)
                else:
                    return list(response)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        return self._intercept_stream_call(continuation, client_call_details, request)
The code freezes on the line if response.code() == grpc.StatusCode.CANCELLED: and the same thing happens if I run that line in the debugger's evaluator. The workaround is to call list(response) in the evaluator first; after that, response.code() returns a value.

Please help me understand what I am doing wrong.
My server method:
    def watchStream(self, request, context):
        for i in range(3):
            log.info(f'Returned response {i} to watch stream...')
            yield social_media_stream_pb2.StreamUpdate(
                audio_chunk=social_media_stream_pb2.AudioChunk(audio_data=f'Audio{i}'.encode()),
                video_frame=social_media_stream_pb2.VideoFrame(frame_data=f'Video{i}'.encode()))
I've tried running list(response) before calling response.code() or response.result(). It helped, but I don't know why; it's just a workaround.
You're encountering a deadlock when calling response.code(). From the docs, the return value of continuation is "an object that is both a Call for the RPC and a Future". That means the object is guaranteed to fulfill these two interfaces: grpc.Call and grpc.Future.

The return value is not, however, guaranteed to be a response message. This is because continuation returns not after the RPC response has been received, but immediately after the RPC has been sent. If you attempt to access the code on the RPC, the library blocks until the RPC completes so that the code is set. But the library cannot make progress on receiving the RPC, because you've blocked the thread waiting for the code to be populated. Deadlock.
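To illustrate why draining the stream first avoids the hang, here is a minimal sketch. FakeStreamCall is a hypothetical stand-in, not a gRPC class: it only mimics an object whose status becomes known once iteration finishes, so the snippet runs without a server. collect_stream is likewise just an illustrative helper name.

```python
class FakeStreamCall:
    """Hypothetical stand-in for the object returned by continuation().

    It mimics only two behaviours: it is iterable, and its status code
    is available once the stream has been exhausted.
    """

    def __init__(self, messages, final_code="OK"):
        self._messages = iter(messages)
        self._final_code = final_code
        self._code = None  # unknown until the stream terminates

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._messages)
        except StopIteration:
            self._code = self._final_code  # stream finished, status now known
            raise

    def code(self):
        if self._code is None:
            # A real call object would block here; the fake raises instead
            # so that the example cannot hang.
            raise RuntimeError("status not available before the stream ends")
        return self._code


def collect_stream(call):
    """Drain the stream first, then read the status code."""
    messages = list(call)  # iterating is what pulls the messages in
    return messages, call.code()  # safe: the call has already terminated


messages, code = collect_stream(FakeStreamCall(["Audio0", "Audio1", "Audio2"]))
print(messages, code)  # ['Audio0', 'Audio1', 'Audio2'] OK
```

With a real call, iterating can raise grpc.RpcError if the RPC fails, so that is the place to catch errors rather than probing code() up front.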
Instead, you should set up a callback that acts on the RPC status only once the RPC response has been received. The method to do this is add_done_callback.
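A rough sketch of the callback approach, assuming the call object invokes the callback with itself once the RPC has terminated. FakeDoneCall and watch_status are hypothetical names used so the snippet runs without a server; with a real call you would pass the object returned by continuation instead of the fake.

```python
class FakeDoneCall:
    """Hypothetical stand-in: iterable, and fires done-callbacks at stream end."""

    def __init__(self, messages, final_code="OK"):
        self._messages = iter(messages)
        self._final_code = final_code
        self._done = False
        self._callbacks = []

    def add_done_callback(self, fn):
        # Never blocks: either runs fn now (already done) or stores it.
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def code(self):
        return self._final_code

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._messages)
        except StopIteration:
            if not self._done:
                self._done = True
                for fn in self._callbacks:
                    fn(self)  # the callback receives the finished call
            raise


def watch_status(call, on_status):
    """Register a non-blocking callback that reports the final status code."""
    # By the time the callback runs the call has terminated,
    # so reading code() inside it does not wait.
    call.add_done_callback(lambda finished: on_status(finished.code()))
    return call  # hand the untouched call back to the caller


statuses = []
call = watch_status(FakeDoneCall(["a", "b"]), statuses.append)
received = list(call)  # the caller drives the stream, not the interceptor
print(received, statuses)  # ['a', 'b'] ['OK']
```

The point of the design is that the interceptor returns the call immediately and lets the caller iterate it; the status is observed afterwards rather than awaited before any message has been read.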
P.S. Please don't reference _MultiThreadedRendezvous in your code. It's an internal API. You should be able to do everything without referencing it.