1

Background: I am completely new to the whole Spark platform & concept and I am trying to learn how to operate it with via R with sparklyr. I started following an online course on the topic and I am trying to use it for my own data analysis as a way to learn it.

Problem: I am trying to load a 6.3gb csv dataset (~30 mil rows, ~20 cols) but I get the following error (from what I could tell, the same chunks kept repeating themselves, I give here the first 3 of them, as I will otherwise reach the character limit for the post). The code runs but after 17 minutes it exits with the following error (no data is loaded):

Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

The currently active SparkContext was created at:

org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
java.lang.reflect.Method.invoke(Unknown Source)
sparklyr.Invoke.invoke(invoke.scala:139)
sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
sparklyr.StreamHandler.read(stream.scala:66)
sparklyr.BackendHandler.channelRead0(handler.scala:51)
sparklyr.BackendHandler.channelRead0(handler.scala:4)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
    at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:716)
    at org.apache.spark.SparkContext$$anonfun$parallelize$1.apply(SparkContext.scala:715)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.SparkContext.parallelize(SparkContext.scala:715)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke.invoke(invoke.scala:139)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Unknown Source)
Error: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

Here is my R code:

library(sparklyr)
spark_install(version = "2.1.0")
sc <- spark_connect(master = "local")
testdata = spark_read_csv(sc, name = "testdata", path = ...)

Outside of Spark, I am able to load the file e.g. using read_csv. I googled the issue and it has been mentioned that potential cause is an OutOfMemory issue - I am not quite sure if this is the problem and how to fix it.

I'd be grateful if someone could point me out a way how to debug it and fix it!

Cheers

zero323
  • 322,348
  • 103
  • 959
  • 935
Jean_N
  • 489
  • 1
  • 4
  • 19

1 Answers1

1

A few things:

  • Unless you use tiny data you should forget about local mode. It is primarily designed for testing and small scale experimentation, not for working on even moderate size data.

    As it uses only a single JVM for both driver and executor code, the potential for serious fault recovery, and if processing goes AWOL you might loose a whole session (which seems to be the case here).

    So if you want to test things locally on a moderate size data consider using standalone mode otherwise just downscale the dataset.

    On a side note local mode uses only one processing thread. Even for testing it makes more sense to use local[n] (for n threads) or local[*] (for all available cores).

  • Be ready to adjust the configuration as default values are extremely conservative - for example spark.driver.memory is by default 1 GB - you might get away with it, in a standalone mode, but not when all components are embedded in a single JVM.

  • Don't trust sparklyr defaults.

    sparklyr developers made a very unfortunate choice of eagerly caching data in memory by default. It not only goes against Spark defaults (which, for a reason, uses MEMORY_AND_DISK for Dataset API) and rarely provides any practical benefits on real size data, but also interferes with Spark optimizer in some ugly ways (most notably preventing projection and selection pushdown).

    So get into a habit of using memory = FALSE whenever applicable:

    spark_read_csv(sc, name = "testdata", memory = FALSE, path = ...)
    
  • Provide schema for the reader instead of using schema inference. See SparklyR: Convert directly to parquet

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thank you very much, this is very helpful! I see that I have to invest a lot more time into understanding this - if you would have a suggestion for an easy to understand guide or other sources, kindly let me know. Another question: what is actualy considered tiny data when it comes to what you describe about using "local"? Is my 30mil rows/12 columns considered small/moderate or big in this context? Cheers – Jean_N Feb 01 '19 at 10:46
  • 1
    My personal path of choice would be to learn core (i.e. Scala API, O'Reilly has a bunch of decent books, by Spark contributors, EPFL has nice MOOC on Coursera) and then move to sparklyr, but I realize that might not be the most appealing path. Other than that, just don't get deceived by a familiar API - it is your worst enemy here. As of moderate it is subjective and fuzzy, however you can try to compare things to the available memory - if size of the raw data is of the same order of magnitude as the available RAM I'd call it "moderate". For learning I'd use at least order of magnitude less. – zero323 Feb 01 '19 at 12:33