I'm trying to learn PySpark. I'm using spark-2.2.0 with Python 3. I have run into a problem and I can't find where it comes from. My project is to adapt an algorithm written by a data scientist so that it can be distributed. The code below is what I have to use to extract features from images, and I have to adapt it to extract the features with PySpark.
import json
import sys
# Dependencies can be installed by running:
# pip install keras tensorflow h5py pillow
# Run script as:
# ./extract-features.py images/*.jpg
from keras.applications.vgg16 import VGG16
from keras.models import Model
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
import numpy as np
def main():
    # Load model VGG16 as described in https://arxiv.org/abs/1409.1556
    # This is going to take some time...
    base_model = VGG16(weights='imagenet')
    # Model will produce the output of the 'fc2' layer, which is the penultimate neural network layer
    # (see the paper above for more details)
    model = Model(input=base_model.input, output=base_model.get_layer('fc2').output)
    # For each image, extract the representation
    for image_path in sys.argv[1:]:
        features = extract_features(model, image_path)
        with open(image_path + ".json", "w") as out:
            json.dump(features, out)

def extract_features(model, image_path):
    img = image.load_img(image_path, target_size=(224, 224))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    features = model.predict(x)
    return features.tolist()[0]

if __name__ == "__main__":
    main()
I have written the beginning of the code:
rdd = sc.binaryFiles(PathImages)
base_model = VGG16(weights='imagenet')
model = Model(input=base_model.input, output=base_model.get_layer('fc2').output)
rdd2 = rdd.map(lambda x : (x[0], extract_features(model, x[0][5:])))
rdd2.collect()[0]
When I try to extract the features, I get an error:
~/Code/spark-2.2.0-bin-hadoop2.7/python/pyspark/cloudpickle.py in save_file(self, obj)
    623             return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
    624         if obj is sys.stdin:
--> 625             raise pickle.PicklingError("Cannot pickle standard input")
    626         if hasattr(obj, 'isatty') and obj.isatty():
    627             raise pickle.PicklingError("Cannot pickle files that map to tty objects")

PicklingError: Cannot pickle standard input
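The traceback is raised by the pickler that PySpark uses to ship the function passed to map, together with every object that function references, to the workers. A rough way to find out which captured object is the culprit is to try serializing each one on the driver. The helper below is a minimal sketch of that check: try_pickle is a hypothetical name, and it defaults to the standard pickle module, which handles fewer object types than the cloudpickle copy bundled with PySpark, so a failure here is only a hint.

```python
import pickle
import threading


def try_pickle(obj, dumps=pickle.dumps):
    """Return (True, None) if obj serializes, else (False, error message).

    `dumps` is any pickle-compatible serializer; to get closer to what
    Spark does, one could pass the `dumps` function from `pyspark.cloudpickle`
    (an assumption about the environment; it is not imported here).
    """
    try:
        dumps(obj)
        return True, None
    except Exception as exc:  # pickling can fail with several exception types
        return False, "{}: {}".format(type(exc).__name__, exc)


# Plain data serializes without trouble.
print(try_pickle([1, 2, 3]))

# An object holding an OS-level resource (here a lock, as a stand-in)
# cannot be pickled, much like an object that references sys.stdin.
print(try_pickle({"lock": threading.Lock()}))
```

Calling try_pickle(model) on the driver would show whether the model itself can be serialized at all.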
I tried several things, and here is my first finding. I know that the error comes from the line below, in the function extract_features:
features = model.predict(x)
When I run this line outside of a map function, or outside PySpark altogether, it works fine. I think the problem comes from the object "model" and its serialization by PySpark. Maybe I'm not going about distributing this with PySpark the right way. If you have any clue that could help me, I would appreciate it.
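If the model really is what fails to serialize, one commonly used pattern is to avoid capturing it in the lambda and instead build it on each worker, once per partition, with mapPartitions. The following is only a sketch under that assumption, not a confirmed fix: make_partition_extractor and build_vgg16_fc2 are hypothetical names, the Model(...) call is copied from the question, and it assumes the image paths are readable from the workers (true in local mode or on a shared file system).

```python
def build_vgg16_fc2():
    """Create the fc2 feature model; meant to be called on a worker."""
    # Imports are local so that nothing from Keras is captured on the driver.
    from keras.applications.vgg16 import VGG16
    from keras.models import Model

    base_model = VGG16(weights='imagenet')
    # Same call as in the question.
    return Model(input=base_model.input,
                 output=base_model.get_layer('fc2').output)


def make_partition_extractor(build_model, extract):
    """Return a function suitable for rdd.mapPartitions.

    build_model: zero-argument callable that creates the model.
    extract: callable (model, path) -> features, e.g. extract_features.
    """
    def extract_partition(records):
        # The model is created here, inside the task, so it is never
        # pickled together with the closure.
        model = build_model()
        for path, _content in records:
            # Drop the leading "file:" as done with x[0][5:] in the question.
            yield path, extract(model, path[5:])

    return extract_partition


# Intended use with the RDD from the question (not executed here):
# rdd = sc.binaryFiles(PathImages)
# rdd2 = rdd.mapPartitions(
#     make_partition_extractor(build_vgg16_fc2, extract_features))
# rdd2.first()
```

Building the model once per partition rather than once per image keeps the cost of loading the VGG16 weights from dominating the job.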
Thanks in advance.