
I have a table with multiple IDs, each with a different start date. The end date is the same for all rows and changes dynamically: it is always the last day of the month before today's date. I am trying to iterate over each ID and its respective start date to get a list of months from the start date to the end date, mapped back to the ID.
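
For reference, that dynamic end date can be computed like this (a minimal sketch, standard library only):

from datetime import date, timedelta

# Last day of the previous month: step back one day from the first of this month
end_date = date.today().replace(day=1) - timedelta(days=1)
print(end_date)  # e.g. 2020-04-30 when today falls in May 2020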

My current table looks like this:

ID      Start_Date      End_Date 
A       2019-12-15      2020-04-30
B       2020-03-03      2020-04-30

My desired output table:

ID      Start_Date      End_Date       ID_period
A       2019-12-15      2020-04-30     201912
A       2019-12-15      2020-04-30     202001
A       2019-12-15      2020-04-30     202002
A       2019-12-15      2020-04-30     202003
A       2019-12-15      2020-04-30     202004
B       2020-03-03      2020-04-30     202003
B       2020-03-03      2020-04-30     202004

I have tried the code below, with some changes, sourced from Generate list of months between interval in python:

from datetime import datetime

dates = ["2014-10-10", "2016-01-01"]

def monthlist_fast(dates):
    # Parse the start and end dates (the end date could instead be computed
    # dynamically as the last day of the previous month, as described above)
    start, end = [datetime.strptime(d, "%Y-%m-%d") for d in dates]
    # Month counter: consecutive calendar months differ by exactly 1
    total_months = lambda dt: dt.month + 12 * dt.year
    mlist = []
    for tot_m in range(total_months(start) - 1, total_months(end)):
        y, m = divmod(tot_m, 12)
        mlist.append(datetime(y, m + 1, 1).strftime("%Y%m"))
    return mlist

My result:

['201410',
 '201411',
 '201412',
 '201501',
 '201502',
 '201503',
 '201504',
 '201505',
 '201506',
 '201507',
 '201508',
 '201509',
 '201510',
 '201511',
 '201512',
 '201601']

But I am unable to figure out a way to map these months back to my IDs, especially since the start dates differ from ID to ID. Any help would be appreciated. Thanks.


4 Answers


Assuming you are reading your data from a file (which I have called input.txt), you could try something like:

from datetime import datetime

def read_log_file():
    data = []

    with open("input.txt", "r") as input_data:
        input_data.readline()  # skip the header row
        for line in input_data:
            data.append(line.strip().split())

    with open("output.txt", "w") as output:
        print("ID\tStart_Date\tEnd_Date\tID_period", file=output)
        for entry_id, start_date, end_date in data:
            end_datetime = datetime.strptime(end_date, "%Y-%m-%d")
            id_period = datetime.strptime(start_date, "%Y-%m-%d")
            # Compare (year, month) pairs so the loop also terminates
            # when the end date falls in December
            while (id_period.year, id_period.month) <= (end_datetime.year, end_datetime.month):
                print("\t".join([entry_id, start_date, end_date, id_period.strftime("%Y%m")]), file=output)
                next_year = id_period.year
                next_month = id_period.month + 1
                if next_month > 12:
                    next_month = 1
                    next_year += 1
                # Pin the day to 1 so months shorter than the start day don't raise
                id_period = datetime(year=next_year, month=next_month, day=1)

which produces

ID  Start_Date  End_Date    ID_period
A   2019-12-15  2020-04-30  201912
A   2019-12-15  2020-04-30  202001
A   2019-12-15  2020-04-30  202002
A   2019-12-15  2020-04-30  202003
A   2019-12-15  2020-04-30  202004
B   2020-03-03  2020-04-30  202003
B   2020-03-03  2020-04-30  202004
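
Here input.txt is assumed to be the question's table saved as whitespace-separated text with a header row:

ID      Start_Date      End_Date
A       2019-12-15      2020-04-30
B       2020-03-03      2020-04-30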

  • Thank you for this. My apologies if my question wasn't clear before: how do I make this work when I have the data as a pyspark table? – vagautam May 22 '20 at 18:32
  • While I'm not familiar with pyspark, you should just be able to replace the reading and writing parts of the script with the relevant functions for pyspark (a sketch follows these comments). – MindOfMetalAndWheels May 22 '20 at 18:37
  • I would like to upvote your answer; it is my handicap that I couldn't transform it into pyspark, and my OP was not clear on the required language. However, the site allows only one upvote per post, so I am accepting the answer closest to my requirements. Your answer also meets my requirement and is in no way incorrect. Thanks again for your time. – vagautam May 22 '20 at 19:37
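
Following up on that comment, here is a minimal sketch of the same month-stepping logic applied to a Spark DataFrame, assuming a DataFrame df with the question's columns. It uses a plain Python UDF, so it should also run on Spark 2.x, where sequence is unavailable:

from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def month_periods(start_date, end_date):
    # Same (year, month) walk as above, returned as a list of yyyyMM strings
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    y, m = start.year, start.month
    periods = []
    while (y, m) <= (end.year, end.month):
        periods.append("%04d%02d" % (y, m))
        m += 1
        if m > 12:
            y, m = y + 1, 1
    return periods

month_periods_udf = F.udf(month_periods, ArrayType(StringType()))

# df is assumed to already exist with columns ID, Start_Date, End_Date
df.withColumn("ID_period", F.explode(month_periods_udf("Start_Date", "End_Date"))).show()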

For the future, I would recommend updating to Spark 2.4+, as the sequence function is a game changer.

For Spark 2.1+:

Modified from this answer: Generating monthly timestamps between two dates in pyspark dataframe

from pyspark.sql import functions as F

df.withColumn("monthsDiff", F.months_between("End_Date", "Start_Date"))\
    .withColumn("repeat", F.expr("split(repeat(',', monthsDiff), ',')"))\
    .select("*", F.posexplode("repeat").alias("date", "val"))\
    .withColumn("Id_period", F.expr("date_format(add_months(Start_Date, date), 'yyyyMM')"))\
    .drop("repeat", "val", "monthsDiff", "date").show()

#+---+----------+----------+---------+
#| ID|Start_Date|  End_Date|Id_period|
#+---+----------+----------+---------+
#|  A|2019-12-15|2020-04-30|   201912|
#|  A|2019-12-15|2020-04-30|   202001|
#|  A|2019-12-15|2020-04-30|   202002|
#|  A|2019-12-15|2020-04-30|   202003|
#|  A|2019-12-15|2020-04-30|   202004|
#|  B|2020-03-03|2020-04-30|   202003|
#|  B|2020-03-03|2020-04-30|   202004|
#+---+----------+----------+---------+

For Spark 2.4+:

from pyspark.sql import functions as F

df.withColumn("Id_period", F.explode(F.expr("""transform(sequence(to_date(start_date), to_date(end_date),
                                                         interval 1 month), x -> date_format(x, 'yyyyMM'))"""))).show()

#+---+----------+----------+---------+
#| ID|Start_Date|  End_Date|Id_period|
#+---+----------+----------+---------+
#|  A|2019-12-15|2020-04-30|   201912|
#|  A|2019-12-15|2020-04-30|   202001|
#|  A|2019-12-15|2020-04-30|   202002|
#|  A|2019-12-15|2020-04-30|   202003|
#|  A|2019-12-15|2020-04-30|   202004|
#|  B|2020-03-03|2020-04-30|   202003|
#|  B|2020-03-03|2020-04-30|   202004|
#+---+----------+----------+---------+
  • I agree and can see the difference between the versions; unfortunately I am tied to the enterprise version. But the sequence function surely seems awesome. Thanks for the answer. – vagautam May 22 '20 at 19:33

If you are implementing this in PySpark, you can use built-in functions, which will also give better performance.

sequence with interval 1 month expands the range from Start_Date to End_Date in one-month steps, and expr lets you run SQL functions:

import pyspark.sql.functions as f

df1 = df.withColumn('months', f.expr('sequence(to_date(Start_Date), to_date(End_Date), interval 1 month)'))\
    .withColumn('month', f.explode('months'))\
    .withColumn('ID_period', f.date_format('month', 'yyyyMM')).drop('months', 'month')

df1.show()

+---+----------+----------+---------+
| ID|Start_Date|  End_Date|ID_period|
+---+----------+----------+---------+
|  A|2019-12-15|2020-04-30|   201912|
|  A|2019-12-15|2020-04-30|   202001|
|  A|2019-12-15|2020-04-30|   202002|
|  A|2019-12-15|2020-04-30|   202003|
|  A|2019-12-15|2020-04-30|   202004|
|  B|2020-03-03|2020-04-30|   202003|
|  B|2020-03-03|2020-04-30|   202004|
+---+----------+----------+---------+

Let me know how it works.

import pyspark.sql.functions as f

data = spark.createDataFrame([('A', '2019-12-15', '2020-04-30'), ('B', '2020-03-03', '2020-04-30'), ('C', '2020-04-29', '2020-04-30')], ['ID', 'Start_Date', 'End_Date'])
data = (data
        # One day offset per day between Start_Date and End_Date (requires Spark 2.4+)
        .withColumn('dateDifferenceArray', f.sequence(f.lit(0), f.datediff(f.col('End_Date'), f.col('Start_Date'))))
        # Map each day offset to its yyyyMM period, dedupe, and explode to rows
        .withColumn('ID_period', f.explode(f.array_distinct(f.expr('transform(dateDifferenceArray, element -> date_format(date_add(Start_Date, element), "yyyyMM"))'))))
        .drop('dateDifferenceArray')
       )
data.show()
data.show()
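
Given the three sample rows created above, this should print something like:

+---+----------+----------+---------+
| ID|Start_Date|  End_Date|ID_period|
+---+----------+----------+---------+
|  A|2019-12-15|2020-04-30|   201912|
|  A|2019-12-15|2020-04-30|   202001|
|  A|2019-12-15|2020-04-30|   202002|
|  A|2019-12-15|2020-04-30|   202003|
|  A|2019-12-15|2020-04-30|   202004|
|  B|2020-03-03|2020-04-30|   202003|
|  B|2020-03-03|2020-04-30|   202004|
|  C|2020-04-29|2020-04-30|   202004|
+---+----------+----------+---------+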
  • Sorry, it didn't work. I got the error: AttributeError: module 'pyspark.sql.functions' has no attribute 'sequence' – vagautam May 22 '20 at 18:40
  • Which version of pyspark are you using? It's available from version 2.4. If you're not using version 2.4 or above, the transform logic in the next line will throw an error as well: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.sequence – ARCrow May 23 '20 at 21:15
  • My version is 2.2. – vagautam May 24 '20 at 22:38