
Hi, I have a Spark DataFrame and I applied a transformation using the SQL context, for example selecting only two columns from all the data:

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

but now I want to convert this Spark DataFrame into a pandas DataFrame, and I'm using

pddf = df_oraAS.toPandas()

but the output stops here and I need to restart the IDE (Spyder):

16/01/22 16:04:01 INFO DAGScheduler: Got job 0 (toPandas at <stdin>:1) with 3 output partitions
16/01/22 16:04:01 INFO DAGScheduler: Final stage: ResultStage 0 (toPandas at <stdin>:1)
16/01/22 16:04:01 INFO DAGScheduler: Parents of final stage: List()
16/01/22 16:04:01 INFO DAGScheduler: Missing parents: List()
16/01/22 16:04:01 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1), which has no missing parents
16/01/22 16:04:01 INFO SparkContext: Starting job: toPandas at <stdin>:1
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KB, free 9.4 KB)
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.9 KB, free 14.3 KB)
16/01/22 16:04:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50877 (size: 4.9 KB, free: 511.1 MB)
16/01/22 16:04:01 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/22 16:04:01 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1)
16/01/22 16:04:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
16/01/22 16:04:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 119523958 bytes)
16/01/22 16:04:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 117876401 bytes)
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:200)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:252)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:247)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:317)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:63)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

What did I do wrong? Thanks.

EDIT (more complete): I load the data from an Oracle database (cx_Oracle) and put it in a pandas DataFrame:

df_ora = pd.read_sql('SELECT * FROM DEC_CLIENTES', con=connection)

Next I created an SQLContext to manipulate the DataFrame:

sqlContext = SQLContext(sc)

df_oraAS = sqlContext.createDataFrame(df_ora)

df_oraAS.registerTempTable("df_oraAS")

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

and I want to convert the result back into a pandas DataFrame:

 pddf = df_oraAS.toPandas() 
Kardu

2 Answers


toPandas is basically collect in disguise. The output is a local Pandas DataFrame. If the data doesn't fit into driver memory it will simply fail, hence the error you see.
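A minimal sketch of that idea (my wording, not part of the original answer): trim the DataFrame down to the columns and rows you actually need on the Spark side, so the collected result is small enough for the driver:

# Sketch only: reduce the data on the Spark side before collecting it.
# df_oraAS is the Spark DataFrame from the question.
small = df_oraAS.select("ENT_EMAIL", "MES_ART_ID").limit(5)
pddf = small.toPandas()  # the collect happens here; only 5 rows reach the driver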

zero323
  • Could you [edit](https://stackoverflow.com/posts/34951111/edit) the question? It will be easier to find. – zero323 Jan 22 '16 at 17:37
  • Unfortunately @Kardu, like zero323 said, if you don't have enough space you'll get this error on collect, whether your original table has 1 or 2 columns... – eliasah Jan 22 '16 at 17:44
  • @eliasah thanks for your comment, but I have a machine with 80 GB of RAM and I'm only using 14 GB... – Kardu Jan 22 '16 at 17:55
  • @eliasah maybe Spark cannot use all of the machine's memory? How can I know? – Kardu Jan 22 '16 at 18:07
  • Could you double-check the number of records in your data? And correct the code in the question. The last line should be different, shouldn't it? – zero323 Jan 22 '16 at 18:20
  • Also, why do you read the data via Pandas instead of directly into Spark? – zero323 Jan 22 '16 at 18:21
  • I confirm, only five rows and two columns. @zero323 because I don't know how to import data from an Oracle database into a Spark DataFrame... :) and for me it's easier to work with pandas... – Kardu Jan 22 '16 at 18:33
  • It kind of defeats the whole purpose of using Spark. If data fits into a Pandas data frame it is rather unlikely you gain a lot by using Spark. Not to mention the driver becomes a bottleneck (data is accessed sequentially and has to be transferred to the workers). Regarding how-to, see [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases). – zero323 Jan 23 '16 at 11:34
  • Regarding Spark not being able to use the whole machine's memory: it's likely that because you're running within an IDE, your memory is capped with -Xmx depending on how the process is launched. So unless you explicitly set -Xmx, you definitely cannot use the whole system's memory (see the sketch after this comment thread). – Hamel Kothari Jan 23 '16 at 14:39
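One way to raise that cap from Python, sketched here as an assumption rather than something stated in the thread, is to pass --driver-memory through PYSPARK_SUBMIT_ARGS before PySpark launches the JVM; the 8g value is only illustrative and must of course fit within the machine's RAM:

import os

# Hypothetical sketch: the driver heap must be sized before the JVM starts,
# so set this before the first SparkContext is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g pyspark-shell"

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[*]", "oracle-to-pandas")  # local mode, as in the question
sqlContext = SQLContext(sc)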

Your pd.read_sql call reads the full database into a pandas dataframe. This is local to the driver. When you call createDataFrame, it then creates a Spark DataFrame from your python pandas dataframe, which results in a really large task size (see the log line below):

16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.

Even though you are selecting only 5 rows, you're actually first loading the full database into memory with that pd.read_sql call. If you're reading from an Oracle SQL database, why don't you use the Spark JDBC data source, perform your select filters in Spark, and then call toPandas?

What your code is doing right now is reading the whole database into pandas, writing it to Spark, filtering, and reading it back into pandas.
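A rough sketch of that approach, assuming Spark 1.x; the connection URL, credentials and service name are placeholders, and the Oracle JDBC driver jar (for example ojdbc6.jar) must be on Spark's classpath:

# Sketch: let Spark read from Oracle directly instead of going through pandas first.
df_ora = sqlContext.read.format("jdbc").options(
    url="jdbc:oracle:thin:user/password@//dbhost:1521/SERVICE",  # placeholder
    dbtable="DEC_CLIENTES",
    driver="oracle.jdbc.OracleDriver",
).load()

# Filter on the Spark side, then bring only the small result back to the driver.
pddf = df_ora.select("ENT_EMAIL", "MES_ART_ID").limit(5).toPandas()

This keeps the full table out of the Python driver process entirely; only the five selected rows are ever collected.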

Hamel Kothari