12

I want to convert a dataframe from pandas to Spark and I am using the spark_context.createDataFrame() method to create the dataframe. I'm also specifying the schema in the createDataFrame() method.

What I want to know is how to handle special cases. For example, NaN in pandas, when converted to a Spark dataframe, ends up as the string "NaN". I am looking for a way to get actual nulls instead of "NaN".
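
For reference, the setup looks roughly like this (column names, values and the schema are made up for illustration; spark is assumed to be an existing SparkSession):

import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# hypothetical pandas frame with missing values
pdf = pd.DataFrame({"x": [1.0, float("nan")], "y": ["a", None]})

# explicit schema passed to createDataFrame(), as described above
schema = StructType([
    StructField("x", DoubleType(), True),
    StructField("y", StringType(), True),
])

sdf = spark.createDataFrame(pdf, schema=schema)
sdf.show()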

zero323
Prashil Sureja

2 Answers

16

TL;DR Your best option for now is to skip Pandas completely.

The source of the problem is that Pandas is less expressive than Spark SQL. Spark provides both NULL (in a SQL sense, as a missing value) and NaN (numeric Not a Number).

Pandas, on the other hand, doesn't have a native value which can be used to represent missing values. As a result it uses placeholders like NaN / NaT or Inf, which are indistinguishable to Spark from actual NaNs and Infs, and the conversion rules depend on the column type. The only exception are object columns (typically strings), which can contain None values. You can learn more about handling missing values in Pandas from the documentation.
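
As a quick illustration of those conversion rules (a sketch of my own, column names are made up), None gets coerced differently depending on the column dtype on the Pandas side:

import pandas as pd

pdf = pd.DataFrame({
    "numeric": [1.0, None],                     # None is coerced to NaN (float64)
    "strings": [None, "foo"],                   # object column keeps None as-is
    "dates": [pd.Timestamp("20120101"), None],  # None is coerced to NaT (datetime64[ns])
})
print(pdf.dtypes)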

For example, NaN in pandas when converted to Spark dataframe ends up being string "NaN".

This is actually not correct; it depends on the type of the input column. If the column shows NaN, it is most likely a not-a-number value, not a plain string:

import pandas as pd
from pyspark.sql.functions import isnan, isnull

pdf = pd.DataFrame({
    "x": [1, None], "y": [None, "foo"], 
    "z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)

sdf.show()
+---+----+-------------------+
|  x|   y|                  z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo|               null|
+---+----+-------------------+
sdf.select([
    f(c) for c in sdf.columns for f in [isnan, isnull] 
    if (f, c) != (isnan, "z")  # isnan cannot be applied to timestamp 
]).show()
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
|   false|      false|   false|       true|      false|
|    true|      false|   false|      false|       true|
+--------+-----------+--------+-----------+-----------+

In practice, parallelized local collections (including Pandas objects) have negligible importance beyond simple testing and toy examples, so you can always convert the data manually (skipping possible Arrow optimizations):

import numpy as np

spark.createDataFrame([
    tuple(
        # replace float NaN with None; note that the datetime column comes
        # through as raw nanosecond integers here, as visible below
        None if isinstance(x, (float, int)) and np.isnan(x) else x
        for x in record.tolist())
    for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo|               null|
+----+----+-------------------+
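
If you also want the timestamps to survive as proper datetimes instead of the raw nanosecond integers above, a variant of the same idea (my own sketch, not part of the original answer) is to iterate with itertuples and use pd.isna, which also catches NaT:

spark.createDataFrame([
    tuple(None if pd.isna(x) else x for x in row)
    for row in pdf.itertuples(index=False, name=None)
], pdf.columns.tolist()).show()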

If missing / not-a-number ambiguity is not an issue, then just load the data as usual and replace in Spark:

from pyspark.sql.functions import col, when 

sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c 
    for c, t in sdf.dtypes
]).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo|               null|
+----+----+-------------------+
zero323
  • This also helps if you have 'NaT' in a datetime64[ns] field and want to transfer it to spark (as in my case). Very helpful answer. Thank you. – Borislav Aymaliev Oct 29 '18 at 10:14
  • Thank you for a comprehensive explanation. Different treatment of (pandas's) None for numeric and string columns in Spark was surprising to me. – Vojta F Nov 04 '21 at 14:58
7

If you want to load a pandas df, you can replace NaN with None:

import pandas as pd

def load_csv(spark, path):
    """Read a csv into a Spark dataframe, converting NaN to None first."""
    pd_df = pd.read_csv(path)
    # where() keeps values where the condition holds and substitutes None elsewhere,
    # so NaN / NaT become None before the conversion to Spark
    pd_df = pd_df.where(pd.notnull(pd_df), None)
    df = spark.createDataFrame(pd_df)
    return df
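
Hypothetical usage, assuming an existing SparkSession named spark and a file "data.csv":

df = load_csv(spark, "data.csv")
df.printSchema()
df.show(5)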
justin cress
  • simple and efficient! – Marco Antonio Yamada Aug 11 '20 at 11:46
  • Can simplify the `where` line: `pd_df.where(cond=df.notna(), other=None)`. – Daniel Himmelstein Oct 02 '20 at 21:08
  • Nice solution! This should really be internal to `spark.createDataFrame`. There does seem to be [trouble](https://issues.apache.org/jira/browse/SPARK-30966) with this solution when the input pd_df has had `.convert_dtypes()` called on it. Also seems not always to work when `spark.sql.execution.arrow.pyspark.enabled` is `true`. – Daniel Himmelstein Oct 02 '20 at 21:11