I'm just starting to experiment with PySpark/Spark and have run into an issue where my code is not working. I cannot find the cause, and Spark's error output is not very helpful. I did find similar questions on Stack Overflow, but none with a clear answer or solution (at least not for me).
The code I'm trying to run is:
import json
from datetime import datetime, timedelta

from pyspark.sql.session import SparkSession

from parse.data_reader import read_csv
from parse.interpolate import insert_time_range, create_time_range, linear_interpolate

spark = SparkSession.builder.getOrCreate()

df = None
with open('config/data_sources.json') as sources_file:
    sources = json.load(sources_file)
    for file in sources['files']:
        with open('config/mappings/{}.json'.format(file['mapping'])) as mapping:
            df_to_append = read_csv(
                spark=spark,
                file='{}{}'.format(sources['root_path'], file['name']),
                config=json.load(mapping)
            )
        if df is None:
            df = df_to_append
        else:
            df = df.union(df_to_append)

df.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

time_range = create_time_range(
    datetime(year=2019, month=7, day=1, hour=0),
    datetime(year=2019, month=7, day=8, hour=0),
    timedelta(seconds=3600)
)
df_with_intervals = insert_time_range(
    df=df,
    timestamp_column_name='Timestamp',
    variable_column_name='Variable',
    value_column_name='Value',
    time_range=time_range,
)
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
Which gives the following output:
C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/04 13:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/04 13:31:36 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 4:=======================> (2 + 3) / 5]19/09/04 13:31:52 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://xxxxxx.azuredatabricks.net/?o=xxxxxx#/setting/clusters/xxxxxx/sparkUi
[Stage 5:===========> (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp |Variable |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0 |
|2019-07-01 00:00:06.664|Load % SB DG|0.0 |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows
Traceback (most recent call last):
File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 42, in <module>
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
print(self._jdf.showString(n, int(truncate), vertical))
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o655.showString.
: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
at com.trueaccord.scalapb.textformat.TextGenerator.addNewLine(TextGenerator.scala:33)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:38)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
Process finished with exit code 1
The two functions I'm using are:
from typing import Iterable
from datetime import datetime, timedelta

from pyspark.sql import DataFrame
from pyspark.sql.functions import array, explode, lit


def create_time_range(start_time: datetime, end_time: datetime, step_size: timedelta) -> Iterable[datetime]:
    return [start_time + step_size * n for n in range(int((end_time - start_time) / step_size))]


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, explode(time_range))
    return df.union(df_exploded.select([timestamp_column_name, variable_column_name, value_column_name]))
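(Side note: the stack trace above dies inside plan printing, and the earlier warning says the string representation of the plan was truncated because it was too large; building the time range as an array of thousands of lit() literals inlines every timestamp into the logical plan. For comparison, here is a minimal sketch of the same exploded grid built with Spark's own sequence() function instead; it assumes Spark 2.4+, the helper name insert_time_range_seq is my own, and it swaps union for unionByName:)

from datetime import datetime

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def insert_time_range_seq(df: DataFrame, timestamp_column_name: str, variable_column_name: str,
                          value_column_name: str, start: datetime, end: datetime,
                          step_seconds: int) -> DataFrame:
    # Build the timestamp grid inside Spark with sequence() (Spark 2.4+)
    # instead of shipping thousands of Python-side lit() literals into the
    # logical plan. Note: sequence() includes the end timestamp, unlike the
    # range() list comprehension above.
    grid = (
        df.select(variable_column_name).distinct()
        .withColumn(
            timestamp_column_name,
            F.explode(F.sequence(
                F.lit(start),
                F.lit(end),
                F.expr('interval {} seconds'.format(step_seconds))
            ))
        )
        .withColumn(value_column_name, F.lit(None).cast('double'))
    )
    # unionByName (Spark 2.3+) matches columns by name rather than position.
    return df.unionByName(grid.select(df.columns))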
The data_sources.json file currently contains only one CSV file, which is a couple of MB. What causes the OutOfMemoryError, and how can I get a more detailed error report?
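(For getting more detail out of Spark, a couple of knobs I know of, sketched below; '4g' and '100' are only illustrative values, and on a Databricks cluster the driver memory is normally set in the cluster configuration rather than in code:)

from pyspark.sql.session import SparkSession

# More verbose JVM-side logging for the current session:
spark.sparkContext.setLogLevel('INFO')

# Example configs when building the session:
spark = (
    SparkSession.builder
    .config('spark.driver.memory', '4g')
    .config('spark.debug.maxToStringFields', '100')  # log more of the plan string
    .getOrCreate()
)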
As suggested by niuer, I changed the insert_time_range function to:
def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
    return df_exploded.select([timestamp_column_name, variable_column_name, value_column_name])
Before the .show() call I added a line print(df_with_intervals.count()), which outputs the number 5 (as expected). But when I try to show() the values, I still get the same OutOfMemoryError.
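(Since count() succeeds but show() dies while stringifying the plan, it may help to look at the plan itself, or to cut the lineage before showing; a minimal sketch, where the checkpoint directory is just an example path:)

# Print the full logical and physical plans that show() would have to
# stringify; if this output is enormous, the plan (not the data) is the
# likely memory hog.
df_with_intervals.explain(True)

# Cutting the lineage with a checkpoint gives show() a plan-free copy to
# work on:
spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')
df_checkpointed = df_with_intervals.checkpoint()
df_checkpointed.sort(['Timestamp', 'Variable']).show(n=5, truncate=False)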
UPDATE
I've narrowed the issue down to the union, but it is still unclear why it is not working. I've updated the insert_time_range method according to the suggestion in the comments:
from pyspark.sql.types import DoubleType, StructField, StructType, TimestampType


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    schema = StructType(
        [
            StructField(timestamp_column_name, TimestampType(), True),
            StructField(value_column_name, DoubleType(), True)
        ]
    )
    df_time_range = df.sql_ctx.createDataFrame(
        [(timestamp, None) for timestamp in time_range],
        schema=schema
    )
    df_time_range = df.select([variable_column_name]).distinct().crossJoin(df_time_range).select(
        [timestamp_column_name, variable_column_name, value_column_name]
    )
    df_time_range.show(n=20, truncate=False)
    return df.union(df_time_range)
which gives the following output:
C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/09 23:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/09 23:00:30 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 44:==================================> (3 + 2) / 5]19/09/09 23:00:43 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
[Stage 45:===========> (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp |Variable |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0 |
|2019-07-01 00:00:06.664|Load % SB DG|0.0 |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
+-------------------+------------+-----+
|Timestamp |Variable |Value|
+-------------------+------------+-----+
|2019-06-30 22:00:00|Load % PS DG|null |
|2019-06-30 22:00:00|Power PS DG |null |
|2019-06-30 22:00:00|Power Shore |null |
|2019-06-30 22:00:00|Load % SB DG|null |
|2019-06-30 22:00:00|Power SB DG |null |
|2019-06-30 22:01:00|Load % PS DG|null |
|2019-06-30 22:01:00|Power PS DG |null |
|2019-06-30 22:01:00|Power Shore |null |
|2019-06-30 22:01:00|Load % SB DG|null |
|2019-06-30 22:01:00|Power SB DG |null |
|2019-06-30 22:02:00|Load % PS DG|null |
|2019-06-30 22:02:00|Power PS DG |null |
|2019-06-30 22:02:00|Power Shore |null |
|2019-06-30 22:02:00|Load % SB DG|null |
|2019-06-30 22:02:00|Power SB DG |null |
|2019-06-30 22:03:00|Load % PS DG|null |
|2019-06-30 22:03:00|Power PS DG |null |
|2019-06-30 22:03:00|Power Shore |null |
|2019-06-30 22:03:00|Load % SB DG|null |
|2019-06-30 22:03:00|Power SB DG |null |
+-------------------+------------+-----+
only showing top 20 rows
Traceback (most recent call last):
File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 46, in <module>
df_with_intervals.sort([timestamp_column_name, variable_column_name]).show(n=5, truncate=False)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
print(self._jdf.showString(n, int(truncate), vertical))
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o333.showString.
: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
at com.trueaccord.scalapb.textformat.TextGenerator.add(TextGenerator.scala:19)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:33)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
Process finished with exit code 1
So the issue must be in the union method, but I have no clue what the problem is.
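(One thing worth checking, sketched below: union() matches columns by position, not by name, so a type or order mismatch between the two sides only surfaces at execution time. The variable names mirror the function above:)

# Compare both sides of the union; names, order, AND types must line up.
df.printSchema()
df_time_range.printSchema()

# Programmatic check of the column types:
assert [f.dataType for f in df.schema.fields] == \
    [f.dataType for f in df_time_range.schema.fields]

# unionByName (Spark 2.3+) at least removes the positional pitfall:
combined = df.unionByName(df_time_range)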
UPDATE
In my first attempts I had only one CSV file in config/data_sources.json, so the df = df.union(df_to_append) line was never executed. Now I've added multiple CSV files to config/data_sources.json, so the union method is executed, and again I get the py4j.protocol.Py4JJavaError: An error occurred while calling o2043.showString.
: java.lang.OutOfMemoryError: Java heap space
error, and it already happens with the first union. What am I doing wrong with this method, or is there a bug in the method itself?
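(For what it's worth, a minimal self-contained union smoke test like the sketch below can separate a problem in union itself from a problem in the CSV-derived DataFrames; the toy data is my own, not from the question:)

from functools import reduce

from pyspark.sql import DataFrame

# Two tiny DataFrames with identical schemas; if even this union OOMs on
# the same cluster, the problem is environmental rather than in the data.
a = spark.createDataFrame([(1, 'x')], ['id', 'v'])
b = spark.createDataFrame([(2, 'y')], ['id', 'v'])
reduce(DataFrame.union, [a, b]).sort('id').show()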