First I'll explain how my code works. I have 3 modules that interact with each other: 2 modules connected to a single module via socket and send UDP frames. The single module receives the UDP frames, saves them to queues, then another function takes the queues as input and does some processing.
I am running the modules that send UDP frames in separate terminals. And I want to run the function that receives the UDP frames and the function that does the processing on the saved frames in different threads. For that I used threading and queue packages. However I didn't manage to run all the threads together; it always gets stuck in the second thread and never reaches the last thread.
Here is my code:
send_1.py:
import socket
import pickle
import time
def send_frame():
UDP_IP = "127.0.0.1"
UDP_PORT = 5005
MESSAGE = {'x': 0.20, 'y': 0.2, 'z': 0.2}
MESSAGE = pickle.dumps(MESSAGE)
print(type(MESSAGE))
print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:", MESSAGE)
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
while True:
sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
time.sleep(5)
send_frame()
send_2.py:
import socket
import pickle
import time
def send_frame():
UDP_IP = "127.0.0.1"
UDP_PORT = 5006
# MESSAGE = b"Hello, World!"
MESSAGE = {'x': 2.20, 'y': 2.2, 'z': 2.2}
MESSAGE = pickle.dumps(MESSAGE)
print(type(MESSAGE))
print("UDP target IP:", UDP_IP)
print("UDP target port:", UDP_PORT)
print("message:", MESSAGE)
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
while True:
sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
time.sleep(5)
send_frame()
Here is the code that receives the frames, saves them to queues then processes them.
receive.py:
import threading
import queue
import socket
import pickle
import time
class ReceiveData1:
def __init__(self):
pass
def receive_frame(self, q_1):
UDP_IP = "127.0.0.1"
UDP_PORT = 5005
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
data_1 = pickle.loads(data)
print('data_1:', data_1)
ts_1 = time.time()
frame_1 = {'data': data_1, 'timestamp': ts_1}
q_1.put(frame_1)
class ReceiveData2:
def __init__(self):
pass
def receive_frame(self, q_2):
UDP_IP = "127.0.0.1"
UDP_PORT = 5006
sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
sock.bind((UDP_IP, UDP_PORT))
while True:
data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
data_2 = pickle.loads(data)
print('data_2:', data_2)
ts_2 = time.time()
frame_2 = {'data': data_2, 'timestamp': ts_2}
q_2.put(frame_2)
class MatchFrames:
def __init__(self, delta_x, delta_y):
self.delta_x = delta_x
self.delta_y = delta_y
def get_decision(self, queue_1, queue_2):
print('queue_1:', queue_1)
print('queue_2:', queue_2)
frame_1 = queue_1.get()
frame_2 = queue_2.get()
data_1 = frame_1['data']
data_1_ts = frame_1['timestamp']
data_2 = frame_2['data']
data_2_ts = frame_2['timestamp']
decision = 'Unknown'
while time.time() < data_1_ts + 3 and time.time() < data_2_ts + 3:
if (data_2['x'] - self.delta_x <= data_1['x'] <= data_2['x'] + self.delta_x and
data_2['y'] - self.delta_y <= data_1['y'] <= data_2['y'] + self.delta_y):
decision = 'Correct'
break
else:
decision = 'Wrong'
break
print(decision)
if __name__ == '__main__':
threads = []
q_1 = queue.Queue()
rec_1 = ReceiveData1()
q_2 = queue.Queue()
rec_2 = ReceiveData2()
decide = MatchFrames(0.5, 0.5)
t1 = threading.Thread(target=rec_1.receive_frame, args=(q_1,))
t1.daemon = True
threads.append(t1)
t1.start()
t2 = threading.Thread(target=rec_2.receive_frame, args=(q_2,))
t2.daemon = True
threads.append(t2)
t2.start()
t3 = threading.Thread(target=decide.get_decision, args=(q_1, q_2,))
t3.daemon = True
threads.append(t3)
t3.start()
for t in threads:
t.join()
q_1.join()
q_2.join()
As I understood, it may be because of join()
: by running thread.join()
the next thread will wait till the previous thread is finished to be able to run, which never happens because it gets stuck in the while loops.
Any suggestions how to make the three threads run altogether and keep receiving the UDP frames?