0

First I'll explain how my code works. I have 3 modules that interact with each other: 2 modules connected to a single module via socket and send UDP frames. The single module receives the UDP frames, saves them to queues, then another function takes the queues as input and does some processing.

I am running the modules that send UDP frames in separate terminals. And I want to run the function that receives the UDP frames and the function that does the processing on the saved frames in different threads. For that I used threading and queue packages. However I didn't manage to run all the threads together; it always gets stuck in the second thread and never reaches the last thread.

Here is my code:
send_1.py:

import socket
import pickle
import time
def send_frame():
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    MESSAGE = {'x': 0.20, 'y': 0.2, 'z': 0.2}
    MESSAGE = pickle.dumps(MESSAGE)
    print(type(MESSAGE))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", MESSAGE)

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    while True:

        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
        time.sleep(5)


send_frame()

send_2.py:

import socket
import pickle
import time


def send_frame():
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006
    # MESSAGE = b"Hello, World!"
    MESSAGE = {'x': 2.20, 'y': 2.2, 'z': 2.2}
    MESSAGE = pickle.dumps(MESSAGE)
    print(type(MESSAGE))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", MESSAGE)

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    while True:
        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
        time.sleep(5)


send_frame()

Here is the code that receives the frames, saves them to queues then processes them.
receive.py:

import threading
import queue
import socket
import pickle
import time


class ReceiveData1:
    def __init__(self):
        pass

    def receive_frame(self, q_1):
        UDP_IP = "127.0.0.1"
        UDP_PORT = 5005

        sock = socket.socket(socket.AF_INET,  # Internet
                             socket.SOCK_DGRAM)  # UDP
        sock.bind((UDP_IP, UDP_PORT))

        while True:
            data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
            data_1 = pickle.loads(data)
            print('data_1:', data_1)

            ts_1 = time.time()
            frame_1 = {'data': data_1, 'timestamp': ts_1}
            q_1.put(frame_1)


class ReceiveData2:
    def __init__(self):
        pass

    def receive_frame(self, q_2):
        UDP_IP = "127.0.0.1"
        UDP_PORT = 5006

        sock = socket.socket(socket.AF_INET,  # Internet
                             socket.SOCK_DGRAM)  # UDP
        sock.bind((UDP_IP, UDP_PORT))

        while True:
            data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
            data_2 = pickle.loads(data)
            print('data_2:', data_2)

            ts_2 = time.time()
            frame_2 = {'data': data_2, 'timestamp': ts_2}
            q_2.put(frame_2)


class MatchFrames:
    def __init__(self, delta_x, delta_y):
        self.delta_x = delta_x
        self.delta_y = delta_y

    def get_decision(self, queue_1, queue_2):
        print('queue_1:', queue_1)
        print('queue_2:', queue_2)
        frame_1 = queue_1.get()
        frame_2 = queue_2.get()
        data_1 = frame_1['data']
        data_1_ts = frame_1['timestamp']
        data_2 = frame_2['data']
        data_2_ts = frame_2['timestamp']
        decision = 'Unknown'
        while time.time() < data_1_ts + 3 and time.time() < data_2_ts + 3:
            if (data_2['x'] - self.delta_x <= data_1['x'] <= data_2['x'] + self.delta_x and
                    data_2['y'] - self.delta_y <= data_1['y'] <= data_2['y'] + self.delta_y):
                decision = 'Correct'
                break
            else:
                decision = 'Wrong'
                break
        print(decision)


if __name__ == '__main__':
    threads = []
    q_1 = queue.Queue()
    rec_1 = ReceiveData1()
    q_2 = queue.Queue()
    rec_2 = ReceiveData2()
    decide = MatchFrames(0.5, 0.5)
    t1 = threading.Thread(target=rec_1.receive_frame, args=(q_1,))
    t1.daemon = True
    threads.append(t1)
    t1.start()

    t2 = threading.Thread(target=rec_2.receive_frame, args=(q_2,))
    t2.daemon = True
    threads.append(t2)
    t2.start()

    t3 = threading.Thread(target=decide.get_decision, args=(q_1, q_2,))
    t3.daemon = True
    threads.append(t3)
    t3.start()

    for t in threads:
        t.join()
        q_1.join()
        q_2.join()

