
This is for PySpark. I am new to this space, so please bear with me.

To scale the features, I learnt that I first had to assemble them into a vector and then run MinMaxScaler() on that vector column. Normally, you can combine one or more columns into a vector column in one shot and write its value to another column, using this:

new_df = VectorAssembler(inputCols=["colA", "colB"], outputCol="colC").transform(df)

So I understand this adds another column, colC, to the DataFrame df. The issue is that colC then holds the values of both colA and colB together as a single vector.
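
For context, this is roughly the flow I'm using (column and variable names here are placeholders for my real ones):

from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Assemble the two numeric columns into one vector column...
assembler = VectorAssembler(inputCols=["colA", "colB"], outputCol="colC")
assembled_df = assembler.transform(df)

# ...then min-max scale that vector column into a new vector column
scaler = MinMaxScaler(inputCol="colC", outputCol="colC_scaled")
scaled_df = scaler.fit(assembled_df).transform(assembled_df)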

Two issues arise here:

  1. I don't mind that setup in itself, i.e. colC containing the values of colA and colB as a vector, as long as MinMaxScaler() does its job and I can separate them again later. The former it does; the latter it does not, or at least I haven't found a way to. Here's what I mean: https://spark.apache.org/docs/2.1.0/ml-features.html#minmaxscaler (look for the Python example there).

So in there, when you do scaledData.select("features", "scaledFeatures").show(), it shows this:

+--------------+--------------+
|      features|scaledFeatures|
+--------------+--------------+
|[1.0,0.1,-1.0]| [0.0,0.0,0.0]|
| [2.0,1.1,1.0]| [0.5,0.1,0.5]|
|[3.0,10.1,3.0]| [1.0,1.0,1.0]|
+--------------+--------------+ 

How do you separate the values in scaledFeatures into two separate columns? Regex? (Don't look at features; it's the same data, just unscaled.) I've sketched the kind of thing I've been trying below, after the second issue.

  2. To counter #1, I scaled the columns individually, so that colC held the vectorized scaled value of colA and a new column, colD, held the vectorized scaled value of colB.

The problem with #2 is that colC and colD now hold vectorized values. Each value in those columns, which was supposed to be just a number, is wrapped in square brackets; they're all vectors.

Something like this (this is my own data set, not the example above):

+-------------------------------+---------------------------------+
|           [0.5043343370745506]|             [0.0912011325868883]|
|           [0.5037868418651337]|             [0.0912011325868883]|
|           [0.5039693402682727]|             [0.0912011325868883]|
|           [0.5036955926635642]|             [0.0912011325868883]|
|           [0.5039693402682727]|             [0.0912011325868883]|
|           [0.5039693402682727]|             [0.0912011325868883]|
|           [0.5040605894698421]|             [0.0912011325868883]|
|           [0.5038780910667031]|             [0.0912011325868883]|
|           [0.5027831006478694]|             [0.0912011325868883]|
|           [0.5035130942604252]|             [0.0912011325868883]|
+-------------------------------+---------------------------------+

How can I un-vectorize them? I can't even do df.describe().show() on this DataFrame; it just returns an empty table.
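
Since both issues boil down to turning a vector column back into plain numeric columns, here is the kind of thing I have been trying, using a UDF to convert the vector to an array and then indexing into it. Column and variable names are placeholders for my real ones, and I'm not sure this is the idiomatic way (I've read that newer Spark versions have pyspark.ml.functions.vector_to_array, but my HDInsight cluster is on Spark 2):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

# UDF that turns an ML vector into a plain list of doubles so it can be indexed
vector_to_list = F.udf(lambda v: v.toArray().tolist() if v is not None else None,
                       ArrayType(DoubleType()))

# Issue 1: split the combined scaled vector back into one column per input feature
split_df = (scaled_df
            .withColumn("tmp", vector_to_list("colC_scaled"))
            .withColumn("colA_scaled", F.col("tmp")[0])
            .withColumn("colB_scaled", F.col("tmp")[1])
            .drop("tmp"))

# Issue 2: the same trick pulls the single number out of a one-element vector,
# and describe() works again once the columns are plain doubles
split_df.select("colA_scaled", "colB_scaled").describe().show()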

Thanks in advance.

The error (I hit this when I run `df.rdd.map(extract).toDF(["word"])`, as described in the comments below):

Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 695, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 390, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 361, in _inferSchema
    first = rdd.first()
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1376, in first
    rs = self.take(1)
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1358, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 1042, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 104, wn0-nkhlla.535cxyjeursebhq0ajgymk5xab.tx.internal.cloudapp.net, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'word' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 253, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1352, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 2, in extract
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: word

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'word' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 253, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 248, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 379, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1352, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/hdp/current/spark2-client/python/pyspark/util.py", line 55, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 2, in extract
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: word

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
  • Possible duplicate of [How to split Vector into columns - using PySpark](https://stackoverflow.com/questions/38384347/how-to-split-vector-into-columns-using-pyspark) – 10465355 Feb 28 '19 at 10:24
  • Thanks. I did go through this. But does it matter that I am using a DF and that article is for an RDD? – Anonymous Person Feb 28 '19 at 10:28
  • One of the proposed solutions uses an RDD as _an intermediate_ stage, but both input and output are `DataFrame`s. You'll be good. – 10465355 Feb 28 '19 at 10:32
  • Great. Thanks. I am trying that now. – Anonymous Person Feb 28 '19 at 10:33
  • The linked answer is about DF, too - it just *converts* it to an RDD as an intermediate step (which is admittedly something one would want to avoid). The real question is whether, in the meantime (the answer dates from 2016, and Spark evolves rather quickly), a better way has come along... – desertnaut Feb 28 '19 at 10:35
  • Dammit, I am getting a nasty error. I tried option #1, and always bump into this when I try `df.rdd.map(extract).toDF(["word"]) `. Here's the error (first 421 characters of it): `An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 104, wn0-nkhlla.535cxyjeursebhq0ajgymk5xab.tx.internal.cloudapp.net, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last): ` – Anonymous Person Feb 28 '19 at 12:10
  • Edited my post and added the full error. FYI, it's an HDInsight cluster. I don't know if that'd make any difference. I am running the code on Jupyter notebook. – Anonymous Person Feb 28 '19 at 12:16
  • NVM. The UDF way seems to have worked. I have a problem though. When I run the code after the edits I made, I get this: `DataFrame[colA: int, xs[0]: double, xs[1]: double]` How can I save the data frame? When I do a `.show()`, it shows me the new DF. How can I save it? – Anonymous Person Feb 28 '19 at 12:32
  • NVM. Figured that out too. Case closed. – Anonymous Person Feb 28 '19 at 17:19

0 Answers