The main issue keeping you from achieving your desired performance is the `send_pkts()` method. It doesn't just send the packet, it also crafts the packet:
```python
def send_pkts(ip):
    # craft packet
    while True:
        # send packet
        time.sleep(randint(0, 3))
```
While sending a packet is almost certainly an I/O-bound task, crafting a packet is almost certainly a CPU-bound task. This method needs to be split into two tasks:

- craft a packet
- send a packet
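In other words, something along these lines, where `craft_packet()` and `send_packet()` are hypothetical stand-ins for whatever your real crafting and sending code does:

```python
def craft_pkts(ip, num_pkts):
    # CPU-bound: all of the packet construction happens here, with no I/O or sleeping
    return [craft_packet(ip) for _ in range(num_pkts)]

def send_pkts(packets):
    # I/O-bound: nothing but sending and waiting, so threads can overlap the waits
    for packet in packets:
        send_packet(packet)
        time.sleep(randint(0, 3))
```

Once the two jobs are separated, the CPU-bound half can live in its own process and the I/O-bound half can be spread across threads, which is what the code below does.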
I've written a basic socket server and a client app that crafts and sends packets to the server. The idea is to have a separate process craft the packets and put them into a queue that it shares with a pool of threads. The threads pull packets off of the queue and send them to the server. They also stick the server's responses into another shared queue, but that's just for my own testing and not relevant to what you're trying to do. The threads exit when they get a `None` (a poison pill) from the queue.
server.py:
```python
import argparse
import socketserver
import time

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="bind to host")
    parser.add_argument("--port", type=int, help="bind to port")
    parser.add_argument("--packet-size", type=int, help="size of packets")
    args = parser.parse_args()
    HOST, PORT = args.host, args.port

    class MyTCPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            time.sleep(1.5)
            data = self.request.recv(args.packet_size)
            self.request.sendall(data.upper())

    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()
```
client.py:
```python
import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread


def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.INFO)

    fh = logging.FileHandler("client.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)

    logger.addHandler(fh)
    return logger


class PacketMaker(mp.Process):
    def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
        mp.Process.__init__(self)
        self.result_queue = result_queue
        self.max_packets = max_packets
        self.packet_size = packet_size
        self.num_poison_pills = num_poison_pills
        self.num_packets_made = 0
        self.logger = logger

    def run(self):
        while True:
            if self.num_packets_made >= self.max_packets:
                for _ in range(self.num_poison_pills):
                    self.result_queue.put(None, timeout=1)
                self.logger.debug('PacketMaker exiting')
                return
            self.result_queue.put(os.urandom(self.packet_size), timeout=1)
            self.num_packets_made += 1


class PacketSender(Thread):
    def __init__(self, task_queue, result_queue, addr, packet_size, logger):
        Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.server_addr = addr
        self.packet_size = packet_size
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(addr)
        self.logger = logger

    def run(self):
        while True:
            packet = self.task_queue.get(timeout=1)
            if packet is None:
                self.logger.debug("PacketSender exiting")
                return
            try:
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            except socket.error:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect(self.server_addr)
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            self.result_queue.put(response)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-packets', type=int, help='number of packets to send')
    parser.add_argument('--packet-size', type=int, help='packet size in bytes')
    parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
    parser.add_argument('--host', type=str, help='name of host packets will be sent to')
    parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
    args = parser.parse_args()

    logger = get_logger()
    logger.info(f"starting script with args {args}")

    packets_to_send = mp.Queue(args.num_packets + args.num_threads)
    packets_received = q.Queue(args.num_packets)
    producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
    senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
               for _ in range(args.num_threads)]

    start_time = time.time()
    logger.info("starting workers")
    for worker in senders + producers:
        worker.start()
    for worker in senders:
        worker.join()
    logger.info("workers finished")
    end_time = time.time()
    print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")
```
run.sh:
```bash
#!/usr/bin/env bash

for i in "$@"
do
    case $i in
        -s=*|--packet-size=*)
            packet_size="${i#*=}"
            shift
            ;;
        -n=*|--num-packets=*)
            num_packets="${i#*=}"
            shift
            ;;
        -t=*|--num-threads=*)
            num_threads="${i#*=}"
            shift
            ;;
        -h=*|--host=*)
            host="${i#*=}"
            shift
            ;;
        -p=*|--port=*)
            port="${i#*=}"
            shift
            ;;
        *)
            ;;
    esac
done

python3 server.py --host="${host}" \
                  --port="${port}" \
                  --packet-size="${packet_size}" &
server_pid=$!

python3 client.py --packet-size="${packet_size}" \
                  --num-packets="${num_packets}" \
                  --num-threads="${num_threads}" \
                  --host="${host}" \
                  --port="${port}"

kill "${server_pid}"
```
```
$ ./run.sh -s=1024 -n=1500 -t=300 -h=localhost -p=9999
1500 packets received in 4.70330023765564 seconds
$ ./run.sh -s=1024 -n=1500 -t=1500 -h=localhost -p=9999
1500 packets received in 1.5025699138641357 seconds
```
This result can be verified by changing the log level in `client.py` to `DEBUG`. Note that the script takes much longer than 4.7 seconds to complete overall: there is quite a lot of teardown required when using 300 threads, but the log makes it clear that the threads themselves are done processing at 4.7 seconds.
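That is a one-line change in `get_logger()`:

```python
# In client.py's get_logger(), lower the level so the per-thread
# "PacketSender exiting" and "PacketMaker exiting" messages land in client.log:
logger.setLevel(logging.DEBUG)  # was logging.INFO
```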
Take all performance results with a grain of salt: I have no idea what system you're running this on. For reference, my relevant system stats are:

- 2x Xeon X5550 @ 2.67GHz
- 24GB DDR3 @ 1333MHz
- Debian 10
- Python 3.7.3
I'll address the issues with your attempts:
- Simple single-threaded: this is all but guaranteed to take roughly `1.5 x num_packets` seconds, since the `randint(0, 3)` sleep averages 1.5 seconds per packet
- Multithreading: the GIL is the likely bottleneck here, but it's likely because of the `craft packet` part rather than `send packet`
- Multiprocessing: each process requires at least one file descriptor, so you're probably exceeding the user or system limit, but this could work if you raise the appropriate limits (see the sketch below)
- Multiprocessing + multithreading: this fails for the same reason as the multithreading attempt; crafting the packet is probably CPU bound

The rule of thumb is: I/O bound, use threads; CPU bound, use processes.
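If you do want to pursue the pure multiprocessing route, here is a minimal sketch of checking and raising the per-process file descriptor limit with the standard library's `resource` module (Unix only; the values you actually need depend on how many processes and sockets you open):

```python
import resource

# Current soft/hard limits on open file descriptors for this process (Unix only)
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"soft limit: {soft}, hard limit: {hard}")

# Raise the soft limit as far as the hard limit allows; raising the hard limit
# itself requires elevated privileges (ulimit, limits.conf, etc.)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```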