13

I am using pyspark to estimate parameters for a logistic regression model. I use spark to calculate the likelihood and gradients and then use scipy's minimize function for optimization (L-BFGS-B).

I use yarn-client mode to run my application. My application could start to run without any problem. However, after a while it reports the following error:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

I also found python broken pipe error when I set spark log level to "ALL".

I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?

--Update--

I found this is related to the optimization routine I used in my program.

What I was doing is estimating a statistical model with maximum likelihood method using EM algorithm (as iterative algorithm). During each iteration, I need to update the parameters by solving a minimization problem. Spark is responsible for calculating my likelihood and gradient, which are then passed to Scipy's minimize routine where I use L-BFGS-B method. It seems that something in this routine that crashes my Spark job. But I have no idea which part of the routine is responsible for this issue.

Another observation is that, while using the same sample and same program, I changed the number of partitions. When the number of partition is small my program could finish without any problem. However, when the number of partitions becomes large, the program starts to crash.

panc
  • 817
  • 2
  • 14
  • 30

4 Answers4

8

I had similar problem. I had an iteration, and sometimes execution took so long it timed out. Increasing spark.executor.heartbeatInterval seemed to solve the problem. I increased it to 3600s to ensure I don't run into timeouts again and everything is working fine since then.

From: http://spark.apache.org/docs/latest/configuration.html :

spark.executor.heartbeatInterval 10s Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.

Nandor Poka
  • 281
  • 1
  • 10
6

I had a similar problem, and for me, this fixed it:

import pyspark as ps

conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")
sc = ps.SparkContext('local[4]', '', conf=conf) # uses 4 cores on your local machine

More examples of setting other options here: https://gist.github.com/robenalt/5b06415f52009c5035910d91f5b919ad

wordsforthewise
  • 13,746
  • 5
  • 87
  • 117
  • 5
    I ran into the same issue when I ran Spark Streaming for testing purposes on a single node system. It was actually the ``'local[4]'`` parameter that fixed it! From my experience, changing ``"spark.executor.heartbeatInterval"`` (and also ``spark.network.timeout``, as it has to be larger than the heartbeatInterval) did not have any effect in this context. – magraf Apr 23 '20 at 06:15
  • I had the same issue running an app locally. I tried `local[4]`, `local[*]` and I tried increasing `heartbeatInterval` and `network.timeout` without effect. Using `local` solved it for me. CONFIG: (pyspark 3.1.2 via Anaconda, processor: 2 cores (4 logical units)) – vpvinc Oct 11 '21 at 09:05
0

Check the executors logs for details. I have seen similar errors when executors die or are killed by the cluster manager (usually for using more memory than the container is configured for).

cftarnas
  • 1,745
  • 10
  • 9
0

We had the same issue when using Pyspark extension nodes in IBM's SPSS Modeler. All the above solutions (and what else could be found in the Internet) did not work. At some point we figured out that it always occurred when my colleague and me executed Pyspark extension nodes at the same time on the same machine. That seemed to have let to Python workers getting mixed up or killed. Only solution was to not execute Pyspark stuff at the same time...

M. Beining
  • 99
  • 1
  • 9