I am trying to run two scripts in parallel, with one feeding the other.
First I trained a model to decode different gestures. I followed the tutorial right here: https://www.youtube.com/watch?v=yqkISICHH-U
That script opens the webcam, decodes the gestures I am making, and creates a new variable (called mvt_ok) when the same movement is decoded 3 consecutive times.
At that point I wish to send the information to another script, an experimental task developed in PsychoPy (a Python tool for building psychology experiments). Basically, as soon as the first script (gesture detection with the webcam) feeds the second one (the PsychoPy task), I want the second one to present another stimulus.
To summarise, I wish to open the video, then start the PsychoPy script and present the first stimulus; a movement is then expected to be detected in the video. This information should be fed to the PsychoPy script so that it changes the stimulus.
So far I am a long way from doing that: I have only been able to send mvt_ok to another process running a function such as the following:
def f(child_conn,mvt_ok):
    print(mvt_ok)
Actually I am not sure how I could reuse the mvt_ok variable to feed it to my PsychoPy script.
I won't post all the lines of the gesture recognition part because it may be too long, but the most crucial ones are here:
if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    sentence = []
    while cap.isOpened():
        ret, frame = cap.read()
        image_np = np.array(frame)
        input_tensor = tf.convert_to_tensor(np.expand_dims(image_np, 0), dtype=tf.float32)
        detections = detect_fn(input_tensor)
        num_detections = int(detections.pop('num_detections'))
        detections = {key: value[0, :num_detections].numpy()
                      for key, value in detections.items()}
        detections['num_detections'] = num_detections
        # detection_classes should be ints.
        detections['detection_classes'] = detections['detection_classes'].astype(np.int64)
        label_id_offset = 1
        image_np_with_detections = image_np.copy()
        viz_utils.visualize_boxes_and_labels_on_image_array(
            image_np_with_detections,
            detections['detection_boxes'],
            detections['detection_classes']+label_id_offset,
            detections['detection_scores'],
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=5,
            min_score_thresh=.8,
            agnostic_mode=False)
        cv2.imshow('object detection', cv2.resize(image_np_with_detections, (800, 600)))
        if np.max(detections['detection_scores'])>0.95:
            word = category_index[detections['detection_classes'][np.argmax(detections['detection_scores'])]+1]['name']
            sentence.append(word)
            if len(sentence)>=3:
                if sentence[-1]==sentence[-2] and sentence[-1]==sentence[-3]:
                    print('ok')
                    mvt_ok=1
                    p = Process(target=f, args=(child_conn,mvt_ok))
                    p.start()
                    p.join()
        if cv2.waitKey(10) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            break
One way to do this in pure Python is to create a Queue() that Python objects can be sent through, which is much simpler than using sockets. There is a very good example here.
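As a minimal sketch of the queue idea, assuming both ends can be launched from one parent script: the names detector, poll_event and run_task below are hypothetical stand-ins for the webcam loop and the stimulus loop, not part of the original code. The stimulus side uses get_nowait() so that its own loop never stalls while waiting for a detection.

```python
import queue
import time
from multiprocessing import Process, Queue


def detector(events, n_frames=6):
    """Stand-in for the webcam loop: reports a dummy gesture on every 3rd frame."""
    for i in range(n_frames):
        if i % 3 == 2:
            # Any picklable Python object can be sent, e.g. a dict
            events.put({"frame": i, "gesture": "dummy"})
        time.sleep(0.01)
    events.put(None)  # sentinel: the detector has finished


def poll_event(events):
    """Return (True, event) if something is waiting, else (False, None). Never blocks."""
    try:
        return True, events.get_nowait()
    except queue.Empty:
        return False, None


def run_task(events):
    """Stand-in for the stimulus loop: polls once per iteration, collects events."""
    received = []
    while True:
        found, event = poll_event(events)
        if found:
            if event is None:  # sentinel seen, stop
                break
            received.append(event)  # here the real task would change stimulus
        time.sleep(0.005)
    return received


if __name__ == "__main__":
    events = Queue()
    p = Process(target=detector, args=(events,))
    p.start()
    print(run_task(events))
    p.join()
```

The sentinel value None is just one convention for telling the receiver that no more events will come; a real task would more likely stop on its own condition.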
Another, possibly easier, option is MQTT. You could install mosquitto as an MQTT broker. Then your webcam can "publish" detection events and your stimulus can "subscribe" to detections and get notified. And vice versa. MQTT allows for multi-megabyte messages so if big messages are needed, I would recommend it.
The code for the video acquisition end might look like this:
#!/usr/bin/env python3
# Requires:
#   mosquitto broker to be running somewhere
#   pip install paho-mqtt
from time import sleep
import paho.mqtt.client as mqtt
import json

if __name__ == "__main__":
    # Get settings
    with open('settings.json') as f:
        settings = json.load(f)
    host = settings['MQTThost']
    port = settings['MQTTport']
    topic= settings['MQTTtopic']
    print(f'DEBUG: Connecting to MQTT on {host}:{port}')
    client = mqtt.Client()
    client.connect(host,port,60)
    print(f'DEBUG: Connected')
    # Grab some video frames and publish a dummy detection every 5 frames
    for i in range(100):
        print(f'DEBUG: Grabbing frame {i}')
        if i%5 == 0:
            # Publish dummy detection
            client.publish(topic, "Detection event")
            print(f'DEBUG: Detection event')
        sleep(1)
And for the stimulus end, it might look like this:
#!/usr/bin/env python3
# Requires:
#   mosquitto broker to be running somewhere
#   pip install paho-mqtt
from time import sleep
import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
    """
    This function is called on successful connection to the broker.
    """
    print(f'DEBUG: Connected with result code {rc}')
    client.subscribe(topic)

def on_message(client, userdata, msg):
    """
    This function is called every time a message is published on
    the specified topic.
    """
    message = msg.payload.decode()
    print(f'Message received: {message}')

if __name__ == "__main__":
    # Get settings
    with open('settings.json') as f:
        settings = json.load(f)
    host = settings['MQTThost']
    port = settings['MQTTport']
    topic= settings['MQTTtopic']
    print(f'DEBUG: Connecting to MQTT on {host}:{port}')
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host,port,60)
    # Wait forever for messages
    client.loop_forever()
And I used a settings.json containing this:
{
"MQTThost" : "localhost",
"MQTTport" : 1883,
"MQTTtopic": "/detections"
}
Redis also supports pub/sub and is simple, fast and lightweight. The code would be structured very similarly to that above for MQTT. You can also just share a variable, a list, an atomic integer, or a set with Redis.
You could also use a simple UDP message between the processes if you don't want to pass big, complicated Python objects. It should be very reliable if both processes are on the same host and probably will allow up to 1kB or so of data per message. This is pure Python with no extra packages or modules or servers being needed.
The video acquisition might look like this:
#!/usr/bin/env python3
from time import sleep
import json
import socket

if __name__ == "__main__":
    # Get settings
    with open('settings2.json') as f:
        settings = json.load(f)
    host = settings['host']
    port = settings['port']
    print(f'DEBUG: Creating UDP socket, connecting to {host}:{port}')
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    address = (host, port)
    # Grab some video frames and emit a dummy detection every 5 frames
    for i in range(100):
        print(f'DEBUG: Grabbing frame {i}')
        if i%5 == 0:
            # Emit dummy detection
            sock.sendto(b'Event detected', address)
            print(f'DEBUG: Detection event')
        sleep(1)
And the stimulus code might look like this:
#!/usr/bin/env python3
import json
import socket

if __name__ == "__main__":
    # Get settings
    with open('settings2.json') as f:
        settings = json.load(f)
    host = settings['host']
    port = settings['port']
    print(f'DEBUG: Establishing listener on {host}:{port}')
    # Create a UDP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Bind the socket to the port
    sock.bind((host, port))
    print(f'DEBUG: Listening')
    while True:
        # Wait for message
        message, _ = sock.recvfrom(4096)
        print(f'DEBUG: Received {message}')
And I used a settings2.json containing this:
{
"host" : "localhost",
"port" : 50000
}
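The listener above sits in recvfrom() until a datagram arrives, which would freeze a stimulus loop that also has to draw frames. One possible way around that, sketched here under the assumption that the stimulus script can call a function once per frame, is to put the socket in non-blocking mode and poll it. The helper names make_listener and poll_detection are hypothetical, and the demo binds to port 0 so the OS picks a free port instead of the 50000 used in settings2.json.

```python
import socket
import time


def make_listener(host="localhost", port=50000):
    """Create a UDP socket bound to host:port that never blocks on receive."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    sock.setblocking(False)
    return sock


def poll_detection(sock, bufsize=4096):
    """Return the next waiting datagram as bytes, or None if nothing has arrived."""
    try:
        message, _ = sock.recvfrom(bufsize)
        return message
    except BlockingIOError:
        # Raised by a non-blocking socket when no datagram is waiting
        return None


if __name__ == "__main__":
    listener = make_listener(port=0)   # port 0: let the OS choose a free port
    address = listener.getsockname()   # the (host, port) actually bound
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    print(poll_detection(listener))    # nothing has been sent yet
    sender.sendto(b"Event detected", address)
    time.sleep(0.1)                    # give the datagram time to arrive
    print(poll_detection(listener))

    sender.close()
    listener.close()
```

In the stimulus script, poll_detection(sock) would be called on each pass through the drawing loop, and the stimulus changed whenever it returns something other than None.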
Another pure Python way of doing this is with a multiprocessing connection. You would need to start the stimulus process first if using this method, or at least the process in which you put the listener. Note that you can send Python objects using this technique: just change the call to conn.send(SOMEDICT or ARRAY or LIST).
The video acquisition might look like this:
#!/usr/bin/env python3
import time
import json
from multiprocessing.connection import Client

if __name__ == "__main__":
    # Get settings
    with open('settings2.json') as f:
        settings = json.load(f)
    host = settings['host']
    port = settings['port']
    auth = settings['auth']
    print(f'DEBUG: Creating client, connecting to {host}:{port}')
    with Client((host, port), authkey=auth.encode()) as conn:
        # Grab some video frames and emit a dummy detection every 5 frames
        for i in range(100):
            print(f'DEBUG: Grabbing frame {i}')
            if i%5 == 0:
                # Emit dummy detection
                conn.send('Detection event')
                print(f'DEBUG: Detection event')
            time.sleep(1)
And the stimulus end might look like this:
#!/usr/bin/env python3
import json
import time
from multiprocessing.connection import Listener

if __name__ == "__main__":
    # Get settings
    with open('settings2.json') as f:
        settings = json.load(f)
    host = settings['host']
    port = settings['port']
    auth = settings['auth']
    print(f'DEBUG: Establishing listener on {host}:{port}')
    with Listener((host,port), authkey=auth.encode()) as listener:
        with listener.accept() as conn:
            print(f'DEBUG: connection accepted from', listener.last_accepted)
            while True:
                # Wait for message
                message = conn.recv()
                print(f'DEBUG: Event received {message}')
And your settings2.json would need to look like this:
{
"host" : "localhost",
"port" : 50000,
"auth" : "secret"
}
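Two points from this approach can be sketched together: sending a Python object rather than a string, and checking for a message without blocking via conn.poll(). This is only an illustration under stated assumptions: poll_message and send_one are hypothetical helper names, the sender runs in a thread purely so the demo fits in one file (in practice it would be the separate video script), and port 0 asks the OS for a free port rather than using the 50000 from settings2.json.

```python
import threading
import time
from multiprocessing.connection import Client, Listener

AUTHKEY = b"secret"  # same role as the "auth" entry in settings2.json


def poll_message(conn):
    """Return a waiting message, or None if the sender has written nothing yet."""
    if conn.poll():          # poll() with no timeout returns immediately
        return conn.recv()   # data is waiting, so this returns straight away
    return None


def send_one(address, payload):
    """Stand-in for the video script: connect, send one object, disconnect."""
    with Client(address, authkey=AUTHKEY) as conn:
        conn.send(payload)


if __name__ == "__main__":
    with Listener(("localhost", 0), authkey=AUTHKEY) as listener:
        # listener.address holds the port the OS actually assigned
        sender = threading.Thread(
            target=send_one,
            args=(listener.address, {"gesture": "dummy", "count": 3}),
        )
        sender.start()
        with listener.accept() as conn:
            sender.join()
            message = None
            while message is None:   # a real task would draw a frame here
                message = poll_message(conn)
                time.sleep(0.01)
            print(message)
```

Note that once the sender has closed its end, a further recv() raises EOFError, so a long-running stimulus loop would want to catch that.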
Some of the above ideas are very similar to the examples I gave in this answer, although the purpose is slightly different.
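With MQTT, Redis and UDP, neither program locks or blocks the other, so you can happily run one without the other needing to be running. The multiprocessing connection is the exception because, as noted above, the listener has to be started first.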