
I am processing parquet files in PySpark (Spark 3.3.0, per the paths in the traceback below).

My data contains date and timestamp fields with values earlier than '1970-01-01'. I am getting the following error when running locally on macOS Monterey 12.6.1.

22/11/27 20:22:46 ERROR Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 788, in toInternal
    return tuple(
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 789, in <genexpr>
    f.toInternal(v) if c else v
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 591, in toInternal
    return self.dataType.toInternal(obj)
  File "/Users/pm/opt/spark-3.3.0-bin-hadoop3/python/lib/pyspark.zip/pyspark/sql/types.py", line 216, in toInternal
    calendar.timegm(dt.utctimetuple()) if dt.tzinfo else time.mktime(dt.timetuple())
OverflowError: mktime argument out of range

How can I resolve this error?

Adding more context:

I am reading data from a parquet file, processing it, and writing it to a Postgres table. Reading the parquet file and writing it to the table works fine. I also tried writing the dataframe back out as a parquet file as is; that also worked.

I was able to isolate the issue to the part where I capture records that failed the table insert and write them to a reject file. This is done by calling mapPartitions on the dataframe built from the parquet file. The core logic of the mapPartitions function is:

from pyspark.sql import Row

def process_rows(rows):
    # conn / cur: psycopg2 connection and cursor (setup omitted from this excerpt)
    for row in rows:
        try:
            processed_row_count.add(1)  # accumulator
            cur.execute(insert_statement, row)
            accepted_row_count.add(1)  # accumulator
        except Exception as e:
            # Collect the records that failed the insert with the error message
            # and yield them so they can be written to the reject file
            rejected_row_count.add(1)  # accumulator
            row = row.asDict()
            row["ErrorMessage"] = f"Error received from psycopg2 module is: {str(e)}"
            yield Row(**row)
    conn.close()

mapPartitions is called like this:

rejected_on_load_df = enriched_df.rdd.mapPartitions(process_rows).toDF(
        enriched_df.schema.add("ErrorMessage", StringType(), nullable=False)
    )

I get the error when writing rejected_on_load_df.

Prabodh Mhalgi
  • Does this answer your question? [Python | mktime overflow error](https://stackoverflow.com/questions/2518706/python-mktime-overflow-error) – Kris Nov 28 '22 at 05:05
  • Unfortunately no. I do not have control over how pyspark is handling date and timestamp data. – Prabodh Mhalgi Nov 28 '22 at 16:03
  • Update: I was not able to resolve it on my local, so I ran it on my dev server. The dev server runs RHEL. It ran fine in dev. Dunno how to get it working on my macbook. – Prabodh Mhalgi Jan 10 '23 at 19:56

1 Answer


I was curious and I did a little research.

According to Python's documentation, time.mktime() is platform dependent: https://docs.python.org/3.8/library/time.html#time.mktime.

Then I had a look at macOS: since it is UNIX-based, it represents time as Unix epoch time.

From Wikipedia: https://en.wikipedia.org/wiki/Unix_time

Unix time is a date and time representation widely used in computing. It measures time by the number of seconds that have elapsed since 00:00:00 UTC on 1 January 1970, the beginning of the Unix epoch.

Unix time originated as the system time of Unix operating systems. It has come to be widely used in other computer operating systems, file systems, programming languages, and databases.

That's my best guess on why this is happening.
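
You can see the platform difference outside of Spark with a quick check that mirrors the failing call in your traceback (pyspark/sql/types.py line 216). On macOS it raises OverflowError for pre-1970 dates, while on glibc/Linux (e.g. your RHEL dev server) the same call typically just returns a negative number:

import datetime
import time

# The call PySpark makes internally for naive datetimes: time.mktime(dt.timetuple())
dt = datetime.datetime(1900, 1, 1)
try:
    print(time.mktime(dt.timetuple()))   # negative seconds on glibc/Linux
except OverflowError as exc:
    print(f"OverflowError: {exc}")       # what you see on macOS for pre-1970 values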

So my advice would be to find a way to represent these dates in a format other than epoch seconds.
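
One possible way to do that, as a rough sketch reusing the names from your question (stringify_temporal_columns is a hypothetical helper, and Postgres would then need to cast the strings back on insert): cast the date/timestamp columns to strings before the RDD round-trip, so that DateType/TimestampType.toInternal, and therefore time.mktime, is never called on the Python side.

from pyspark.sql import functions as F
from pyspark.sql.types import DateType, TimestampType, StringType

# Hypothetical helper: replace date/timestamp columns with ISO-formatted strings
def stringify_temporal_columns(df):
    for field in df.schema.fields:
        if isinstance(field.dataType, (DateType, TimestampType)):
            df = df.withColumn(field.name, F.col(field.name).cast(StringType()))
    return df

# Strings survive the rdd.mapPartitions(...).toDF(...) round-trip without
# going through time.mktime, so pre-1970 values no longer overflow
enriched_str_df = stringify_temporal_columns(enriched_df)
rejected_on_load_df = enriched_str_df.rdd.mapPartitions(process_rows).toDF(
    enriched_str_df.schema.add("ErrorMessage", StringType(), nullable=False)
)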

Andrea S.
  • I updated the question and added a little more context. I printed all the rejected rows and this is how the data looks: 'ga_dt': datetime.date(1900, 1, 1). I am only facing the issue for the rejected records. Note: the records are rejected because of a not-null violation on an unrelated column, not because of the dates. – Prabodh Mhalgi Nov 28 '22 at 19:55