I'm working with Spark 1.6.1 and Python 2.7, and I need to solve the following:
- Get a dataframe A with X rows
- For each row in A, depending on a field, create one or more rows of a new dataframe B
- Save that new dataframe B
The solution I've come up with so far is to collect dataframe A, iterate over it, append the row(s) of B to a list, and then create dataframe B from that list.
With this solution I obviously lose all the benefits of working with dataframes, so I would like to use foreach instead, but I can't find a way to make it work. So far I've tried:
- Passing an empty list to the foreach function (the function seems to be ignored and nothing happens)
- Creating a global variable to be used in the foreach function (it complains that it can't find the list)
Does anyone have any ideas?
Thank you
----------------------EDIT:
Examples of the things I've tried:
def f(row, list):
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

list = []
dfA.foreach(lambda x: f(x, list))
As I mentioned, this does nothing: the function doesn't seem to be executed, and the list stays empty.
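One likely explanation, offered as an assumption rather than something verified on this setup: foreach does run f, but on the executors, against a serialized copy of the closure, so appends never reach the list that lives on the driver. The following is a plain-Python analogy only (no Spark involved). pickle stands in for the closure serialization, and the names acc, driver_list and worker_list are made up for the illustration.

```python
import pickle


def f(row, acc):
    # Same shape as the function above: append to the list that was passed in
    acc += [('one', 'ok')]


driver_list = []

# Stand-in for shipping the closure to a worker: the worker only ever sees
# a deserialized copy of the list, never the driver's own object.
worker_list = pickle.loads(pickle.dumps(driver_list))

for row in [1, 2, 3]:
    f(row, worker_list)

print(len(worker_list))  # the copy was filled
print(len(driver_list))  # the original was never touched
```

If this analogy holds, foreach is only suitable for side effects on the executors, and building B needs a transformation that returns rows instead.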
And I've also tried this (with list defined at the beginning of the class):
global list

def f(row):
    if row.one:
        list += [Row(type='one', field='ok')]
    else:
        list += [Row(type='one', field='ok')]
        list += [Row(type='two', field='nok')]

dfA.foreach(f)
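A likely reason for the "can't find the list" complaint, independent of Spark: a global statement placed outside f has no effect on f, and because += is an assignment, Python treats the name as local inside the function and raises UnboundLocalError. The sketch below reproduces this in plain Python. The names events, append_without_global and append_with_global are made up for the illustration.

```python
events = []  # module-level list, playing the role of `list` above


def append_without_global(item):
    # `+=` is an assignment, so `events` is compiled as a local name here
    # and is read before it has ever been assigned.
    events += [item]


def append_with_global(item):
    global events  # the declaration has to live inside the function
    events += [item]


try:
    append_without_global('a')
    outcome = 'no error'
except UnboundLocalError:
    outcome = 'UnboundLocalError'

append_with_global('b')

print(outcome)
print(events)
```

Even with the declaration moved inside the function, appends made on Spark executors would presumably still not reach the driver, so this only explains the error message and does not fix the approach.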
---------EDIT 2:
What I'm doing right now is:
import re
from datetime import datetime
from pyspark.sql import Row

list = []
for row in dfA.collect():
    # Events of type 1: start dates found in the "new" part of raw
    string = re.search(a_regex, row['raw'])
    if string:
        dates = re.findall(date_regex, string.group())
        for date in dates:
            date_string = datetime.strptime(date, '%Y-%m-%d').date()
            list += [Row(event_type='1', event_date=date_string)]
    # Events of type 2: start dates found in the "removed" part of raw
    b_string = re.search(b_regex, row['raw'])
    if b_string:
        dates = re.findall(date_regex, b_string.group())
        for date in dates:
            scheduled_to = datetime.strptime(date, '%Y-%m-%d').date()
            list += [Row(event_type='2', event_date=scheduled_to)]
and then:
dfB = self._sql_context.createDataFrame(list)
dfA is provided by another process that I can't change. I know this is a very poor way of using dataframes, but I can't do anything about that.
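The per-row logic of the loop above can be isolated into a pure function that returns its rows instead of appending to a shared list, which is the shape a one-to-many transformation such as flatMap expects. This is a minimal sketch, not a tested Spark job: extract_events is a made-up name, the None guard is an added assumption because raw is nullable, and the Spark calls in the trailing comment are an untested suggestion of how the function might be plugged in.

```python
import re
from datetime import datetime

# Regexes copied from this question
a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'


def extract_events(raw):
    """Return a list of (event_type, event_date) tuples for one raw string."""
    if raw is None:  # assumption: null raw values produce no events
        return []
    events = []
    for event_type, regex in (('1', a_regex), ('2', b_regex)):
        match = re.search(regex, raw)
        if match:
            for date in re.findall(date_regex, match.group()):
                parsed = datetime.strptime(date, '%Y-%m-%d').date()
                events.append((event_type, parsed))
    return events


sample = ('{"new":[],"removed":[{"start":"2018-03-10",'
          '"end":"2018-03-16","scheduled_by_system":null}]}')
print(extract_events(sample))

# Hypothetical Spark usage (not run here): every input row yields zero or
# more output rows, and nothing is collected on the driver.
#   rows = dfA.rdd.flatMap(
#       lambda r: [Row(event_type=t, event_date=d)
#                  for t, d in extract_events(r['raw'])])
#   dfB = self._sql_context.createDataFrame(rows)
```

Returning a list per row keeps the function free of shared state, so it can run on any executor without needing access to a driver-side variable.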
--------------------EDIT 3: dfA.raw sample:
{"new":[],"removed":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}]}
{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}
{"new":[{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"}],"removed":[{"start":"2017-02-04","end":"2017-02-10"},{"start":"2017-02-11","end":"2017-02-17"},{"start":"2017-02-18","end":"2017-02-24"},{"start":"2017-03-04","end":"2017-03-10"},{"start":"2017-03-11","end":"2017-03-17"},{"start":"2017-03-18","end":"2017-03-24"},{"start":"2017-01-28","end":"2017-02-03"},{"start":"2017-09-16","end":"2017-09-22"},{"start":"2017-09-02","end":"2017-09-08"},{"start":"2017-09-30","end":"2017-10-06"},{"start":"2017-10-07","end":"2017-10-13"},{"start":"2017-09-23","end":"2017-09-29"},{"start":"2017-12-16","end":"2017-12-22"},{"start":"2017-12-23","end":"2017-12-29"},{"start":"2018-01-06","end":"2018-01-12"},{"start":"2017-12-09","end":"2017-12-15"},{"start":"2017-12-02","end":"2017-12-08"},{"start":"2018-02-10","end":"2018-02-16"}]}|
and the regexes:
a_regex = r'\"new\":{(.*?)}{2}|\"new\":\[(.*?)\]'
b_regex = r'\"removed\":{(.*?)}{2}|removed\":\[(.*?)\]'
date_regex = r'\"start\":\"(\d{4}-\d{2}-\d{2})\"'
dfA.select('raw').show(2,False)
+-------------------------------------------------------------------------------------------------------+
|raw |
+-------------------------------------------------------------------------------------------------------+
|{"new":[{"start":"2018-03-24","end":"2018-03-30","scheduled_by_system":null}],"removed":[]}|
|{"new":[{"start":"2018-03-10","end":"2018-03-16","scheduled_by_system":null}],"removed":[]}|
+-------------------------------------------------------------------------------------------------------+
only showing top 2 rows
dfA.select('raw').printSchema()
root
|-- raw: string (nullable = true)
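Since raw is a plain string column whose samples look like JSON, the start dates could probably also be pulled out with the standard json module instead of regexes. This is a different technique from the one used in the question, sketched under loud assumptions: every raw value is valid JSON, and "new" and "removed" are always lists of objects with a "start" key, as in the samples shown (the first alternative in a_regex hints that other shapes may exist, and those are not handled here). events_from_json is a made-up name.

```python
import json
from datetime import datetime


def events_from_json(raw):
    """Return (event_type, event_date) tuples parsed from one JSON string."""
    doc = json.loads(raw)
    events = []
    # '1' for entries under "new", '2' for entries under "removed"
    for event_type, key in (('1', 'new'), ('2', 'removed')):
        for item in doc.get(key, []):
            parsed = datetime.strptime(item['start'], '%Y-%m-%d').date()
            events.append((event_type, parsed))
    return events


sample = ('{"new":[{"start":"2018-03-10","end":"2018-03-16",'
          '"scheduled_by_system":null}],"removed":[]}')
print(events_from_json(sample))
```

Like the regex version, this returns a list per input string, so it fits the same one-row-in, many-rows-out pattern.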