I am trying to build a pyspark pipeline where I perform a sequence of steps such as missing value treatment, scaling, discretisation. I need a proper dataframe at the end.
I am currently stuck at this step.
num_imputer = Imputer(inputCols = df.columns, outputCols = df.columns, strategy = impute_type)
num_scaling = StandardScaler(inputCol = 'features' , outputCol = 'scaledFeatures')
pipeline = Pipeline(stages = [num_imputer, vector_assembler,num_scaling])
df = pipeline.fit(df).transform(df)
This line fails
df = df.select('scaledFeatures').rdd.map(lambda x:[float(y) for y in x['scaledFeatures']]).toDF([val+'scale' for val in df.columns])
The error:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 617.0 failed 1 times, most recent failure: Lost task 0.0 in stage 617.0 (TID 538) (LAPTOP-8PIAMAL6 executor driver): org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) at java.net.AbstractPlainSocketImpl.accept(Unknown Source) at java.net.PlainSocketImpl.accept(Unknown Source) at java.net.ServerSocket.implAccept(Unknown Source) at java.net.ServerSocket.accept(Unknown Source) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175) ... 14 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254) at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166) at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Unknown Source) Caused by: org.apache.spark.SparkException: Python worker failed to connect back. at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121) at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more Caused by: java.net.SocketTimeoutException: Accept timed out at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method) at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source) at java.net.AbstractPlainSocketImpl.accept(Unknown Source) at java.net.PlainSocketImpl.accept(Unknown Source) at java.net.ServerSocket.implAccept(Unknown Source) at java.net.ServerSocket.accept(Unknown Source) at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175) ... 14 more
Traceback:
File "c:\programdata\anaconda3\lib\site-packages\streamlit\script_runner.py", line 354, in _run_script
exec(code, module.__dict__)
File "C:\Users\hp\Documents\BITS\4 sem\Project\python_no_code_spark.py", line 187, in <module>
main()
File "C:\Users\hp\Documents\BITS\4 sem\Project\python_no_code_spark.py", line 171, in main
df = df.select('scaledFeatures').rdd.map(lambda x:[float(y) for y in x['scaledFeatures']]).toDF(['a','b','c','d','e','f','g','h','i'])
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\session.py", line 66, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\session.py", line 675, in createDataFrame
return self._create_dataframe(data, schema, samplingRatio, verifySchema)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\session.py", line 698, in _create_dataframe
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\session.py", line 486, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\session.py", line 460, in _inferSchema
first = rdd.first()
File "c:\programdata\anaconda3\lib\site-packages\pyspark\rdd.py", line 1588, in first
rs = self.take(1)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\rdd.py", line 1568, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "c:\programdata\anaconda3\lib\site-packages\pyspark\context.py", line 1227, in runJob
sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "c:\programdata\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1309, in __call__
return_value = get_return_value(
File "c:\programdata\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 111, in deco
return f(*a, **kw)
File "c:\programdata\anaconda3\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
raise Py4JJavaError(
Kindly provide inputs