0

I've a json file that I want to remove duplicate rows from, but it's too large to fit into memory. I found a way to get it done, but my guess is that it's not the best way.

My problem is that it runs in 8 minutes for a 12gb dataset. But the requirement is to scale the code so that it could run on 100gb dataset. Any pointers on how to do this? Should I use multi-threading or multi-processing in python to achieve this ? Or any other method?

This is the code:

import json
import time

""" This class contains the business logic for identifying the duplicates and creating an output file for further processing """

class BusinessService:

    """ The method identiifes the duplicate """
    def service(ipPath,opPath):
            start_time = time.time()    #We start the timer to see how much time the method takes to work #
            uniqueHandleSet = set();     #Creating a set to store unique values #
            try:
                duplicateHandles = open(opPath,'w+',encoding='utf-8')     #Opening and creating an output file to catch the duplicate hanndles #                     
                with open(ipPath,buffering = 200000000,encoding = 'utf-8') as infile:     #Reading the JSON File by buffering and using 20mb as it is too big to read at once #       
                    for line in infile:
                        tweetJsonObject = json.loads(line);

                        if tweetJsonObject["name"] not in uniqueHandleSet:
                            uniqueHandleSet.add(tweetJsonObject["name"]);
                        else:
                            duplicateHandles.write(line);

                print("--- %s seconds --- memory 200mb while buffering" % (time.time() - start_time));  #Printing the total time required to execute 
            except:
                print("Error")
            finally:
                duplicateHandles.close();
Mark Tolonen
  • 166,664
  • 26
  • 169
  • 251
Mohit Ruke
  • 11
  • 2
  • Splitting the file in parts and using multiple processes is tricky. You can only check for duplicates within each chunk , not between chunks as they are in different processes. IMHO you need some kind of index. E. g. calculate the hash of each row and but it in a dict - hash as key and row number as value. This way you get only unique hashes. – RaJa Dec 18 '18 at 08:26
  • Thnan you, but I am already doing that ans storing the uniqne values in a set. my ouptut needs duplicates. Now the requirement is to increase the scaling of the code. – Mohit Ruke Dec 18 '18 at 21:15

1 Answers1

0

To scale it, you would need queues for feeding multiples processses and two shared lists to keep track of your results. The main idea is feeding the file line by line to a queue that is subsequently processed by some consumer processes. These processes however share two lists to store the intermediate results. The Manager is responsible for the synchronization between the processes.

The following code is just some rough guideline, not really tested:

from multiprocessing import Process, Manager, Queue

def findDuplicate(inputQueue, uniqueValues, duplicates):
    for line in iter(inputQueue.get, 'STOP'): #get line from Queue, stop if 'STOP' is received
        if line not in uniqueValues: # check if duplicate
            uniqueValues.append(line)
        else:
            duplicates.append(line) # store it

manager = Manager() # get a new SyncManager
uniqueValues = manager.list() # handle for shared list
duplicates = manager.list() # a 2nd handle for a shared list
inputQueue = Queue() # a queue to provide tasks to the processes

# setup workers, provide shared lists and tasks
numProc = 4
process = [Process(target=findDuplicate,
                      args=(inputQueue, uniqueValues, duplicates)) for x in range(numProc)]

# start processes, they will idle if nothing is in queue
for p in process:
    p.start()

with open(ipPath) as f:
    for line in f:
        inputQueue.put(line, block=True) # put line in queue, only if free slot avaible
for p in process:
    inputQueue.put('STOP') # signal workers to stop as no further input

    # wait for processes to finish
for p in process:
    p.join()
RaJa
  • 1,471
  • 13
  • 17