
I am trying to build a simple pipeline that uses Kafka as the streaming source for Spark's Structured Streaming API, performs group-by aggregations, and persists the results to HDFS.

But as soon as I submit the job, I get a Java heap space error, even though the volume of streaming data is very small.

Below is the PySpark code:

from time import gmtime, strftime

from pyspark.sql.functions import col, from_json, window

# spark is the SparkSession created earlier in the script; aaISchema is the
# StructType describing the JSON event payload (both omitted here).

# Read the raw events from Kafka and keep the message value as a string
allEvents = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "MyNewTopic") \
    .option("group.id", "aggStream") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(col("value").cast("string"))

# Keep only "myNewAPI" events, parse the JSON payload and flatten its fields
aaIDF = allEvents.filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName")) \
    .select(col("colName.eventTime"), col("colName.appId"), col("colName.articleId"),
            col("colName.locale"), col("colName.impression"))

# Sum impressions per (appId, articleId, locale) over 2-minute event-time windows
windowedCountsDF = aaIDF.withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")

# Write the windowed aggregates to HDFS as Parquet in append mode
query = windowedCountsDF \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/CDS/events/JS/agg/" + strftime("%Y/%m/%d/%H/%M", gmtime()) + "/") \
    .option("checkpointLocation", "/CDS/checkpoint/") \
    .start()

And below is the exception:

17/11/23 14:24:45 ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:214)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:315)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
  • How do you submit the job? What is `spark-submit` and the options? – Jacek Laskowski Nov 23 '17 at 23:46
  • @JacekLaskowski spark-submit --packages 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1' ./AggEventStructuredStreamListener.py – grepIt Nov 24 '17 at 04:04
  • Can you check out `jconsole` and see what your memory requirements are for the application (and adjust them accordingly)? I don't see any apparent reason why it could fail. Please note that the `--packages` option includes libraries for different Spark versions - 2.2.0 and 2.1.0. Also, you're using `spark-sql-kafka` and `spark-streaming-kafka` which I doubt you really need. Get rid of `org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1`. – Jacek Laskowski Nov 24 '17 at 08:03

2 Answers


Two possible reasons:

  1. Your watermark does not take effect. You should reference the column as colName.eventTime (see the sketch after this list).

    Since no watermark is effectively defined (it is declared on a different column than the one the aggregation uses), the old aggregation state is never dropped and keeps growing.

  2. You should set a bigger value for --driver-memory or --executor-memory when submitting the Spark job.

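A minimal sketch of one way to apply point 1's intent, reusing allEvents and aaISchema from the question (parsedDF and flatDF are illustrative names, and whether this alone resolves the OOM depends on the actual cause): project the nested eventTime field out with an explicit alias and declare the watermark on exactly that column, so the watermark and the window operate on the same attribute.

from pyspark.sql.functions import col, from_json, window

# Parse the JSON payload; keep the struct under the alias "colName"
parsedDF = allEvents.filter(col("value").contains("myNewAPI")) \
    .select(from_json(col("value"), aaISchema).alias("colName"))

# Flatten the fields with explicit aliases so the event-time column name is unambiguous
flatDF = parsedDF.select(
    col("colName.eventTime").alias("eventTime"),
    col("colName.appId").alias("appId"),
    col("colName.articleId").alias("articleId"),
    col("colName.locale").alias("locale"),
    col("colName.impression").alias("impression"))

# Watermark and window on the same column so state older than the watermark can be dropped
windowedCountsDF = flatDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy("appId", "articleId", "locale", window("eventTime", "2 minutes")) \
    .sum("impression") \
    .withColumnRenamed("sum(impression)", "views")

# flatDF.printSchema() can be used to confirm the column names before starting the query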
secfree

You need to set appropriate driver and executor memory while submitting the job. This post gives you a brief idea of how to set these configurations.

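For illustration only, a minimal sketch of where such settings go (the 4g values are placeholders, not tuned recommendations; the app name is illustrative): driver memory is normally passed to spark-submit, while executor memory can also be set when the SparkSession is built, before any streaming query starts.

from pyspark.sql import SparkSession

# Driver memory usually has to be set on the command line, since the driver JVM
# is already running by the time this code executes, e.g.:
#   spark-submit --driver-memory 4g --executor-memory 4g ./AggEventStructuredStreamListener.py
spark = SparkSession.builder \
    .appName("AggEventStructuredStream") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()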
Akhil Bojedla