0

I am trying to use Spotify's Annoy for processing a 200K volume dataset. Sine the dataset is huge, I am trying to use multiprocessing. I constantly get the "cannot pickle annoy object" error. I changed my code to add the items to the annoy index outside of the function which I using for multiprocessing, but the problem persists.

Can someone look at code, and help?

Following is code for trainSimilaritySearchParallel

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA, KernelPCA
from matplotlib import pyplot as plt
from sklearn.preprocessing import LabelEncoder
from shutil import copyfile
import os, sys, math, time, datetime
from sklearn import metrics
from utils.dataSet import DataSet
from utils.trainer import Trainer
from utils.dataLoader import DataLoader
from featureExtractorInception import inception_v3_sliced

from config import *
from sklearn.decomposition import PCA, KernelPCA
from sklearn.cluster import KMeans, DBSCAN
import pickle
import gc
import annoy
from sklearn.preprocessing import normalize
from sklearn.random_projection import _gaussian_random_matrix
from sklearn.random_projection import GaussianRandomProjection
import multiprocessing

#import torch.multiprocessing as mp
#from joblib import Parallel, delayed
#from joblib import wrap_non_picklable_objects
#from joblib.externals.loky import set_loky_pickler

from multiprocessing import Process, Queue, Pool
from threading import Thread


