1

I have a UDP server listening to incoming traffic in a thread. The messages come from an external device in JSON format, for example, {"_id": "0x00", "status": "on"}. This information needs to be parsed by ther UDP handler and stored in a dict of objects (or update if the _id exists) At the moment I can receive and parse JSON, but not entierly sure how to store this data or handle it correctly. I would like to use a Queue in the UDP Handler and a separate Msg Handler that processes the queue but not sure if it's the right way of doing it.

Please note: I have re-written the code omitting some syntax etc.

# I create a node server thread in the main of my program with IP/PORT

class NodeServer(threading.Thread):
    def __init__():
        self.server = UDPServer((address, port), UDPHandler)

    def run():
        self.server.serve_forever()

class UDPServer(socketserver.ThreadingUDPServer):
    allow_reuse_address = True

class UDPHandler(socketserver.BaseRequestHandler):
    data_q = queue.Queue()

    handler = NodeHandler(data_q)
    handler.start()

    def handle():
        try:
            # receive the message from client
            data = self.request[0].decode("UTF-8")

            # check if it's in json format

            # HANDLE THE MESSAGE
            self.data_q.put(data)

            # stop the thread
            self.handler.join()
            # send an "ACK" msg back to the client
      except: Exception as e:
            #handle

class NodeHandler(threading.Thread):
    table = NodeTable()

    def __init(data_q):
        self.data_q = data_q

    def handle():
      # get the string message from the queue (filled by the UDPHandler)
      msg = self.data_q.get()

      # check if the "_id" field exists in the current node table

      # if it exists in the table, find it, update the fields from json

      # otherwise, if it's a new "_id": create a new node
      json_msg = json.parse(msg)
      node = Node(json_msg["_id"])
      # set other node parameters from the json object, status etc

      # update the node table with the new information from the message
      table.put(nd)
      #
    def join():
        # join thread

class NodeTable():
  # the table is a dictionary with ID and a Node object, ie {0x01: Node}
  table = {}
  def put(_id, node):
      self.table[_id] = node

  def get(_id):
      return self.table[_id]

class Node():
    id = 0
    def __init__(id):
        self.id = id
        # new node
    # other node functions
  • Should the NodeHandler be created in the UDP Handler as a separate thread or just an handler object?
  • Does the Node Table need to be global, if not, how to access it outside of the server? Perhaps just passing it as an object.
  • Is NodeTable a good data structure for holding unique node objects, or there is a better way?

Thank you!

alexm
  • 460
  • 5
  • 14

1 Answers1

0

1) You can implement a UDP server which processes incoming messages in an infinite loop much more simply with these lines of code:

 import socket

 def udp_server(udp_ip, udp_port, ...):
   sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
   sock.bind((upd_ip, upd_port))
   while True:
     data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
     ...process data...

See https://wiki.python.org/moin/UdpCommunication for more details and discussion.

The arguments to udp_server are the udp IP and port, and whatever other data structures the server needs to interact with.

And kicking this off into its own thread is very easily acccomplished with:

 import threading

 t = threading.Thread(target = udp_server, args = (...))
 t.start()

2) The NodeTable class is just a wrapper around a python dictionary, but it seems that you want to have multiple threads access it at the same time. In that case you should read this SO answer: (link).

Depending on what other threads besides the server thread can do to the node dictionary you may or may not need a lock.

To summarize, where is how I would write the code:

 def main():
   nodes = {}     # use a simple dict for storing the nodes
   lock = RLock() # if you need this
   # pass nodes and lock to server thread and start it
   t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock))
   t.start() 
   ...

At this point the udp server is running and the main thread can access the node table via the variable nodes.

Does the main thread need to be informed when new nodes have been added to the node table? Then perhaps a Queue is what you want. You would 1) create it in main() and 2) pass it to udp_server:

 def main()
   nodes = {}     # use a simple dict for storing the nodes
   lock = RLock() # if you need this
   q = Queue()    # create a Queue and pass it to the udp server
   # pass nodes and lock to server thread and start it
   t = threading.Thread(target = udp_server, args = (udp_ip, udp_port, nodes, lock, q))
   t.start() 
   # process entries from the Queue
   while True:
     item = q.get()
     ... process item...

and in the udp server function ...process data... will put something onto the queue:

   while True:
     data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
     ...json decode, etc. ...
     q.put(...) 
Community
  • 1
  • 1
ErikR
  • 51,541
  • 9
  • 73
  • 124
  • This is very helpful, thank you. I have a server thread which runs the udp_server like you suggested and puts the items back in the queue which I can access from the serer thread. I haven't used mutex locks in python before, but should have no trouble following your example! – alexm Jan 22 '15 at 22:56