
I have a big numpy array. Its shape is (800, 224, 224, 3), which means there are 800 images (224 * 224) with 3 channels. For distributed deep learning in Spark, I want to convert the numpy array into a Spark DataFrame.

My method is:

  1. Convert the numpy array to a csv file
  2. Load the csv and make a Spark DataFrame with 150528 columns (224*224*3)
  3. Use VectorAssembler to create a vector of all columns (features); see the sketch after the tables below
  4. Reshape the output of step 3

But I failed in the third step, probably because the computation was too heavy.

In order to make a vector from this:

+------+------+
|col_1 | col_2|
+------+------+
|0.1434|0.1434|
|0.1434|0.1451|
|0.1434|0.1467|
|0.3046|0.3046|
|0.3046|0.3304|
|0.3249|0.3046|
|0.3249|0.3304|
|0.3258|0.3258|
|0.3258|0.3263|
|0.3258|0.3307|
+------+------+

to this:

+-------------+
|   feature   |
+-------------+
|0.1434,0.1434|
|0.1434,0.1451|
|0.1434,0.1467|
|0.3046,0.3046|
|0.3046,0.3304|
|0.3249,0.3046|
|0.3249,0.3304|
|0.3258,0.3258|
|0.3258,0.3263|
|0.3258,0.3307|
+-------------+
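My attempt at step 3 looked roughly like this (just a sketch; the col_1 ... col_150528 names are placeholders for however the csv columns actually end up being named):

from pyspark.ml.feature import VectorAssembler

# df is the 150528-column dataframe loaded from the csv in step 2
# assemble every pixel column into one vector column (column names are illustrative)
input_cols = ["col_{}".format(i) for i in range(1, 224 * 224 * 3 + 1)]
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
df_features = assembler.transform(df).select("features")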

But the number of columns is really large...

I also tried to convert the numpy array to an RDD directly, but I got an 'out of memory' error. On a single machine, my job works well with this numpy array.

    Out of memory error, is it? Can you try setting the driver memory to whatever maximum you can give it? I use 6g and my laptop ram is 8gb. – pissall Oct 24 '17 at 04:09

3 Answers


You should be able to convert the numpy array directly to a Spark dataframe, without going through a csv file. You could try something like the code below:

from pyspark.ml.linalg import Vectors

num_rows = 800

# flatten each image to one long row and wrap it in a one-element tuple holding a dense vector
arr = map(lambda x: (Vectors.dense(x), ), numpy_arr.reshape(num_rows, -1))
df = spark.createDataFrame(arr, ["features"])
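As a quick sanity check (just a sketch, reusing the names from the snippet above), you can verify the shape of the result:

# should print 800 rows and vectors of length 150528 (224*224*3)
print(df.count())
print(len(df.first()["features"]))
df.printSchema()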
Shaido
    Hello, it gives me this error "TypeError: not supported type: " – A.B Apr 09 '19 at 11:33
  • @A.B: Try converting to tuples of vectors and see if it works; you can refer to: https://stackoverflow.com/questions/41328799/how-to-convert-rdd-of-dense-vector-into-dataframe-in-pyspark – Shaido Apr 09 '19 at 11:58
  • This answer does not actually work, given A.B.'s comment and testing. – John Stud Dec 02 '20 at 20:15
  • @JohnStud: You were correct. It seems I didn't hear back from A.B whether using tuples worked and then I forgot about it. I tested it out and updated the answer. It should work now. – Shaido Dec 03 '20 at 03:50

You can also do this, which I find most convenient:

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

# wrap the numpy array in a pandas DataFrame, then let Spark convert it
array = np.linspace(0, 10)
df_spark = sqlContext.createDataFrame(pd.DataFrame(array))
df_spark.show()

The only downside is that pandas needs to be installed.
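For an image array like the one in the question, you would flatten it to 2-D before wrapping it in pandas (a sketch, assuming the (800, 224, 224, 3) array and reusing sqlContext from above; a small random array stands in for the real data):

images = np.random.rand(8, 224, 224, 3)      # stand-in for the real (800, 224, 224, 3) array
flat = images.reshape(images.shape[0], -1)   # one row per image, 224*224*3 = 150528 columns
pdf = pd.DataFrame(flat).add_prefix("px_")   # string column names avoid surprises in Spark
df_spark = sqlContext.createDataFrame(pdf)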

JulianWgs

Increase the worker memory from the default value of 1 GB using the spark.executor.memory setting to resolve the out of memory error if you are getting the error on a worker node. If you are getting the error on the driver instead, try increasing the driver memory as suggested by @pissall. Also, try to identify the proper fraction of memory (spark.memory.fraction) to use for keeping RDDs in memory.
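For example (a sketch with placeholder values; tune them for your cluster), these settings can be passed when building the session:

from pyspark.sql import SparkSession

# example values only; adjust to the resources available on your cluster
spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.fraction", "0.7") \
    .getOrCreate()

# driver memory usually has to be set before the driver JVM starts, e.g.:
#   spark-submit --driver-memory 6g your_script.py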

Shubham Jain
  • Does tweaking the executor memory matter when working with Spark locally? The executors are used when we have a cluster and multiple worker nodes, right? I suggested trying to increase the driver memory in this case. Help me out if it works differently. – pissall Oct 24 '17 at 09:29
  • No, executor memory doesn't matter in local mode, as both the executors and the driver run in the same JVM process, whose memory can be increased by setting the driver memory. In the question he says the job runs well on a single machine, so I assumed he is working in cluster mode. – Shubham Jain Oct 24 '17 at 09:48
  • Yes, I am working in cluster mode. Your answer also helped me a lot!! I am new to Spark, especially PySpark and Python. I am slow at making progress on my project, but I think I'm getting into it. Thanks all!!! – 주은혜 Oct 24 '17 at 16:28