As I understood, it may be because of join(): by running thread.join() the next thread will wait till the previous thread is finished to be able to run, which never happens because it gets stuck in the while loops.

Any suggestions how to make the three threads run altogether and keep receiving the UDP frames?

singrium
  • 2,746
  • 5
  • 32
  • 45

1 Answers1

0

I managed to solve the issue by using asyncio

I made some changes on the code so that I only have methods now without classes.
So the code is as follows:

send_1.py

import socket
import pickle
import time
def send_frame():
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005
    MESSAGE = {'x': 0.20, 'y': 0.2, 'z': 0.2}
    MESSAGE = pickle.dumps(MESSAGE)
    print(type(MESSAGE))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", MESSAGE)

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    while True:

        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
        time.sleep(5)


send_frame()  

send_2.py

import socket
import pickle
import time


def send_frame():
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006
    # MESSAGE = b"Hello, World!"
    MESSAGE = {'x': 2.20, 'y': 2.2, 'z': 2.2}
    MESSAGE = pickle.dumps(MESSAGE)
    print(type(MESSAGE))

    print("UDP target IP:", UDP_IP)
    print("UDP target port:", UDP_PORT)
    print("message:", MESSAGE)

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    while True:
        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))
        time.sleep(5)


send_frame()

And now with the code that receives the frames, saves them to queues then processes them. receive.py

import asyncio
import queue
import socket
import pickle
import time


async def receive_frame1(q_1):
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5005

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    sock.bind((UDP_IP, UDP_PORT))

    while True:
        data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
        data_1 = pickle.loads(data)
        print('data_1:', data_1)

        ts_1 = time.time()
        frame_1 = {'data': data_1, 'timestamp': ts_1}
        q_1.put(frame_1)
        await asyncio.sleep(0)


async def receive_frame2(q_2):
    UDP_IP = "127.0.0.1"
    UDP_PORT = 5006

    sock = socket.socket(socket.AF_INET,  # Internet
                         socket.SOCK_DGRAM)  # UDP
    sock.bind((UDP_IP, UDP_PORT))

    while True:
        data, addr = sock.recvfrom(1024)  # buffer size is 1024 bytes
        data_2 = pickle.loads(data)
        print('data_2:', data_2)

        ts_2 = time.time()
        frame_2 = {'data': data_2, 'timestamp': ts_2}
        q_2.put(frame_2)
        await asyncio.sleep(0)


async def get_decision(queue_1, queue_2, delta_x, delta_y):
    while True:
        print('queue_1:', queue_1)
        print('queue_2:', queue_2)
        frame_1 = queue_1.get()
        frame_2 = queue_2.get()
        data_1 = frame_1['data']
        data_1_ts = frame_1['timestamp']
        data_2 = frame_2['data']
        data_2_ts = frame_2['timestamp']
        decision = 'Unknown'
        while time.time() < data_1_ts + 3 and time.time() < data_2_ts + 3:
            if (data_2['x'] - delta_x <= data_1['x'] <= data_2['x'] + delta_x and
                    data_2['y'] - delta_y <= data_1['y'] <= data_2['y'] + delta_y):
                decision = 'Correct'
                break
            else:
                decision = 'Wrong'
                break
        print('ts:', data_1_ts)
        print('ts2:', data_2_ts)
        print(decision)
        print('#' * 32)
        await asyncio.sleep(0)
        

if __name__ == '__main__':
    q_1 = queue.Queue()
    q_2 = queue.Queue()
    asyncio.ensure_future(receive_frame1(q_1))
    asyncio.ensure_future(receive_frame2(q_2))
    asyncio.ensure_future(get_decision(q_1, q_2, 3.5, 3.5))
    loop = asyncio.get_event_loop()
    loop.run_forever()

asyncio helped me running the threads asynchronously and keep them running continuously using to the loop.run_forever()

Some sources helped me understand how to use asyncio:
1- Getting Started With Async Features in Python
2- How to properly create and run concurrent tasks using python's asyncio module?

singrium
  • 2,746
  • 5
  • 32
  • 45