
I have several .pcap files whose data I want to write to one large dask dataframe. Currently, my code initializes a dask dataframe using data from the first file. It is then supposed to process the rest of the pcap files and add them to that dask dataframe using merge/concat. However, when I check the number of rows of the merged dask dataframe, it doesn't increase. What is happening?

I'm also not sure whether I'm using the right approach for my use case. I am trying to convert my entire dataset into one giant dask dataframe and write it out to an h5 file. My computer doesn't have enough memory to load the entire dataset, which is why I'm using dask. The idea is to have a dask dataframe that contains the entire dataset so I can do operations on all of it. I'm new to dask and I've read over some of the documentation, but I'm still fuzzy about how dask handles loading data from disk instead of memory. I'm also fuzzy about how partitions work in dask. Specifically, I'm not sure how chunksize differs from partitions, so I'm having trouble properly partitioning this dataframe. Any tips and advice would be helpful.

As said before, I've read over the main parts of the documentation.

I've tried using dd.merge(dask_df, panda_df) as shown in the documentation. When I initialize the dask dataframe, it starts with 6 rows. After the merge, the row count decreases to 1.

I've also tried using concat. Again, I have a count of 6 rows during initialization. However, after the concat operations the row count still remains at 6. I would expect the row count to increase.

Here is the initialization function

import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *
flags = {
        'R': 0,
        'A': 1,
        'S': 2,
        'DF':3,
        'FA':4,
        'SA':5,
        'RA':6,
        'PA':7,
        'FPA':8
    }

def initialize(file):
    global flags
    data = {
        'time_delta': [0],
        'ttl':[],
        'len':[],
        'dataofs':[],
        'window':[],
        'seq_delta':[0],
        'ack_delta':[0],
        'flags':[]
    }
    scap = sniff(offline=file,filter='tcp and ip')
    for packet in range(0,len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            lst_pkt = scap[packet-1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)

    panda = pd.DataFrame(data=data)
    panda['ttl']=panda['ttl'].astype('float16')
    panda['flags']=panda['flags'].astype('float16')
    panda['dataofs']=panda['dataofs'].astype('float16')
    panda['len']=panda['len'].astype('float16')
    panda['window']=panda['window'].astype('float32')
    panda['seq_delta']=panda['seq_delta'].astype('float32')
    panda['ack_delta']=panda['ack_delta'].astype('float32')

    df =dd.from_pandas(panda,npartitions=6)

    gc.collect()
    return df

Here is the concatenation function

def process(file):
    global flags
    global df
    data = {
        'time_delta': [0],
        'ttl':[],
        'len':[],
        'dataofs':[],
        'window':[],
        'seq_delta':[0],
        'ack_delta':[0],
        'flags':[]
    }
    scap = sniff(offline=file,filter='tcp and ip')
    for packet in range(0,len(scap)):
        pkt = scap[packet]
        flag = flags[str(pkt['TCP'].flags)]
        data['ttl'].append(pkt['IP'].ttl)
        data['len'].append(pkt['IP'].len)
        data['dataofs'].append(pkt['TCP'].dataofs)
        data['window'].append(pkt['TCP'].window)
        data['flags'].append(flag)
        if packet != 0:
            lst_pkt = scap[packet-1]
            data['time_delta'].append(pkt.time - lst_pkt.time)
            data['seq_delta'].append(pkt['TCP'].seq - lst_pkt['TCP'].seq)
            data['ack_delta'].append(pkt['TCP'].ack - lst_pkt['TCP'].ack)

    panda = pd.DataFrame(data=data)
    panda['ttl']=panda['ttl'].astype('float16')
    panda['flags']=panda['flags'].astype('float16')
    panda['dataofs']=panda['dataofs'].astype('float16')
    panda['len']=panda['len'].astype('float16')
    panda['window']=panda['window'].astype('float32')
    panda['seq_delta']=panda['seq_delta'].astype('float32')
    panda['ack_delta']=panda['ack_delta'].astype('float32')

    #merge version dd.merge(df, panda)
    dd.concat([df,dd.from_pandas(panda,npartitions=6)])

    gc.collect()

And here is the main program

directory = 'dev/streams/'
files = os.listdir(directory)
df = initialize(directory+files[0])
files.remove(files[0])
for file in files:
    process(directory+file)
print(len(df))

using merge:

print(len(df)) = 1

using concat:

print(len(df))=6

expected:

print(len(df)) > 10,000
darthbith
mnsupreme

1 Answer


dd.concat does not modify df in place; it returns a new dask dataframe. Try explicitly assigning the result of your dask concat back to df:

df = dd.concat([df, dd.from_pandas(panda,npartitions=6)])

And don't duplicate the exact same blocks of code; encapsulate them in another function instead:

def process_panda(file_wpath, flags):
    data = {
    [...]
    panda['ack_delta']=panda['ack_delta'].astype('float32')
    return panda

Then you just have to test if the file to process is the first, so your main code becomes:

import os
import sys
import h5py
import pandas as pd
import dask.dataframe as dd
import gc
import pprint
from scapy.all import *

flags = {
        'R': 0,
        'A': 1,
        'S': 2,
        'DF':3,
        'FA':4,
        'SA':5,
        'RA':6,
        'PA':7,
        'FPA':8
    }

directory = 'dev/streams/'
files = os.listdir(directory)

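for file in files:
    file_wpath = os.path.join(directory, file)
    # Parse one pcap file into a pandas DataFrame
    panda = process_panda(file_wpath, flags)
    if file == files[0]:
        # First file: create the dask dataframe
        df = dd.from_pandas(panda, npartitions=6)
    else:
        # Later files: concat returns a new dataframe, so rebind df
        df = dd.concat([df, dd.from_pandas(panda, npartitions=6)])
    gc.collect()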

print(len(df))
Basile
  • Thanks for helping to catch the bug in my code. Your suggestions definitely helped me clean my code. Still would like some dask specific advice on the best approach to my problem. – mnsupreme Jun 02 '19 at 22:23
  • For starters, have a look at that answer to [Can dask parralelize reading fom a csv file?](https://stackoverflow.com/questions/40100176/can-dask-parralelize-reading-fom-a-csv-file?answertab=votes#tab-top) and scroll all the way to [mgoldwasser](https://stackoverflow.com/users/2636317/mgoldwasser)'s [answer](https://stackoverflow.com/a/56294650/11555231) for the most recent ways to optimize the use of dask with csv files – Basile Jun 03 '19 at 17:13
  • Well, my concern isn't so much with reading csv files, as that's not what I'm reading. It's more: is this the best way to store my data in dask, and how should I best repartition my files? – mnsupreme Jun 04 '19 at 21:29
  • OK, then have a look at df.repartition(npartitions= 1+df.memory_usage(deep=True).sum().compute() // 100 ) suggested by [Strategy for partitioning dask dataframes efficiently](https://stackoverflow.com/questions/44657631/strategy-for-partitioning-dask-dataframes-efficiently/52815503#52815503) and [Repartition to Reduce Overhead - dask doc](https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead) – Basile Jun 05 '19 at 17:33
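As a rough sketch of the arithmetic behind that repartition suggestion: `memory_usage(deep=True).sum().compute()` returns a size in bytes, so the divisor is the target partition size in bytes. The helper name `partitions_for` and the 100 MB default are assumptions for illustration (the dask best-practices page suggests partitions of roughly that size), not something taken from the thread:

```python
def partitions_for(total_bytes, target_bytes=100 * 1024**2):
    """Number of partitions needed so each holds about target_bytes.

    Hypothetical helper: integer-divide the total size by the target
    size and add one so the result is never zero.
    """
    return 1 + total_bytes // target_bytes


# Example with made-up sizes: a 250 MB dataset and the default 100 MB target
example_count = partitions_for(250 * 1024**2)
print(example_count)

# With a real dask dataframe `df`, this would be used roughly as:
# total = int(df.memory_usage(deep=True).sum().compute())
# df = df.repartition(npartitions=partitions_for(total))
```

Note that computing `memory_usage` on a dask dataframe has to touch all of the data, so for a large dataset it may be worth doing once and reusing the result.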