6

I'm just starting to experiment with pyspark/spark and run into the issue that my code is not working. I cannot find the issue and the error output of spark is not very helpful. I do find sort of the same questions on stackoverflow but none with a clear answer or solution (at least not for me).

The code I'm trying to run is:

import json
from datetime import datetime, timedelta

from pyspark.sql.session import SparkSession

from parse.data_reader import read_csv
from parse.interpolate import insert_time_range, create_time_range, linear_interpolate

spark = SparkSession.builder.getOrCreate()

df = None
with open('config/data_sources.json') as sources_file:
    sources = json.load(sources_file)
    for file in sources['files']:
        with open('config/mappings/{}.json'.format(file['mapping'])) as mapping:
            df_to_append = read_csv(
                spark=spark,
                file='{}{}'.format(sources['root_path'], file['name']),
                config=json.load(mapping)
            )

        if df is None:
            df = df_to_append
        else:
            df = df.union(df_to_append)

df.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

time_range = create_time_range(
    datetime(year=2019, month=7, day=1, hour=0),
    datetime(year=2019, month=7, day=8, hour=0),
    timedelta(seconds=3600)
)

df_with_intervals = insert_time_range(
    df=df,
    timestamp_column_name='Timestamp',
    variable_column_name='Variable',
    value_column_name='Value',
    time_range=time_range,
)
df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)

Which gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/04 13:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/04 13:31:36 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 4:=======================>                                   (2 + 3) / 5]19/09/04 13:31:52 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://xxxxxx.azuredatabricks.net/?o=xxxxxx#/setting/clusters/xxxxxx/sparkUi
[Stage 5:===========>                                               (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 42, in <module>
    df_with_intervals.sort(["Timestamp", "Variable"]).show(n=5, truncate=False)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o655.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.addNewLine(TextGenerator.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:38)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

The two functions I'm using are:

def create_time_range(start_time: datetime, end_time: datetime, step_size: timedelta) -> Iterable[datetime]:
    return [start_time + step_size * n for n in range(int((end_time - start_time) / step_size))]


def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, explode(time_range))
    return df.union(df_exploded.select([timestamp_column_name, variable_column_name, value_column_name]))

The data_sources.json file currently contains only one csv file which is a couple of MB. What causes the OutOfMemoryException or how can I get a more detailed error report?

As suggested by niuer I changed the function insert_time_range to:

def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    time_range = array([lit(ts) for ts in time_range])
    df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
    return df_exploded.select([timestamp_column_name, variable_column_name, value_column_name])

And before the .show() call I added a line print(df_with_intervals.count()) which is outputting the number 5 (as expected). But still when I try to show() the values I get the same OutOfMemoryException.

UPDATE I've narrowed down the issue to the union, but still unclear why it is not working. I've updated the insert_time_range method according to suggestion in the comments:

def insert_time_range(df: DataFrame, timestamp_column_name: str, variable_column_name: str, value_column_name: str,
                      time_range: Iterable[datetime]) -> DataFrame:
    schema = StructType(
        [
            StructField(timestamp_column_name, TimestampType(), True),
            StructField(value_column_name, DoubleType(), True)
        ]
    )
    df_time_range = df.sql_ctx.createDataFrame(
        [(timestamp, None) for timestamp in time_range],
        schema=schema
    )
    df_time_range = df.select([variable_column_name]).distinct().crossJoin(df_time_range).select(
        [timestamp_column_name, variable_column_name, value_column_name]
    )
    df_time_range.show(n=20, truncate=False)

    return df.union(df_time_range)

which gives the following output:

C:\Users\mmun01\PycharmProjects\xxxx\venv\Scripts\python.exe C:/Users/mmun01/PycharmProjects/xxxx/application.py
19/09/09 23:00:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/09 23:00:30 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
[Stage 44:==================================>                       (3 + 2) / 5]19/09/09 23:00:43 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
[Stage 45:===========>                                              (1 + 4) / 5]+-----------------------+------------+-----+
|Timestamp              |Variable    |Value|
+-----------------------+------------+-----+
|2019-07-01 00:00:06.664|Load % PS DG|0.0  |
|2019-07-01 00:00:06.664|Load % SB DG|0.0  |
|2019-07-01 00:00:06.664|Power PS DG |null |
|2019-07-01 00:00:06.664|Power SB DG |null |
|2019-07-01 00:00:06.664|Power Shore |null |
+-----------------------+------------+-----+
only showing top 5 rows

View job details at https://westeurope.azuredatabricks.net/?o=2202252276771286#/setting/clusters/0903-124716-art213/sparkUi
+-------------------+------------+-----+
|Timestamp          |Variable    |Value|
+-------------------+------------+-----+
|2019-06-30 22:00:00|Load % PS DG|null |
|2019-06-30 22:00:00|Power PS DG |null |
|2019-06-30 22:00:00|Power Shore |null |
|2019-06-30 22:00:00|Load % SB DG|null |
|2019-06-30 22:00:00|Power SB DG |null |
|2019-06-30 22:01:00|Load % PS DG|null |
|2019-06-30 22:01:00|Power PS DG |null |
|2019-06-30 22:01:00|Power Shore |null |
|2019-06-30 22:01:00|Load % SB DG|null |
|2019-06-30 22:01:00|Power SB DG |null |
|2019-06-30 22:02:00|Load % PS DG|null |
|2019-06-30 22:02:00|Power PS DG |null |
|2019-06-30 22:02:00|Power Shore |null |
|2019-06-30 22:02:00|Load % SB DG|null |
|2019-06-30 22:02:00|Power SB DG |null |
|2019-06-30 22:03:00|Load % PS DG|null |
|2019-06-30 22:03:00|Power PS DG |null |
|2019-06-30 22:03:00|Power Shore |null |
|2019-06-30 22:03:00|Load % SB DG|null |
|2019-06-30 22:03:00|Power SB DG |null |
+-------------------+------------+-----+
only showing top 20 rows

Traceback (most recent call last):
  File "C:/Users/mmun01/PycharmProjects/xxxx/application.py", line 46, in <module>
    df_with_intervals.sort([timestamp_column_name, variable_column_name]).show(n=5, truncate=False)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\dataframe.py", line 381, in show
    print(self._jdf.showString(n, int(truncate), vertical))
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\mmun01\PycharmProjects\xxxx\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o333.showString.
: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
    at com.trueaccord.scalapb.textformat.TextGenerator.maybeNewLine(TextGenerator.scala:13)
    at com.trueaccord.scalapb.textformat.TextGenerator.add(TextGenerator.scala:19)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:33)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)
    at com.trueaccord.scalapb.textformat.Printer$.printField(Printer.scala:28)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:13)
    at com.trueaccord.scalapb.textformat.Printer$$anonfun$print$2.apply(Printer.scala:12)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at com.trueaccord.scalapb.textformat.Printer$.print(Printer.scala:12)
    at com.trueaccord.scalapb.textformat.Printer$.printFieldValue(Printer.scala:70)
    at com.trueaccord.scalapb.textformat.Printer$.printSingleField(Printer.scala:37)


