I have several .pcap files whose data I want to write to one large dask dataframe. Currently, my code initializes a dask dataframe using data from the first file. It is then supposed to process the rest of the pcap files and add them to that dask dataframe using merge/concat. However, when I check the number of rows of the merged dask dataframe, it doesn't increase. What is happening?
I am also not sure if I am using the right approach for my use case. I am trying to convert my entire dataset into one giant dask dataframe and write it out to an h5 file. My computer doesn't have enough memory to load the entire dataset, which is why I'm using dask. The idea is to load a dask dataframe that contains the entire dataset so I can do operations on the whole thing. I'm new to dask and I've read over some of the documentation, but I'm still fuzzy about how dask handles loading data from disk instead of memory. I'm also fuzzy about how partitions work in dask; specifically, I'm not sure how chunksize differs from npartitions, so I'm having trouble partitioning this dataframe properly. Any tips and advice would be helpful.
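To show where my confusion is, here is a tiny throwaway sketch of how I currently understand npartitions versus chunksize in dd.from_pandas. The toy frame and the numbers are just for illustration, and my reading of the two keywords is an assumption; please correct me if this mental model is wrong:

import pandas as pd
import dask.dataframe as dd

toy = pd.DataFrame({'x': range(12)})

# npartitions: ask for (roughly) this many partitions in total
a = dd.from_pandas(toy, npartitions=6)

# chunksize: ask for (roughly) this many rows per partition instead
b = dd.from_pandas(toy, chunksize=2)

print(a.npartitions, b.npartitions)  # both should come out around 6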
As mentioned above, I've already read over the main parts of the documentation.
I've tried using dd.merge(dask_df, panda_df) as shown in the documentation. When I initialize the dask dataframe, it starts with 6 rows, but after the merge the row count decreases to 1.
I've also tried using concat. Again, I start with 6 rows after initialization, but after the concat operations the row count still remains at 6, when I would expect it to increase.
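To make the pattern concrete before the full code, here is a stripped-down, hypothetical version of what I'm doing, with tiny hand-made frames standing in for the pcap data (the column names and values here are made up for the example):

import pandas as pd
import dask.dataframe as dd

# stand-in for the dataframe built from the first pcap
df = dd.from_pandas(pd.DataFrame({'ttl': [64.0] * 6, 'len': [60.0] * 6}),
                    npartitions=6)
# stand-in for the pandas frame built from the next pcap
panda = pd.DataFrame({'ttl': [128.0] * 6, 'len': [52.0] * 6})

dd.merge(df, panda)                                     # merge attempt
dd.concat([df, dd.from_pandas(panda, npartitions=6)])   # concat attempt

print(len(df))  # still 6, not 12 as I expected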
Here is the initialization function
import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *
flags = {
    'R': 0,
    'A': 1,
    'S': 2,
    'DF': 3,
    'FA': 4,
    'SA': 5,
    'RA': 6,
    'PA': 7,
    'FPA': 8
}
def initialize(file):
    # read the first pcap and build the initial dask dataframe from its TCP/IP packets
    global flags
    data = {
        'time_delta': [0],
        'ttl': [],
        'len': [],
        'dataofs': [],
        'window': [],
        'seq_delta': [0],
        'ack_delta': [0],
        'flags': []
    }
    scap = sniff(offline=file, filter='tcp and ip')
    for packet in range(0, len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            # deltas relative to the previous packet
            lst_pkt = scap[packet - 1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)
    panda = pd.DataFrame(data=data)
    panda['ttl'] = panda['ttl'].astype('float16')
    panda['flags'] = panda['flags'].astype('float16')
    panda['dataofs'] = panda['dataofs'].astype('float16')
    panda['len'] = panda['len'].astype('float16')
    panda['window'] = panda['window'].astype('float32')
    panda['seq_delta'] = panda['seq_delta'].astype('float32')
    panda['ack_delta'] = panda['ack_delta'].astype('float32')
    df = dd.from_pandas(panda, npartitions=6)
    gc.collect()
    return df
Here is the concatenation function
def process(file):
    # read another pcap and (attempt to) append its packets to the global dask dataframe
    global flags
    global df
    data = {
        'time_delta': [0],
        'ttl': [],
        'len': [],
        'dataofs': [],
        'window': [],
        'seq_delta': [0],
        'ack_delta': [0],
        'flags': []
    }
    scap = sniff(offline=file, filter='tcp and ip')
    for packet in range(0, len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            lst_pkt = scap[packet - 1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)
    panda = pd.DataFrame(data=data)
    panda['ttl'] = panda['ttl'].astype('float16')
    panda['flags'] = panda['flags'].astype('float16')
    panda['dataofs'] = panda['dataofs'].astype('float16')
    panda['len'] = panda['len'].astype('float16')
    panda['window'] = panda['window'].astype('float32')
    panda['seq_delta'] = panda['seq_delta'].astype('float32')
    panda['ack_delta'] = panda['ack_delta'].astype('float32')
    # merge version: dd.merge(df, panda)
    dd.concat([df, dd.from_pandas(panda, npartitions=6)])
    gc.collect()
And here is the main program
directory = 'dev/streams/'
files = os.listdir(directory)
df = initialize(directory+files[0])
files.remove(files[0])
for file in files:
    process(directory + file)
print(len(df))
Using merge:
    print(len(df)) = 1
Using concat:
    print(len(df)) = 6
Expected:
    print(len(df)) > 10,000
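For context, once the dataframe actually contains all of the files, the plan is roughly the following (the file name and key below are just placeholders I made up), so any advice on partitioning with that end goal in mind would also help:

# eventual goal: persist the combined dask dataframe to HDF5
df.to_hdf('all_streams.h5', '/data')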