I am trying to use Apache Spark to process my large (~230k entries) Cassandra dataset, but I keep running into different kinds of errors. However, I can successfully run applications on a small dataset of ~200 entries. My Spark setup has 3 nodes with 1 master and 2 workers, and the 2 workers also host a Cassandra cluster in which the data is indexed with a replication factor of 2. My 2 Spark workers show 2.4 and 2.8 GB of memory on the web interface, and I set spark.executor.memory to 2409 (MB) when running an application, to get a combined memory of 4.7 GB. Here is my WebUI homepage (screenshot) and the environment page of one of the tasks (screenshot).
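For reference, this is roughly how the spark.executor.memory setting above is passed (a sketch only; the value is assumed to be in MB, and it could equally be passed via spark-submit):

import org.apache.spark.SparkConf;

// Sketch: the executor memory setting described above (value assumed to be in MB)
SparkConf conf = new SparkConf(true)
        .set("spark.executor.memory", "2409m");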
At this stage, I am simply trying to process the data stored in Cassandra using Spark. Here is the basic code I am using to do this in Java:
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
// Read the whole table as an RDD of CassandraRow and count its entries
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);
SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
System.out.println(rdd.count());
For a successful run on the small dataset (200 entries), the events interface looks something like this (screenshot).
But when I run the same thing on the large dataset (i.e. I change only the CASSANDRA_COLUMN_FAMILY), the job never terminates in the terminal and the log looks like this (screenshot); after ~2 minutes the stderr of the executors looks like this (screenshot), and after ~7 minutes I get
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
in my terminal, and I have to kill the SparkSubmit process manually. However, the large dataset was indexed from a binary file that occupies only 22 MB, and running nodetool status I can see that only ~115 MB of data is stored on both of my Cassandra nodes. I have also tried using Spark SQL on my dataset, but got similar results with that too. Where am I going wrong with my setup, and what should I do to successfully process my dataset, both for a Transformation-Action program and for a program that uses Spark SQL?
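For reference, the Spark SQL attempt looks roughly like this (a sketch only: it reuses the SparkContext sc and the constants from the code above, and the query shown is assumed; the real one differs):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.cassandra.CassandraSQLContext;

// Sketch: counting the same table through Spark SQL (assumed query)
CassandraSQLContext cassandraSql = new CassandraSQLContext(sc);
DataFrame rows = cassandraSql.sql(
        "SELECT * FROM " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY);
System.out.println(rows.count());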
I have already tried the following methods:

- Using -Xms1G -Xmx1G to increase the memory, but the program fails with an exception saying that I should instead set spark.executor.memory, which I have.
- Using spark.cassandra.input.split.size, which fails saying it isn't a valid option; a similar option is spark.cassandra.input.split.size_in_mb, which I set to 1, with no effect (see the sketch after this list).
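A minimal sketch of how that split-size option is set (the property name is taken from the item above; the connection host is the same constant as in the code earlier):

import org.apache.spark.SparkConf;

// Sketch: lowering the Cassandra input split size (value interpreted in MB)
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .set("spark.cassandra.input.split.size_in_mb", "1");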
EDIT

Based on this answer, I have also tried the following methods:

- set spark.storage.memoryFraction to 0
- not set spark.storage.memoryFraction to zero and use persist with MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK and MEMORY_AND_DISK_SER (a sketch of the persist variant follows this list).
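A minimal sketch of the persist variant (it reuses the context and constants from the code above; the storage levels come from org.apache.spark.storage.StorageLevel, and I swap in each of the levels listed above in turn):

import org.apache.spark.storage.StorageLevel;

// Sketch: persist the RDD before the action, trying one storage level at a time
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER()); // also tried MEMORY_ONLY(), MEMORY_ONLY_SER(), MEMORY_AND_DISK()
System.out.println(rdd.count());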
Versions:
Spark: 1.4.0
Cassandra: 2.1.6
spark-cassandra-connector: 1.4.0-M1