I am new to Python and am using it for the first time to process pcap files. So far I have come up with a program that selects the packets matching a specific set of IP addresses and a protocol (port) and writes them to a new pcap file.

from scapy.all import *
import os   # used by os.path.join below; do not rely on scapy's star import for it
import re
import glob

def process_pcap(path, hosts, ports):
    pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
    count = 0
    for pcap in glob.glob(os.path.join(path, '*.pcapng')):
        print "Reading file", pcap
        packets=rdpcap(pcap)
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    count=count+1
                    print "Writing packets " , count
                    #wrpcap("temp.pcap", pkt)
                    pktdump.write(pkt)


path=r"\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') # Text file with many IP addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols (ports) to be added to the filter
process_pcap(path, hosts, ports)  

This code takes too long, because the list of IPs to match can hold 1000 addresses and the pcap files in the directory can be gigabytes in size. That is why I want to introduce multithreading. I changed the code as follows:

from scapy.all import *
import os   # used by os.path.join below
import re
import glob
import threading


def process_packet(pkt, pktdump, packets, ports):
    count = 0
    if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
        if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
            count=count+1
            print "Writing packets " , count
            #wrpcap("temp.pcap", pkt)
            pktdump.write(pkt)


def process_pcap(path, hosts, ports):
    pktdump = PcapWriter("temp11.pcap", append=True, sync=True)
    ts=list()
    for pcap in glob.glob(os.path.join(path, '*.pcapng')):
        print "Reading file", pcap
        packets=rdpcap(pcap)
        for pkt in packets:
            t=threading.Thread(target=process_packet,args=(pkt,pktdump, packets,ports,))
            ts.append(t)
            t.start()
    for t in ts:
        t.join()


path=r"\workspace\pcaps"
file_ip = open('ip_list.txt', 'r') # Text file with many IP addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
ports=[443] # Protocols (ports) to be added to the filter
process_pcap(path, hosts, ports)  

But I don't think I am doing it the best way, as the run time hasn't dropped at all.

Any suggestions, please?

EDIT:

I changed the code according to one of the answers. It runs, but the threads never terminate. None of the Python multithreading examples I have seen require threads to be terminated explicitly. Please pinpoint the problem in this code:

from scapy.all import *
import os   # used by os.path.join below
import re
import glob
import threading
import Queue
import multiprocessing

#global variables declaration

path="\pcaps"
pcapCounter = len(glob.glob1(path,"*.pcapng")) #size of the queue
q = Queue.Queue(pcapCounter) # queue to hold all pcaps in directory
pcap_lock = threading.Lock()
ports=[443] # Protocols to be added in filter


def safe_print(content):
    print "{0}\n".format(content),

def process_pcap (hosts):
    content = "Thread no ", threading.current_thread().name, " in action"
    safe_print(content)
    if not q.empty():
        with pcap_lock:
            content = "IN LOCK ", threading.current_thread().name
            safe_print(content)
            pcap=q.get()

        content = "OUT LOCK", threading.current_thread().name, " and reading packets from ", pcap
        safe_print(content)   
        packets=rdpcap(pcap)


        pktdump = PcapWriter(threading.current_thread().name+".pcapng", append=True, sync=True)
        pList=[]
        for pkt in packets:
            if (TCP in pkt and (pkt[TCP].sport in ports or pkt[TCP].dport in ports)):
                if (pkt[IP].src in hosts or pkt[IP].dst in hosts):
                    pList.append(pkt)

                    content="Wrting Packets to pcap ", threading.current_thread().name
                    safe_print(content)
                    pktdump.write(pList) 


    else:
        content = "DONE!! QUEUE IS EMPTY", threading.current_thread().name
        safe_print(content)


for pcap in glob.glob(os.path.join(path, '*.pcapng')):
    q.put(pcap)

file_ip = open('ip_list.txt', 'r') #Text file with many ip addresses
o = file_ip.read()
hosts = re.findall( r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", o )
threads = []
cpu = multiprocessing.cpu_count() 
for i in range(cpu):
    t = threading.Thread(target=process_pcap, args=(hosts,), name = i)
    t.start()
    threads.append(t)

for t in threads:
    t.join()


print "Exiting Main Thread"

Here is the output of the above program; it never prints "Exiting Main Thread":

('Thread no ', 'Thread-1', ' in action')
('Thread no ', '3', ' in action')
('Thread no ', '1', ' in action')
('Thread no ', '2', ' in action')
('IN LOCK ', 'Thread-1')
('IN LOCK ', '3')
('OUT LOCK', 'Thread-1', ' and reading packets from ', 'path to\\test.pcapng')
('OUT LOCK', '3', ' and reading packets from ', 'path to\\test11.pcapng')
('IN LOCK ', '1')
('Wrting Packets to pcap ', '3')
('Wrting Packets to pcap ', 'Thread-1')

EDIT 2: I now take the lock before checking whether the queue is empty, and everything works fine.

Thank you.

aneela

2 Answers


You are creating a thread per packet. That's the fundamental problem.

Also, you are doing an I/O step for each processed packet instead of writing a batch of packets.
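The batching point could look roughly like the sketch below (Python 3 syntax). The helper names `load_hosts`, `wanted`, and `filter_file` are hypothetical, and the sketch assumes a scapy version in which `PcapReader` works as a context manager and reads pcapng input. It streams one capture, collects the matches, and writes them with a single `wrpcap` call rather than one synced write per packet. Keeping the hosts in a set instead of a list is an extra assumption on my part, not something the question does; it makes each membership test cheap when there are around 1000 addresses.

```python
import re

IP_RE = re.compile(r"\d{1,3}(?:\.\d{1,3}){3}")


def load_hosts(text):
    """Return the IPv4-looking strings found in text as a set."""
    return set(IP_RE.findall(text))


def wanted(src, dst, sport, dport, hosts, ports):
    """True if either port is in ports and either address is in hosts."""
    return (sport in ports or dport in ports) and (src in hosts or dst in hosts)


def filter_file(in_path, out_path, hosts, ports):
    """Filter one capture file and write all matches in one batch."""
    # Imported lazily so the helpers above can be used without scapy installed.
    from scapy.all import IP, TCP, PcapReader, wrpcap

    matched = []
    with PcapReader(in_path) as reader:   # yields packets one at a time
        for pkt in reader:
            # Check for the IP layer too, so non-IPv4 TCP packets are skipped.
            if IP in pkt and TCP in pkt and wanted(
                    pkt[IP].src, pkt[IP].dst,
                    pkt[TCP].sport, pkt[TCP].dport, hosts, ports):
                matched.append(pkt)
    if matched:
        wrpcap(out_path, matched)         # one batched write for the whole file
    return len(matched)
```

If the matches for a single file do not fit in memory, writing them in chunks of a few thousand packets would keep most of the benefit.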

You likely have between 1 and 10 cores on your PC. For the number of packets you are processing, the overhead of creating 1000+ threads exceeds the benefit of the parallelism your cores can provide. Returns diminish very quickly once there are more running threads than available cores.

Here's a better approach where you will realize the benefits of parallelism.

The main thread creates a global queue and lock to be shared by the subsequent threads. Before creating any threads, the main thread enumerates the *.pcapng file list and puts each filename into the queue. It also reads the IP address list used for filtering packets.

Then spawn N threads, where N is the number of cores on your device (N = os.cpu_count(), or multiprocessing.cpu_count() on Python 2).

Each thread takes the lock, pops the next file off the queue established by the main thread, and releases the lock. It then reads the file into a packet list and drops the packets it doesn't need, then saves the rest to a separate, unique file that holds the filtered results for that input file. Ideally, the pktdump object supports writing more than one packet at a time, as batching I/O operations saves a lot of time.

After the thread processes a single file, it re-enters the lock, pops off the next file from the queue, releases the lock, and repeats the processing for the next file.

The thread exits when the queue of file names becomes empty.

The main thread waits for all N threads to complete. Now you have a whole set of K files that you want to combine. Your main thread need only re-open these K files created by the threads and concatenate each back into a single output file.
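The steps above could be sketched roughly as follows (Python 3 syntax). The names `drain`, `run_workers`, and the `handle` callback are hypothetical; `handle` stands in for whatever filters a single file and returns the name of its output. The sketch uses `get_nowait()` rather than checking `empty()` and then calling `get()`, so a worker can never block on a queue that another thread emptied in between, which is one way the main thread can end up stuck in `join()`.

```python
import queue
import threading


def drain(work, handle, results, lock):
    """Worker loop: process items until the queue is empty, then return."""
    while True:
        try:
            item = work.get_nowait()   # never blocks; raises queue.Empty when done
        except queue.Empty:
            return
        out = handle(item)
        with lock:                     # guard the shared results list
            results.append(out)


def run_workers(items, handle, n_threads):
    """Fill the queue first, then start n_threads workers and wait for them."""
    work = queue.Queue()
    for item in items:
        work.put(item)
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=drain, args=(work, handle, results, lock))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

Because the queue is filled before any worker starts, an empty queue really does mean there is no work left. The returned list would hold the K per-file outputs for the main thread to concatenate.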

selbie
  • Thanks @selbie, I have changed the code according to your approach (hopefully). The problem is that the main thread never terminates and the program seems to be stuck. Please take a look – aneela Nov 08 '19 at 05:34

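That is how Python works with threads; read about the GIL (global interpreter lock). Because of the GIL, only one thread executes Python bytecode at a time, so threads do not speed up CPU-bound work such as parsing packets. If you want the work to run in parallel, you should use multiprocessing.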
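A minimal sketch of the multiprocessing route, in Python 3 syntax. The names `make_jobs`, `filter_job`, and `run` are hypothetical, and the body of `filter_job` is only a placeholder for the real per-file filtering. Each job is one (input, output) pair, so every worker process writes its own file and nothing has to be shared between processes.

```python
import glob
import os
from multiprocessing import Pool, cpu_count


def make_jobs(in_dir, out_dir, pattern="*.pcapng"):
    """Build one (input path, output path) pair per capture file."""
    jobs = []
    for src in sorted(glob.glob(os.path.join(in_dir, pattern))):
        stem = os.path.splitext(os.path.basename(src))[0]
        jobs.append((src, os.path.join(out_dir, stem + ".filtered.pcap")))
    return jobs


def filter_job(job):
    """Placeholder worker: a real version would filter src into dst."""
    src, dst = job
    # Hypothetical step: read src, keep the matching packets, write them to dst.
    return dst


def run(in_dir, out_dir, processes=None):
    """Filter every capture in in_dir using one process per core by default."""
    jobs = make_jobs(in_dir, out_dir)
    if not jobs:
        return []
    with Pool(processes or cpu_count()) as pool:
        return pool.map(filter_job, jobs)   # blocks until all files are done
```

On Windows the call to `run(...)` has to sit under `if __name__ == "__main__":`, because worker processes re-import the main module there. The worker must also be a top-level function so that it can be pickled.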

music_junkie