Process finished with exit code 1

So the issue must be in the union method but I have no clue what the issue is?

UPDATE In my first attempts I had only one CSV file in config/data_sources.json so the df = df.union(df_to_append) line was never executed. Now I've added multiple CSV files in config/data_sources.json and then the union method is execute and again I get the py4j.protocol.Py4JJavaError: An error occurred while calling o2043.showString. : java.lang.OutOfMemoryError: Java heap space error but it already happens with the first union. What am I doing wrong with this method or this there a bug in the method itself?

Martijn de Munnik
  • 924
  • 12
  • 23
  • Try creating a single-column dataframe using `time_range` and then do a `crossJoin`. Pyspark may automatically repartition and optimise the "exploding" for you. – absolutelydevastated Sep 09 '19 at 16:45
  • Thanks for your comment, see my update above for the results. – Martijn de Munnik Sep 09 '19 at 21:07
  • Why is there a need to do a `union` after the `crossJoin`? You can just return `df_time_range`. You might also want to try writing `df` to an intermediate location. – absolutelydevastated Sep 10 '19 at 02:22
  • The reason for the `crossJoin` is that the `df_time_range` doesn't contain the original samples. The step after this is to interpolate the values for the timestamps in `df_time_range`. – Martijn de Munnik Sep 10 '19 at 07:28
  • I'm talking about the final `union` step. That's not necessary. You just need to return the last `df_time_range`. – absolutelydevastated Sep 10 '19 at 07:44
  • Sorry, my mistake in my last comment, the `union` (instead of the `crossJoin`) is necessary to make sure that the original samples are still in the returned dataset. And why should a join of two datasets give an OutOfMemory exception in the first place? The datasets are in the order of megabytes, so not really big. – Martijn de Munnik Sep 10 '19 at 11:08
  • What is the size of your java heap space? Without doubt the above code can be improved but also you could do some java & spark settings to limit the memory use: `java -Xms -Xmx` and in `$SPARK_HOME/conf/spark-defaults.conf` set the max size of the driver memory. Eg: `spark.driver.memory 10g`. It will take longer to process though. – geo Sep 11 '19 at 02:56
  • I'm sure the code above needs optimization, these are my first experiments with (py)spark ;-) I use an Azure Databricks instance with the following settings ` Standard_DS5_v2 56.0 GB Memory, 16 Cores, 3 DBU` for both driver and workers. I have no idea if I can make the above specific settings in Azure? Also the dataset I'm using is only about ~30MB and I can easily achieve the same steps with `pandas` on my local laptop. I think there is a bug in my code which causes the excessive memory usage, but I cannot find that bug. – Martijn de Munnik Sep 11 '19 at 10:02
  • Imo, if it is true that the `union` is causing the error, you can simply do a `crossJoin` without dropping the other rows and populate the `Value` column by taking `Value` only if the extrapolated timestamp matches the one in the original dataframe and `null` otherwise. In that case you don't need to `union` and it helps to isolate the problem in your code. – absolutelydevastated Sep 11 '19 at 13:50
  • Well the `union` is causing an issue but I use it in several locations. Also see my last update to the original post. – Martijn de Munnik Sep 11 '19 at 15:17
  • Can you specify schema when you readcsv? otherwise it's inferred. – niuer Sep 12 '19 at 21:21
  • which java version are you using – Yumlembam Rahul Sep 16 '19 at 13:45

