2

I need to loop through a PySpark dataframe and explode each row into one row per active month. My main concerns are memory usage when writing the data to Hive and the time the process takes.

I have tried collect() together with "idx", F.monotonically_increasing_id(), but this killed the performance of my code.

Input

Empid date_active date_end
1234  2012-01-01  2012-10-27
2345  2012-01-01  2012-12-31
3456  2012-01-01  2012-08-15

Output

EmpId effective_Month
1234  Jan-12
1234  Feb-12
1234  ....
1234  Oct-12
2345  Jan-12
2345  Feb-12
2345  ....
2345  Dec-12
ZygD
Ritesh
  • My code so far (continued in the next comment) – Ritesh Sep 03 '19 at 15:04:

        import pandas as pd
        from datetime import datetime as dt
        from pyspark.sql import HiveContext
        from pyspark.sql.types import *

        hive_context = HiveContext(sc)
        for f in dfMT.collect():
            d = []
            MemberId = f.MemberId
            mth = f.months
            sd = f.FromDate
            ed = f.ToDate
            lst = [dt.strptime('%2.2d-%2.2d' % (y, m), '%Y-%m').strftime('%b-%y')
                   for y in range(sd.year, ed.year + 1)
                   for m in range(sd.month if y == sd.year else 1,
                                  ed.month + 1 if y == ed.year else 13)]
            for n in range(mth):
                d.append({"MemberId": MemberId, "MonthName": lst[n]})
            df = pd.DataFrame(d)

  • spark_df = hive_context.createDataFrame(df)
    spark_df.write.mode('append').format('hive').saveAsTable('test123') – Ritesh Sep 03 '19 at 15:04
    Possible duplicate of [get all the dates between two dates in Spark DataFrame](https://stackoverflow.com/questions/51745007/get-all-the-dates-between-two-dates-in-spark-dataframe) – pault Sep 03 '19 at 20:20
  • Are `date_active` and `date_end` always guaranteed to be in the same year? – pault Sep 03 '19 at 20:30
    Also possible dupe of [Generating monthly timestamps between two dates in pyspark dataframe](https://stackoverflow.com/questions/52412643/generating-monthly-timestamps-between-two-dates-in-pyspark-dataframe) – pault Sep 03 '19 at 20:35
  • Possible duplicate of [Generating monthly timestamps between two dates in pyspark dataframe](https://stackoverflow.com/questions/52412643/generating-monthly-timestamps-between-two-dates-in-pyspark-dataframe) – vikrant rana Sep 04 '19 at 04:27

2 Answers

0

Usually I would suggest trying a UDF. I once had a similar problem, but I considered that solution still too complicated.

Instead, you can change your perspective on the problem. Once you know the min(date_active) and max(date_end) across all EmpIds, you can enumerate every month in between (e.g. as 'yyyy-MM-01') and save that as a dataframe. Then (broadcast-)join the resulting months dataframe with every single row of your table. Finally, you only need a simple filter: keep rows where effective_month is between date_active and date_end. At the end, transform effective_month into the string format you prefer.

Broadcast joins are very fast as long as the broadcasted table is small, so runtime shouldn't be a problem here.
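A minimal sketch of this idea (names such as `month_starts`, `df`, and `spark` are illustrative, not from the question; the Spark part is shown as comments because it needs a running session, and it assumes date_active falls on the first of a month — otherwise compare against trunc(date_active, 'month')):

```python
from datetime import date

def month_starts(start, end):
    """All first-of-month dates from start's month through end's month."""
    months = []
    y, m = start.year, start.month
    while (y, m) <= (end.year, end.month):
        months.append(date(y, m, 1))
        m += 1
        if m > 12:            # roll over into the next year
            y, m = y + 1, 1
    return months

# In Spark, the broadcast join described above could look roughly like:
#
# from pyspark.sql import functions as F
# bounds = df.agg(F.min("date_active"), F.max("date_end")).first()
# months = spark.createDataFrame(
#     [(d,) for d in month_starts(bounds[0], bounds[1])],
#     ["effective_month"])
# result = (df.crossJoin(F.broadcast(months))
#             .where(F.col("effective_month").between(
#                 F.col("date_active"), F.col("date_end")))
#             .withColumn("effective_Month",
#                         F.date_format("effective_month", "MMM-yy")))
```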

Christopher
0

You can solve this with the DataFrame API as shown below.

Creating a sample DataFrame:

  df = spark.createDataFrame(
      [["123", "2012-01-01", "2012-10-01"],
       ["234", "2012-01-01", "2012-05-01"],
       ["345", "2012-01-01", "2012-11-01"]],
      ("age", "date_active", "date_end"))

    +---+-----------+----------+
    |age|date_active|  date_end|
    +---+-----------+----------+
    |123| 2012-01-01|2012-10-01|
    |234| 2012-01-01|2012-05-01|
    |345| 2012-01-01|2012-11-01|
    +---+-----------+----------+

Changing data type from string to timestamp

df = df.withColumn('date_active', df['date_active'].cast('timestamp'))\
.withColumn('date_end', df['date_end'].cast('timestamp'))

The code below computes the month span of each row and explodes it into one row per month:

from pyspark.sql import functions as f

df.withColumn('month_diff', f.months_between('date_end', 'date_active')) \
  .withColumn('repeat', f.expr("split(repeat(',', month_diff), ',')")) \
  .select('*', f.posexplode('repeat').alias('date', 'val')) \
  .withColumn('date', f.expr('add_months(date_active, date)')) \
  .withColumn('month', f.date_format('date', 'MMM')) \
  .select(['age', 'date_active', 'date_end', 'month']) \
  .show()


+---+-----------+----------+-----+
|age|date_active|  date_end|month|
+---+-----------+----------+-----+
|123| 2012-01-01|2012-10-01|  Jan|
|123| 2012-01-01|2012-10-01|  Feb|
|123| 2012-01-01|2012-10-01|  Mar|
|123| 2012-01-01|2012-10-01|  Apr|
|123| 2012-01-01|2012-10-01|  May|
|123| 2012-01-01|2012-10-01|  Jun|
|123| 2012-01-01|2012-10-01|  Jul|
|123| 2012-01-01|2012-10-01|  Aug|
|123| 2012-01-01|2012-10-01|  Sep|
|123| 2012-01-01|2012-10-01|  Oct|
|234| 2012-01-01|2012-05-01|  Jan|
|234| 2012-01-01|2012-05-01|  Feb|
|234| 2012-01-01|2012-05-01|  Mar|
|234| 2012-01-01|2012-05-01|  Apr|
|234| 2012-01-01|2012-05-01|  May|
|345| 2012-01-01|2012-11-01|  Jan|
|345| 2012-01-01|2012-11-01|  Feb|
|345| 2012-01-01|2012-11-01|  Mar|
|345| 2012-01-01|2012-11-01|  Apr|
|345| 2012-01-01|2012-11-01|  May|
+---+-----------+----------+-----+
ravi malhotra
  • replace withColumn('month', f.date_format('date','MMM')) with withColumn('month', f.date_format('date','MMM-YY')). – vikrant rana Sep 05 '19 at 11:42
  • Thanks Ravi on first hand code ran very well and did the trick. Thanks Vikrant for the suggestion. – Ritesh Sep 05 '19 at 12:11
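For reference, the pattern change suggested in the comments adds the two-digit year to the month name. One caveat worth noting: in Spark's date patterns, lowercase `yy` is the calendar year, while uppercase `YY` is the week-based year, which can differ around New Year, so `MMM-yy` is the safer spelling. The plain-Python equivalent of that pattern is `%b-%y`, illustrated here without Spark (month names are locale-dependent):

```python
# Illustrates the target "Jan-12" format that date_format(col, 'MMM-yy')
# would produce in Spark, using plain Python's strftime.
from datetime import date

print(date(2012, 1, 1).strftime("%b-%y"))   # e.g. Jan-12 in an English locale
```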