
I have a DataFrame in PySpark:

from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder.getOrCreate()
sqlcontext = SQLContext(spark)
df = sqlcontext.createDataFrame([['a'],['b'],['c'],['d'],['e']], ['id'])
df.show()

+---+
| id|
+---+
|  a|
|  b|
|  c|
|  d|
|  e|
+---+

And I have a list of lists:

l = [[1,1], [2,2], [3,3], [4,4], [5,5]]

Is it possible to append this list as a column to df? Namely, the first element of l should appear next to the first row of df, the second element of l next to the second row of df, etc. It should look like this:

+---+------+
| id|     l|
+---+------+
|  a| [1,1]|
|  b| [2,2]|
|  c| [3,3]|
|  d| [4,4]|
|  e| [5,5]|
+---+------+
  • What should be the criteria to join your list `l` with the `id` in your df? Will the `id` be the index of your list? – Cesar A. Mostacero Nov 04 '19 at 22:25
  • @CesarA.Mostacero It should be in order. I've edited the OP to make it clearer, thanks. – Tendero Nov 04 '19 at 22:28
  • There is no clean, efficient way to do what you are asking without a well defined way to order your rows. The lack of implicit row ordering in Spark is a feature (not a bug) and is what allows for distributed, parallel processing. Any executor can grab any chunk of the data and independently process it. If you require order, you need to explicitly define that order (slow because it requires shuffling). – pault Nov 05 '19 at 17:09

3 Answers


UDFs are generally slow; a more efficient way, without using any UDFs, would be:

import pyspark.sql.functions as F

# Create a DataFrame from the list; the single array column is named "value" by default.
ldf = spark.createDataFrame(l, schema="array<int>")

# Add an increasing id to both DataFrames to serve as the join key.
df1 = df.withColumn("m_id", F.monotonically_increasing_id())
df2 = ldf.withColumn("m_id", F.monotonically_increasing_id())

df3 = df2.join(df1, "m_id", "outer").drop("m_id")
df3.select("id", "value").show()
+---+------+
| id| value|
+---+------+
|  a|[1, 1]|
|  b|[2, 2]|
|  d|[4, 4]|
|  c|[3, 3]|
|  e|[5, 5]|
+---+------+
  • The join should be performed on `m_id`, not on `id`. – Tendero Nov 05 '19 at 13:37
  • @Tendero Typo, fixed – pissall Nov 05 '19 at 13:38
  • This is not guaranteed to join in the correct order. It may seem to work for small data, but overall this answer is incorrect. Unfortunately, Spark DataFrames do not have the concept of order between the rows unless you explicitly define that order. – pault Nov 05 '19 at 16:55
  • As mentioned by @pault, this does not guarantee the join order. Reference: https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram – Cesar A. Mostacero Nov 05 '19 at 17:02
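
For reference, here is a minimal sketch (not from the answer above) of one way to define an explicit join key, using RDD zipWithIndex to number the rows. It assumes the spark session, df and l from the question, and that df.rdd still reflects the original insertion order (true for a DataFrame built from a local list, but not guaranteed after shuffles):

from pyspark.sql import Row

# Number the DataFrame's rows in their current RDD order (df, spark and l as in the question).
indexed_df = df.rdd.zipWithIndex().map(
    lambda pair: Row(idx=pair[1], id=pair[0]["id"])
).toDF()

# Number the Python list elements the same way.
indexed_l = spark.createDataFrame([(i, v) for i, v in enumerate(l)], ["idx", "l"])

# Join on the explicit index and restore the original order.
indexed_df.join(indexed_l, "idx").orderBy("idx").drop("idx").show()

Unlike monotonically_increasing_id, the indices on both sides are dense and aligned with row position, at the cost of going through the RDD API.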

Assuming that you have the same number of rows in your df as items in your list (df.count() == len(l)).

You can add a row number (to specify the order) to your df and, based on that, access the corresponding item in your list (l).

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

# Assign a sequential row number; lit('A') is just a dummy expression to order the window by.
df = df.withColumn("row_num", row_number().over(Window().orderBy(lit('A'))))
df.show()

The output of the code above will look like this:

+---+-------+
| id|row_num|
+---+-------+
|  1|      1|
|  2|      2|
|  3|      3|
|  4|      4|
|  5|      5|
+---+-------+

Then, you can just iterate over your df and access the corresponding index in your list:

def map_df(row):
    # Look up the list element that corresponds to this row's 1-based row number.
    return (row.id, l[row.row_num - 1])

new_df = df.rdd.map(map_df).toDF(["id", "l"])

new_df.show()

Output:

+---+------+
| id|     l|
+---+------+
|  1|[1, 1]|
|  2|[2, 2]|
|  3|[3, 3]|
|  4|[4, 4]|
|  5|[5, 5]|
+---+------+
  • This is not the expected output. You overwrote the `id` column. – pissall Nov 05 '19 at 03:01
  • Where did I do that? I only added a `row_num`; the `id` is still the same (consecutive). Whereas `monotonically_increasing_id` I think is not the best option due to: https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram – Cesar A. Mostacero Nov 05 '19 at 16:36
  • OP's output `id` column contains letters while yours contains integers – pissall Nov 05 '19 at 16:38
  • Yeah, in the first version of the question he provided a DF with integers; when I added the answer, the same example (with integers) was used. But in the code I don't see any place where the `id` was overwritten. – Cesar A. Mostacero Nov 05 '19 at 16:44
  • @CesarA.Mostacero `orderBy(lit('A'))` is indeterminate. – pault Nov 05 '19 at 17:03
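
As a side note on that last comment, here is a small sketch (not from the original answer) of how row_number becomes deterministic when the window is ordered by a real column; it only gives the desired pairing if the intended order happens to match the sort order of `id`, as it does in this toy example:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Deterministic numbering, but only meaningful if the desired pairing order
# matches the sort order of `id` (it does here, not in general).
df_numbered = df.withColumn("row_num", row_number().over(Window.orderBy("id")))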

Thanks to Cesar's answer, I figured out how to do it without converting the DataFrame to an RDD and back. It would be something like this:

from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import row_number, lit, udf
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.getOrCreate()
sqlcontext = SQLContext(spark)
df = sqlcontext.createDataFrame([['a'],['b'],['c'],['d'],['e']], ['id'])

df = df.withColumn("row_num", row_number().over(Window().orderBy(lit('A'))))

new_col = [[1.,1.], [2.,2.], [3.,3.], [4.,4.], [5.,5.]]

# Map the 1-based row number to the corresponding element of new_col.
map_list_to_column = udf(lambda row_num: new_col[row_num - 1], ArrayType(FloatType()))

df.withColumn('new_col', map_list_to_column(df.row_num)).drop('row_num').show()