I am using Spark's FP-growth algorithm. I was getting OOM errors when doing a collect, so I changed the code to save the results to a text file on HDFS instead of collecting them on the driver node. Here is the related code:
// Model building:
import org.apache.spark.mllib.fpm.FPGrowth

val fpg = new FPGrowth()
  .setMinSupport(0.01)
  .setNumPartitions(10)
val model = fpg.run(transaction_distinct)
Here is a transformation that should give me an RDD[String]:
val mymodel = model.freqItemsets.map { itemset =>
  val model_res = itemset.items.mkString("[", ",", "]") + ", " + itemset.freq
  model_res
}
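To make the output format concrete, here is a minimal sketch of what one line looks like. It applies the same formatting to a single itemset and runs without a cluster. `DemoItemset` and `formatItemset` are hypothetical names used only for illustration. The case class is a stand-in, not Spark's own `FreqItemset`.

```scala
object FormatDemo {
  // Hypothetical stand-in for Spark's FreqItemset, used only for illustration.
  case class DemoItemset(items: Array[String], freq: Long)

  // Same formatting as the map above: items in brackets, then the frequency.
  def formatItemset(itemset: DemoItemset): String =
    itemset.items.mkString("[", ",", "]") + ", " + itemset.freq

  def main(args: Array[String]): Unit = {
    val line = formatItemset(DemoItemset(Array("a", "b"), 3L))
    println(line) // prints "[a,b], 3"
  }
}
```

So each frequent itemset becomes one short line of text in the saved file.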
I then save the model results as follows. Unfortunately, this step is really slow:
mymodel.saveAsTextFile("fpm_model")
I get these errors:
16/02/04 14:47:28 ERROR ErrorMonitor: AssociationError[akka.tcp://sparkDriver@ipaddress:46811] -> [akka.tcp://sparkExecutor@hostname:39720]: Error [Association failed with [akka.tcp://sparkExecutor@hostname:39720]][akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@hostname:39720]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: hostname/ipaddress:39720] akka.event.Logging$Error$NoCause$
16/02/04 14:47:28 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3, hostname, 58683)
16/02/04 14:47:28 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
16/02/04 14:47:28 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@ipaddress:46811] ->[akka.tcp://sparkExecutor@hostname:39720]: Error [Association failed with [akka.tcp://sparkExecutor@hostname:39720]][akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@hostname:39720]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: hostname/ipaddress:39720