-4

I've got a data set that has companies with a daily production rate of a product. I want to add a column to that dataframe which will sequentially number based on that company by date.

Ex.
Acme Product1 1/1/2000 5
Acme Product1 1/2/2000 7
Acme Product2 3/1/2000 9
Acme Product2 3/2/2000 4
Company2 ProductX 4/1/2015 6
Company2 ProductX 4/2/2015 3

I want to add a new column like:
Acme Product1 1/1/2000 5 1
Acme Product1 1/2/2000 7 2
Acme Product2 3/1/2000 9 1
Acme Product2 3/2/2000 4 2
Company2 ProductX 4/1/2015 6 1
Company2 ProductX 4/2/2015 3 2
Company2 ProductX 4/2/2015 2 3

This is all so that I can compare companies and their products based on the new column. So all of their day one production for a product regardless of date.

Jim
  • 121
  • 1
  • 8
  • 1
    You should attempt to do it and ask when you are stuck or your solution doesn't work. – Isma Sep 08 '17 at 21:15

1 Answers1

0

You can use windowing with pyspark.sql function row_number (alias rowNumber works for spark <= 1.6.X).

First let's create the dataframe:

myDF = spark.createDataFrame(
    sc.parallelize([["Acme", "Product1", "1/1/2000", 5],
        ["Acme", "Product1", "1/2/2000", 7],
        ["Acme", "Product2", "3/1/2000", 9],
        ["Acme", "Product2", "3/2/2000", 4],
        ["Company2", "ProductX", "4/1/2015", 6],
        ["Company2", "ProductX", "4/2/2015", 3], 
        ["Company2", "ProductX", "4/2/2015", 2]]), 
    ["company", "product", "date", "nb"])

    +--------+--------+--------+---+
    | company| product|    date| nb|
    +--------+--------+--------+---+
    |    Acme|Product1|1/1/2000|  5|
    |    Acme|Product1|1/2/2000|  7|
    |    Acme|Product2|3/1/2000|  9|
    |    Acme|Product2|3/2/2000|  4|
    |Company2|ProductX|4/1/2015|  6|
    |Company2|ProductX|4/2/2015|  3|
    |Company2|ProductX|4/2/2015|  2|
    +--------+--------+--------+---+

Now to use the window function:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

resultDF = myDF.withColumn(
    "rowNum", 
    F.row_number().over(Window.partitionBy("company", "product")
                           .orderBy("date")))

    +--------+--------+--------+---+------+
    | company| product|    date| nb|rowNum|
    +--------+--------+--------+---+------+
    |Company2|ProductX|4/1/2015|  6|     1|
    |Company2|ProductX|4/2/2015|  3|     2|
    |Company2|ProductX|4/2/2015|  2|     3|
    |    Acme|Product2|3/1/2000|  9|     1|
    |    Acme|Product2|3/2/2000|  4|     2|
    |    Acme|Product1|1/1/2000|  5|     1|
    |    Acme|Product1|1/2/2000|  7|     2|
    +--------+--------+--------+---+------+
MaFF
  • 9,551
  • 2
  • 32
  • 41
jeanr
  • 1,031
  • 8
  • 15
  • This is the correct answer, it should not have been down voted – MaFF Sep 09 '17 at 08:49
  • This worked...Thanks! I'm just learning, so I wasn't sure where to start and everything I tried just gave errors. A few notes, at first I got "'F' was not defined", so after research I had to add "from pyspark.sql import functions as F" and "from pyspark.sql.window import Window". Then I got something about rowNumber module was not found. So again after research, I had to change to F.row_number(). – Jim Sep 09 '17 at 13:25