3 Answers3

0

It might come from the explode you are doing. You basically do cross join of all rows generated from json file with the datetimes in time_range, which has 168 elements. I would replace the explode with F.lit() first to see if it runs. If there's still problem, I'll remove union code to try.

niuer
  • 1,589
  • 2
  • 11
  • 14
  • The goal of the `insert_time_range` function is to insert rows at fixed time intervals, the next step will be to interpolate the values. When interpolation is done the values on the non fixed time intervals are thrown away. – Martijn de Munnik Sep 05 '19 at 09:06
  • The `df` which is read from the source file has 346265 rows. Each rows consist of a timestamp (~13 bytes) a variable (~50 bytes) and a value (~10 bytes). So one row should be in the order of 100 bytes. The total size is about 33MB. Then when I try add a row for each variable (5 variables total) for each hour (during one month) I'm adding 3720 rows which should not make a huge difference in my opinion. – Martijn de Munnik Sep 05 '19 at 09:19
  • Also see the additions to the original question as you suggested. Alas without results. – Martijn de Munnik Sep 05 '19 at 09:26
  • What's your spark cluster setup? Try increase the memory in driver/worker. – niuer Sep 05 '19 at 15:46
  • I'm using Azure Databricks, both driver and workers are `Standard_DS5_v2 56.0 GB Memory, 16 Cores, 3 DBU`. That should be more than sufficient, the dataset is really not that big. I can easily handle it with pandas on my local not so fancy laptop. ' – Martijn de Munnik Sep 05 '19 at 19:47
  • I am not quite sure then, how about increase partitions? hope this link can help: https://stackoverflow.com/questions/21138751/spark-java-lang-outofmemoryerror-java-heap-space/35961952 – niuer Sep 05 '19 at 20:34
  • Partition the data doesn't same to make any difference. I tried `df_with_intervals = df_with_intervals.repartition(5)` with several partition sizes, 1, 5, 2000, 200000. – Martijn de Munnik Sep 09 '19 at 13:51
  • Have you tried to remove `sort(["Timestamp", "Variable"])` and just call `show()` on the simplified DataFrame? – niuer Sep 09 '19 at 20:10
  • Yes I have removed all the calls to `sort()` so it just leaves `df.show(n=5, truncate=False)`. – Martijn de Munnik Sep 09 '19 at 21:37
0

I went through the communication you have been having with niuer.

Are you sure, you are working with seconds=3600. I ask this because DF just before error indicates 1 minute interval in your update. With seconds=60 and total range = 1 month, you will have 44640 new rows per original row. That is quite some explosion in data.

Also, add repartition after distinct

df_exploded = df \
        .drop(value_column_name) \
        .drop(timestamp_column_name) \
        .distinct() \
        .repartition(2000) \ # some sane value please
        .withColumn(value_column_name, lit(None)) \
        .withColumn(timestamp_column_name, lit(time_range[0]))
D3V
  • 1,543
  • 11
  • 21
  • Indeed I tried several options for the interval just to make sure that the data is not getting too big. There are only 5 variables in the source CSV files so the data cannot be that big. – Martijn de Munnik Sep 16 '19 at 14:02
  • Try adding repartition after distinct. – D3V Sep 16 '19 at 17:03
0

It seems to me that you are reading everything into the memory of a single machine (most likely the master running the driver program) by reading in this loop (latency issues could also arise if you are not reading in NFS). You should try something like:

sparkcontext.wholeTextFiles("path/*")