pythonparallel-processingmultiprocessingpython-multiprocessingpsychopy

Using OpenCV and multiprocessing, how can I feed information from a variable in one Python script to another?


I am trying to have 2 scripts running in parallel and feeding one with the other.

First I trained a model to decode different gestures. I followed the tutorial right here: https://www.youtube.com/watch?v=yqkISICHH-U

That script opens the webcam and decodes the gestures I am doing, and create a new variable when the same movement is decoded 3 consecutive times (called mvt_ok). At that time I wish to send the information to another script that will be an experimental task develloped on psychopy (a python tool to make psychology experiments). Basically, as soon as the first script (gestures detection with the webcam) feeds the second one, I want to present another stimulus for the second one (psychopy task).

To summarise, I wish to open the video, then start the script (psychopy) and present the first simulus, then a movement is expected to be detected with the video. This information should be fed to the psychopy script to change stimulus.

So far I am really far of doing that and I have just been able to send movement ok to another script with a function such as the one following:

def f(child_conn,mvt_ok):
   
    print(mvt_ok)

Actually I am not sure how I could reuse the mvt_ok variable to feed it to the my psychopy script.

I won't put all the lines for the part handling the gesture recognition because it is maybe too long but the most crucial ones are here:

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    sentence = []

    while cap.isOpened(): 
        ret, frame = cap.read()
        image_np = np.array(frame)
        
        input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
        detections = detect_fn(input_tensor)
        
        num_detections = int(detections.pop('num_detections'))
        detections = {key: value[0, :num_detections].numpy()
                      for key, value in detections.items()}
        detections['num_detections'] = num_detections
    
        # detection_classes should be ints.
        detections['detection_classes'] = detections['detection_classes'].astype(np.int64)
    
        label_id_offset = 1
        image_np_with_detections = image_np.copy()
    
        viz_utils.visualize_boxes_and_labels_on_image_array(
                    image_np_with_detections,
                    detections['detection_boxes'],
                    detections['detection_classes']+label_id_offset,
                    detections['detection_scores'],
                    category_index,
                    use_normalized_coordinates=True,
                    max_boxes_to_draw=5,
                    min_score_thresh=.8,
                    agnostic_mode=False)
        
        cv2.imshow('object detection',  cv2.resize(image_np_with_detections, (800, 600)))
        
        if np.max(detections['detection_scores'])>0.95:
            word = category_index[detections['detection_classes'][np.argmax(detections['detection_scores'])]+1]['name']
            sentence.append(word)
            
            if len(sentence)>=3:
                if sentence[-1]==sentence[-2] and sentence[-1]==sentence[-3]:
                    print('ok')
                    mvt_ok=1
                    p = Process(target=f, args=(child_conn,mvt_ok))
                    p.start()
                    p.join()
            
        
        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break

