Tags: python, python-asyncio, grpc, grpc-python

Handling async streaming requests in gRPC Python


I am trying to understand how to handle a gRPC API with bidirectional streaming (using the Python API).

Say I have the following simple service definition:

syntax = "proto3";
package simple;

service TestService {
  rpc Translate(stream Msg) returns (stream Msg){}
}

message Msg {
  string msg = 1;
}

Say that the messages sent from the client arrive asynchronously (as a consequence of the user selecting some UI elements).

The generated Python stub for the client will contain a method Translate that accepts an iterator of requests (for example, a generator) and returns an iterator of responses.

What is not clear to me is how to write the generator function so that it yields messages as they are created by the user. Sleeping on the thread while waiting for messages doesn't sound like the best solution.


Solution

  • This is a bit clunky right now, but you can accomplish your use case as follows:

    #!/usr/bin/env python
    
    from __future__ import print_function
    
    import collections
    import threading
    
    from concurrent import futures
    import grpc
    
    from translate_pb2 import Msg
    from translate_pb2_grpc import TestServiceStub
    from translate_pb2_grpc import TestServiceServicer
    from translate_pb2_grpc import add_TestServiceServicer_to_server
    
    
    def translate_next(msg):
        return ''.join(reversed(msg))
    
    
    class Translator(TestServiceServicer):
      def Translate(self, request_iterator, context):
        for req in request_iterator:
          print("Translating message: {}".format(req.msg))
          yield Msg(msg=translate_next(req.msg))
    
    class TranslatorClient(object):
      def __init__(self):
        self._stop_event = threading.Event()
        self._request_condition = threading.Condition()
        self._response_condition = threading.Condition()
        self._requests = collections.deque()
        self._last_request = None
        self._expected_responses = collections.deque()
        self._responses = {}
    
      def _next(self):
        with self._request_condition:
          while not self._requests and not self._stop_event.is_set():
            self._request_condition.wait()
          if len(self._requests) > 0:
            return self._requests.popleft()
          else:
            raise StopIteration()
    
      def __iter__(self):
        return self
    
      def next(self):  # Python 2 iterator protocol
        return self._next()
    
      def __next__(self):  # Python 3 iterator protocol
        return self._next()
    
      def add_response(self, response):
        with self._response_condition:
          request = self._expected_responses.popleft()
          self._responses[request] = response
          self._response_condition.notify_all()
    
      def add_request(self, request):
        with self._request_condition:
          self._requests.append(request)
          with self._response_condition:
            self._expected_responses.append(request.msg)
          self._request_condition.notify()
    
      def close(self):
        self._stop_event.set()
        with self._request_condition:
          self._request_condition.notify()
    
      def translate(self, to_translate):
        self.add_request(to_translate)
        with self._response_condition:
          # Check before waiting so a response that already arrived is not missed.
          while to_translate.msg not in self._responses:
            self._response_condition.wait()
          return self._responses[to_translate.msg]
    
    
    def _run_client(address, translator_client):
      # Use the address that was passed in rather than a hard-coded one.
      with grpc.insecure_channel(address) as channel:
        stub = TestServiceStub(channel)
        responses = stub.Translate(translator_client)
        for resp in responses:
          translator_client.add_response(resp)
    
    def main():
      server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
      add_TestServiceServicer_to_server(Translator(), server)
      server.add_insecure_port('[::]:50054')
      server.start()
      translator_client = TranslatorClient()
      client_thread = threading.Thread(
          target=_run_client, args=('localhost:50054', translator_client))
      client_thread.start()
    
      def _translate(to_translate):
        return translator_client.translate(Msg(msg=to_translate)).msg
    
      translator_pool = futures.ThreadPoolExecutor(max_workers=4)
      to_translate = ("hello", "goodbye", "I", "don't", "know", "why",)
      translations = translator_pool.map(_translate, to_translate)
      # list() is needed on Python 3, where zip() returns a lazy iterator.
      print("Translations: {}".format(list(zip(to_translate, translations))))
    
      translator_client.close()
      client_thread.join()
      server.stop(None)
    
    
    if __name__ == "__main__":
      main()
    
    

    The basic idea is to have a TranslatorClient object, shared between threads, that correlates requests and responses. It expects responses to come back in the order in which the requests were sent, and it stores each response under its request's message text, so it assumes no two in-flight requests carry the same text. It also implements the iterator interface so that you can pass it directly to an invocation of the Translate method on your stub.

    We spin up a thread running _run_client, which passes the TranslatorClient to the stub as the request iterator, reads responses from the call, and feeds them back into the TranslatorClient with add_response.

    The main function I included here is really just a strawman, since I don't have the particulars of your UI code. I'm running _translate in a ThreadPoolExecutor to demonstrate that, even though translator_client.translate is synchronous, it only blocks the calling thread while it waits on a condition variable, so you can have multiple in-flight requests at once.

    We recognize that this is a lot of code to write for such a simple use case. Ultimately, the answer will be asyncio support. We have plans for this in the not-too-distant future. But for the moment, this sort of solution should keep you going whether you're running Python 2 or Python 3.
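
For readers who only need the core pattern, here is a more compact sketch of the same idea for Python 3, using only the standard library. It is not the code above and not part of the gRPC API: StreamSession, fake_translate, and run_demo are hypothetical names, and fake_translate merely stands in for stub.Translate so the example runs without a server. It swaps the condition variables for a queue.Queue (so the request iterator blocks without sleeping or polling) and matches responses to requests with concurrent.futures.Future objects held in FIFO order, which still assumes the server answers in request order.

```python
import collections
import queue
import threading
from concurrent.futures import Future

_DONE = object()  # sentinel that marks the end of the request stream


class StreamSession:
    """Hypothetical helper: a blocking request iterator plus FIFO response matching."""

    def __init__(self):
        self._requests = queue.Queue()
        self._pending = collections.deque()
        self._lock = threading.Lock()

    def submit(self, request):
        """Queue a request and return a Future for its response."""
        fut = Future()
        with self._lock:
            # Enqueue both under one lock so request order == future order.
            self._pending.append(fut)
            self._requests.put(request)
        return fut

    def close(self):
        """Signal that no more requests will be sent."""
        self._requests.put(_DONE)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._requests.get()  # blocks until a request or the sentinel arrives
        if item is _DONE:
            raise StopIteration
        return item

    def deliver(self, response):
        """Resolve the oldest outstanding Future with this response."""
        with self._lock:
            fut = self._pending.popleft()
        fut.set_result(response)


def fake_translate(request_iterator):
    """Stand-in for stub.Translate (assumption): reverses each request string."""
    for req in request_iterator:
        yield req[::-1]


def run_demo(words):
    session = StreamSession()

    def pump():
        # With a real stub this loop would read: for resp in stub.Translate(session)
        for resp in fake_translate(session):
            session.deliver(resp)

    reader = threading.Thread(target=pump)
    reader.start()
    pending = [session.submit(word) for word in words]
    results = [fut.result(timeout=5) for fut in pending]
    session.close()
    reader.join(timeout=5)
    return results


if __name__ == "__main__":
    print(run_demo(["hello", "goodbye"]))
```

In a UI, an event handler would call session.submit(...) and either block on the returned Future or attach a callback with add_done_callback. Because each response is matched by position rather than by message text, two identical requests in flight at once are handled correctly.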