class Train(Process):
    def __init__(self):
        super(Train, self).__init__()
    
        self.dLoader = None
        self.annoyIndex = None
        self.queue = None
        self.model = None
        self.rng = np.random.RandomState(42)
        self.gauss_data = GaussianRandomProjection(random_state=self.rng, eps=0.5, n_components=projectedDim)
        self.dfImgIndex = {}
        self.results = {}
        
        
        
    def defineTransform(self):
        
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    
        transform = transforms.Compose([
                transforms.Resize((inputSize,inputSize)), 
                transforms.ToTensor(),
                normalize
            ])
        
        return transform
    
    def createDataSet(self, base_path):
                
            image=[]
            labels=[]
            
            for file in os.listdir(base_path):
                className = "Unknown"
                if (file.split(".")[1] == "PNG" or file.split(".")[1] == "png") :
                    image.append(file)
                    labels.append(className)
                
            data = {'Images':image, 'Labels':labels} 
            data = pd.DataFrame(data) 
            
            #print("DataSet Created")
            #print(len(data))
            
            
            lb = LabelEncoder()
            data['encodedLabels'] = lb.fit_transform(data['Labels'])
            data.head()
            
            return data
    
    def createReducedEmbedding(self, item_vector, LoadDate, q ):
    
        # get vector from multiprocessing queue
        #item_vector = queue.get()

        batch_id = item_vector[0]
        imgPath = item_vector[1]
        inputs = item_vector[2]

        #print(batch_id)
        #print(imgPath)
        #print(inputs)
        
        inputs = inputs.to(device)
        #labels = labels.to(device)
        
        #print(f"Modeling Feature Representation for Image with BatchID :: {batch_id}")
        
        outputs = self.model(inputs)
        
        #print(output[0])
        
        featureRepresentation = outputs[0].data
        
        #print(f"Done Modeling Feature Representation for Image with BatchID :: {batch_id}")
        
        #print("Beginning Print For Features")
        #print(featureRepresentation.size)
        
        featureRepresentation1 = featureRepresentation.view(featureRepresentation.size(0), -1)
        
        #print(featureRepresentation1.shape)
        
        #print(f"Normalizing Feature Representation for BatchID :: {batch_id}")
        
        featureRepresentation2 = normalize(featureRepresentation1.cpu().numpy(), axis=1)
        
        #print(featureRepresentation2.shape)
        
        #print(f"Done Normalizing Feature Representation for BatchID :: {batch_id}")
        
        #print(f"Reducing Feature Representation to Embedding for BatchID :: {batch_id}")
        
        imgEmbedding = self.executeDimReduction(originalDim, projectedDim, featureRepresentation2)
        
        #print(f"Done Reducing Feature Representation to Embedding for BatchID :: {batch_id}")
        
        #print(f"Adding Embedding to Feature Index :: {batch_id}")
        
        #new_record = pd.DataFrame([[batch_id, imgPath[0]]],columns=["BATCHID","IMGPATH"])
        #dfImgIndex[batch_id] =imgPath[0]
        
        print(f"Done Adding Embeddings to Feature Index")

        #results[batch_id] = imgEmbedding

        q.put((batch_id, imgPath[0], imgEmbedding))

    
    def execFeatExtrct(self, phase, LoadDate):
    
       
        pool = multiprocessing.Pool(processes=25)
        
        self.defineAnnoyIndex(projectedDim, LoadDate)
    
        self.model = inception_v3_sliced(pretrained=True, stop_layer=12)
        self.model.share_memory()
        self.model = self.model.to(device)

        trainData = self.createDataSet(dataPath)
    
        bTransform = self.defineTransform()
        
        bDataSet = DataSet(dataPath, trainData, bTransform)
        
        wttDL = DataLoader(
                                        path=dataPath
                                        , data = bDataSet
                                        , valSplit = valSplit
                                        , batchSize=batchSize
                                        , dataSplit=False
                                        , transforms=bTransform
                                        , dType = "Train"
                                        , device=device
                                    )
        
        self.dLoader = wttDL.loadData()

        vectorindx = None
        jobs = []
        results = []
        q = multiprocessing.Queue()
        
        print("Begin adding all tuples to funcArgsList")

        for batch_id, (imgPath, inputs, labels) in enumerate(self.dLoader[phase]): 

            vectorindx = (batch_id, imgPath, inputs)  
            p  = multiprocessing.Process(target=self.createReducedEmbedding, args=(vectorindx, LoadDate, q))
            jobs.append(p)
        
        for proc in jobs:
            proc.start()
            
        for proc in jobs:
            ret = q.get()
            results.append(ret)
            
        for proc in jobs:
            proc.join()


      
        print("Done adding all tuples to funcArgsList")
       
        for tup in results.items:
            
            (batch_id, imgPath[0], imgEmbedding)
            batch_id = tup[0]
            imgEmbedding = tup[2]
            self.addToEnbeddingIndex(batch_id, imgEmbedding)
            self.dfImgIndex[batch_id] = tup[1]

        print("Begin Building Feature Index")
             
        self.buildAnnoyIndex(n_trees=250)
        
        print("Done Building Feature Index")
        
        print(len(self.dfImgIndex))
        
        print("Begin Saving Feature Index")
        
        #dfDict = dfImgIndex.to_dict()
        
        print("Saving Data")
        print(self.dfImgIndex)
        
        self.sAnnoyIndex(self.dfImgIndex, LoadDate)
             
        print("Done Saving Feature Index")
        
        self.annoyIndex.unload()
        
        return self.dfImgIndex

    def executeDimReduction(self, originalDim, projectedDim, imgEmbedding):
        #print("Inside executeDimReduction")
        randomProjectionMatrix = None
        if projectedDim and originalDim > projectedDim:
          
          randomProjectionMatrix = self.gauss_data.fit_transform(imgEmbedding)
        
        
        return randomProjectionMatrix.flatten()
  
    def defineAnnoyIndex(self, embeddingDim, LoadDate):
        
        self.annoyIndex = annoy.AnnoyIndex(embeddingDim, 'dot')

        #self.buildAnnoyIndex(n_trees=250)

        #self.annoyindex.unload()

        #self.saveAnnoyIndex(LoadDate)
  
    def buildAnnoyIndex(self, n_trees):
        
        self.annoyIndex.build(n_trees)
  
    def addToEnbeddingIndex(self, i, embedding):
        
        self.annoyIndex.add_item(i, embedding)

    def saveAnnoyIndex(self, LoadDate):
        
        mPath = os.path.join(
                featuresPath,  
                "{}.ann".format(
                    "AnnoyFeaturesIndex_"
                    + LoadDate
                ))
        
        print("Saving Annoy Index")
        print(mPath)
        
        self.annoyIndex.save(mPath)
        
        
    def sAnnoyIndex(self, dfDict, LoadDate):
        
        mPath = os.path.join(
                featuresPath,  
                "{}.ann".format(
                    "AnnoyFeaturesIndexFinal_"
                    + LoadDate
                ))
        
        self.annoyIndex.save(mPath)
        
        
        mPath = os.path.join(
                featuresPath,  
                "{}".format(
                    "FeatureMapping_"
                    + LoadDate
                ))
        
        with open(mPath + '.mapping', 'wb') as handle:
            pickle.dump(dfDict, handle, protocol=pickle.HIGHEST_PROTOCOL)
            print('Mapping is saved to disk.')
        
    def getAnnoyIndex(self, idxFileName):
        
        self.annoyindex = annoy.AnnoyIndex(projectedDim, 'dot')
        self.annoyindex.load(idxFileName, prefault=False)
        print('Annoy index is loaded.')

I make the following call to extract Features:

from trainSimilaritySearchParallel import Train
trn = Train()
dfImgIndex = trn.execFeatExtrct(phase="train", LoadDate=loadDate)

Thanks

P Ved
  • 109
  • 3
  • 13

0 Answers0