Solution

  • One way to do this is with pure Python is to:

    There is a very good example here.


    Another, possibly easier, option is MQTT. You could install mosquitto as an MQTT broker. Then your webcam can "publish" detection events and your stimulus can "subscribe" to detections and get notified. And vice versa. MQTT allows for multi-megabyte messages so if big messages are needed, I would recommend it.

    The code for the video acquisition end might look like this:

    #!/usr/bin/env python3
    
    # Requires:
    # mosquitto broker to be running somewhere
    # pip install paho-mqtt
    
    from time import sleep
    import paho.mqtt.client as mqtt
    import json
    
    if __name__ == "__main__":
       # Get settings
       with open('settings.json') as f:
          settings = json.load(f)
          host = settings['MQTThost']
          port = settings['MQTTport']
          topic= settings['MQTTtopic']
    
       print(f'DEBUG: Connecting to MQTT on {host}:{port}')
       client = mqtt.Client()
       client.connect(host,port,60)
       print(f'DEBUG: Connected')
    
       # Grab some video frames and publish a dummy detection every 5 frames
       for i in range(100):
          print(f'DEBUG: Grabbing frame {i}')
        
          if i%5 == 0:
             # Publish dummy detection
             client.publish(topic, "Detection event")
             print(f'DEBUG: Detection event')
    
          sleep(1)
    

    And for the stimulus end, it might look like this:

    #!/usr/bin/env python3
    
    # Requires:
    # mosquitto broker to be running somewhere
    # pip install paho-mqtt
    
    from time import sleep
    import paho.mqtt.client as mqtt
    import json
    
    def on_connect(client, userdata, flags, rc):
       """
       This function is called on successful connection to the broker.
       """
       print(f'DEBUG: Connected with result code {rc}')
       client.subscribe(topic)
    
    def on_message(client, userdata, msg):
       """
       This function is called every time a message is published on
       the specified topic.
       """
       message = msg.payload.decode()
       print(f'Message received: {message}')
    
    if __name__ == "__main__":
       # Get settings
       with open('settings.json') as f:
          settings = json.load(f)
          host = settings['MQTThost']
          port = settings['MQTTport']
          topic= settings['MQTTtopic']
    
       print(f'DEBUG: Connecting to MQTT on {host}:{port}')
       client = mqtt.Client()
       client.on_connect = on_connect
       client.on_message = on_message
       client.connect(host,port,60)
    
       # Wait forever for messages
       client.loop_forever()
    

    And I used a settings.json containing this:

    {
      "MQTThost" : "localhost",
      "MQTTport" : 1883,
      "MQTTtopic": "/detections"
    }
    

    Redis also supports pub/sub and is simple, fast and lightweight. The could would be structured very similarly to that above for MQTT. You can also just share a variable, or a list, or an atomic integer, or a set with Redis.


    You could also use a simple UDP message between the processes if you don't want to pass big, complicated Python objects. It should be very reliable if both processes are on the same host and probably will allow up to 1kB or so of data per message. This is pure Python with no extra packages or modules or servers being needed.

    The video acquisition might look like this:

    #!/usr/bin/env python3
    
    from time import sleep
    import json
    import socket
    
    if __name__ == "__main__":
       # Get settings
       with open('settings2.json') as f:
          settings = json.load(f)
          host = settings['host']
          port = settings['port']
    
       print(f'DEBUG: Creating UDP socket, connecting to {host}:{port}')
       sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
       address = (host, port)
    
       # Grab some video frames and emit a dummy detection every 5 frames
       for i in range(100):
          print(f'DEBUG: Grabbing frame {i}')
        
          if i%5 == 0:
             # Emit dummy detection
             sock.sendto(b'Event detected', address)
             print(f'DEBUG: Detection event')
    
          sleep(1)
    

    And the stimulus code might look like this:

    #!/usr/bin/env python3
    
    import json
    import socket
    
    if __name__ == "__main__":
       # Get settings
       with open('settings2.json') as f:
          settings = json.load(f)
          host = settings['host']
          port = settings['port']
    
       print(f'DEBUG: Establishing listener on {host}:{port}')
       # Create a UDP socket
       sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
       # Bind the socket to the port
       sock.bind((host, port))
       print(f'DEBUG: Listening')
    
       while True:
          # Wait for message
          message, _ = sock.recvfrom(4096)
          print(f'DEBUG: Received {message}')
    

    And I used a settings2.json containing this:

    {
      "host" : "localhost",
      "port" : 50000
    }
    

    Another, pure Python way of doing this is with a multiprocessing connection. You would need to start the stimulus process first if using this method - or at least the process in which you put the listener. Note that you can send Python objects using this technique, just change to conn.send(SOMEDICT or ARRAY or LIST)

    The video acquisition might look like this:

    #!/usr/bin/env python3
    
    import time
    import json
    from multiprocessing.connection import Client
    
    if __name__ == "__main__":
       # Get settings
       with open('settings2.json') as f:
          settings = json.load(f)
          host = settings['host']
          port = settings['port']
          auth = settings['auth']
    
       print(f'DEBUG: Creating client, connecting to {host}:{port}')
       with Client((host, port), authkey=auth.encode()) as conn:
    
          # Grab some video frames and emit a dummy detection every 5 frames
          for i in range(100):
             print(f'DEBUG: Grabbing frame {i}')
        
             if i%5 == 0:
                # Emit dummy detection with current time
                conn.send('Detection event')
                print(f'DEBUG: Detection event')
    
             time.sleep(1)
    

    And the stimulus end might look like this:

    #!/usr/bin/env python3
    
    import json
    import time
    from multiprocessing.connection import Listener
    
    if __name__ == "__main__":
       # Get settings
       with open('settings2.json') as f:
          settings = json.load(f)
          host = settings['host']
          port = settings['port']
          auth = settings['auth']
    
       print(f'DEBUG: Establishing listener on {host}:{port}')
       with Listener((host,port), authkey=auth.encode()) as listener:
        with listener.accept() as conn:
            print(f'DEBUG: connection accepted from', listener.last_accepted)
            while True:
               # Wait for message
               message = conn.recv()
               print(f'DEBUG: Event received {message}')
    

    And your settings2.json would need to look like this:

    {
      "host" : "localhost",
      "port" : 50000,
      "auth" : "secret"
    }
    

    Some of the above ideas are very similar the examples I gave in this answer although the purpose is slightly different.


    None of these methods are locking/blocking, so you can happily run one program without the other needing to be running.