I am using Spark's FP-growth algorithm. I was getting OOM errors when calling collect, so I changed the code to save the results to a text file on HDFS instead of collecting them on the driver node. Here is the related code:

// Model building:

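import org.apache.spark.mllib.fpm.FPGrowth

val fpg = new FPGrowth()
  .setMinSupport(0.01)   // keep itemsets that occur in at least 1% of transactions
  .setNumPartitions(10)  // number of partitions used to distribute the mining
val model = fpg.run(transaction_distinct)  // RDD of transactions; items within each must be unique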
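As a rough sketch of what `setMinSupport(0.01)` means in absolute terms: the fraction is turned into a minimum occurrence count over all transactions. The helper below assumes a ceiling conversion, which as far as I know is what MLlib's FPGrowth does internally; `MinSupportSketch` and `minCount` are hypothetical names, not part of Spark.

```scala
// Hypothetical helper (not a Spark API). Assumption: a fractional
// minSupport is converted to an absolute threshold with a ceiling, so an
// itemset is kept only if it appears in at least
// ceil(minSupport * numTransactions) transactions.
object MinSupportSketch {
  def minCount(minSupport: Double, numTransactions: Long): Long =
    math.ceil(minSupport * numTransactions).toLong
}
```

For example, `MinSupportSketch.minCount(0.01, 1000000L)` gives 10000, and `MinSupportSketch.minCount(0.01, 250L)` gives 3, since 2.5 is rounded up.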

Here is a transformation that should give me an RDD[String]:

val mymodel = model.freqItemsets.map { itemset =>
  val model_res = itemset.items.mkString("[", ",", "]") + ", " + itemset.freq
  model_res
}
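To check the line format without a cluster, here is a minimal sketch that applies the same `mkString` expression to an in-memory value. `ItemsetStub` is a hypothetical stand-in for MLlib's `FPGrowth.FreqItemset`, assumed to expose only `items` and `freq`; `FormatSketch` is likewise a hypothetical name.

```scala
// Hypothetical stand-in for MLlib's FPGrowth.FreqItemset, so the
// formatting can be checked without a SparkContext.
final case class ItemsetStub(items: Array[String], freq: Long)

object FormatSketch {
  // Same expression as the map above, producing lines like "[a,b], 3".
  def format(itemset: ItemsetStub): String =
    itemset.items.mkString("[", ",", "]") + ", " + itemset.freq
}
```

For example, `FormatSketch.format(ItemsetStub(Array("a", "b"), 3L))` returns `"[a,b], 3"`.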

I then save the model results as follows. Unfortunately, this is really slow:

mymodel.saveAsTextFile("fpm_model")
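Assuming part of the slowness comes from the volume of output rather than from HDFS itself, it may help to see how quickly the number of frequent itemsets grows. Every non-empty subset of a frequent itemset is also frequent, so one frequent itemset of size k implies 2^k - 1 frequent itemsets, each written as its own line. The sketch below illustrates only that counting argument; `OutputSizeSketch` and its methods are hypothetical names.

```scala
// Sketch of the downward-closure property of frequent itemsets: if an
// itemset is frequent, every non-empty subset of it is frequent too.
object OutputSizeSketch {
  // Closed form: number of non-empty subsets of a k-item set (valid for k < 63).
  def subsetCount(k: Int): Long = (1L << k) - 1

  // Explicit enumeration; exponential in items.size, so only for tiny inputs.
  def nonEmptySubsets[A](items: Seq[A]): Seq[Seq[A]] =
    (1 to items.size).flatMap(k => items.combinations(k))
}
```

For instance, `OutputSizeSketch.subsetCount(20)` is 1,048,575, so a single frequent itemset of 20 items already implies about a million output lines.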

I get these errors:

16/02/04 14:47:28 ERROR ErrorMonitor: AssociationError[akka.tcp://sparkDriver@ipaddress:46811] -> [akka.tcp://sparkExecutor@hostname:39720]: Error [Association failed with [akka.tcp://sparkExecutor@hostname:39720]][akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@hostname:39720]

Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: hostname/ipaddress:39720] akka.event.Logging$Error$NoCause$
16/02/04 14:47:28 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, hostname, 58683)
16/02/04 14:47:28 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
16/02/04 14:47:28 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@ipaddress:46811] ->[akka.tcp://sparkExecutor@hostname:39720]: Error [Association failed with [akka.tcp://sparkExecutor@hostname:39720]][akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@hostname:39720]

Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: hostname/ipaddress:39720