46

I'm manually creating a dataframe for some testing. The code to create it is:

case class input(id:Long, var1:Int, var2:Int, var3:Double)
val inputDF = sqlCtx
  .createDataFrame(List(input(1110,0,1001,-10.00),
    input(1111,1,1001,10.00),
    input(1111,0,1002,10.00)))

So the schema looks like this:

root
 |-- id: long (nullable = false)
 |-- var1: integer (nullable = false)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

I want to make 'nullable = true' for each one of these variables. How do I declare that from the start, or switch it in a new dataframe after it's been created?

J Calbreath

8 Answers

53

With the imports

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

you can use

/**
 * Set nullable property of column.
 * @param df source DataFrame
 * @param cn the column name to change
 * @param nullable the flag to set, such that the column is either nullable or not
 */
def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

  // get schema
  val schema = df.schema
  // modify the StructField with name `cn`
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
    case y: StructField => y
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}

directly.

Also, you can make the method available via the "pimp my library" pattern (see my SO post What is the best way to define custom methods on a DataFrame?), so that you can call

val df = ....
val df2 = df.setNullableStateOfColumn( "id", true )
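
For reference, here is a minimal sketch of that pattern, assuming Spark 1.x/2.x with an SQLContext available; the implicit class name NullableOps is an illustrative choice, not part of the original post:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical enrichment class: brings setNullableStateOfColumn onto DataFrame itself.
implicit class NullableOps(df: DataFrame) {
  def setNullableStateOfColumn(cn: String, nullable: Boolean): DataFrame = {
    val newSchema = StructType(df.schema.map {
      case StructField(c, t, _, m) if c == cn => StructField(c, t, nullable, m)
      case f => f
    })
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }
}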

Edit

Alternative solution 1

Use a slightly modified version of setNullableStateOfColumn

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  // get schema
  val schema = df.schema
  // set nullable on every StructField
  val newSchema = StructType(schema.map {
    case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m)
  })
  // apply new schema
  df.sqlContext.createDataFrame( df.rdd, newSchema )
}
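
A quick usage sketch, assuming the inputDF from the question:

val allNullableDF = setNullableStateForAllColumns(inputDF, true)
allNullableDF.printSchema   // every column now reports nullable = true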

Alternative solution 2

Explicitly define the schema. (Reflection on the case class members could be used to build a more general solution.)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType}

configuredUnitTest("Stackoverflow.") { sparkContext =>

  case class Input(id:Long, var1:Int, var2:Int, var3:Double)

  val sqlContext = new SQLContext(sparkContext)
  import sqlContext.implicits._


  // use this to set the schema explicitly, or
  // use reflection on the case class members to construct the schema
  val schema = StructType( Seq (
    StructField( "id", LongType, true),
    StructField( "var1", IntegerType, true),
    StructField( "var2", IntegerType, true),
    StructField( "var3", DoubleType, true)
  ))

  val is: List[Input] = List(
    Input(1110, 0, 1001,-10.00),
    Input(1111, 1, 1001, 10.00),
    Input(1111, 0, 1002, 10.00)
  )

  val rdd: RDD[Input] =  sparkContext.parallelize( is )
  val rowRDD: RDD[Row] = rdd.map( (i: Input) ⇒ Row(i.id, i.var1, i.var2, i.var3))
  val inputDF = sqlContext.createDataFrame( rowRDD, schema ) 

  inputDF.printSchema
  inputDF.show()
}
Martin Senne
  • So there is no way to just do a blanket reset of the columns? I can always grab the column names into a list and loop through that list if I need to. BTW, the "pimp my library" thing is awesome! – J Calbreath Oct 19 '15 at 14:36
  • Ahhh, now I know what you mean. You can specify a schema via `StructType` with `createDataFrame`. Will add an edit to my answer. – Martin Senne Oct 19 '15 at 14:49
  • All this to enable what is typically a default behavior in just about any SQL engine: that a field can contain nulls? – Alexander Tronchin-James Jan 17 '17 at 18:45
  • My observation is that this creates a logical plan on the source RDD, resulting in extra processing - and it seems to be considered an action since my stage now stops at the `createDataFrame` line, rather than some later processing stage. – Stephen Jul 19 '21 at 04:42
42

Another option: if you need to change the dataframe in place and recreating it is impossible, you can do something like this:

.withColumn("col_name", when(col("col_name").isNotNull, col("col_name")).otherwise(lit(null)))

Spark will then think that this column may contain null, and nullability will be set to true. Alternatively, you can use a UDF to wrap your values in Option. This works fine even for streaming cases.
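
As a rough sketch of the Option-wrapping idea (the DataFrame df, the column name, and the Int type are assumptions for illustration, not from the original answer):

import org.apache.spark.sql.functions.{col, udf}

// Wrapping the value in Option makes the resulting column nullable in the schema.
val toNullable = udf((x: Int) => Option(x))
val df2 = df.withColumn("col_name", toNullable(col("col_name")))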

Rayan Ral
  • 7
    Any idea how to achieve the inverse (set a column as non-nullable) in a structured streaming dataframe? – redsk Dec 02 '19 at 16:52
  • 9
    Nice! And the PySpark version is `.withColumn("col_name", when(col("col_name").isNotNull(), col("col_name")).otherwise(lit(None)))` – AltShift Dec 12 '19 at 00:42
  • `otherwise` seems not needed. It's shown in [this answer](https://stackoverflow.com/questions/68578277#68578278) – ZygD Jul 29 '21 at 19:13
  • 2
    This should be the accepted answer – JBernardo Dec 20 '21 at 19:44
  • `Works fine even for streaming cases.` For me this works only when foreachBatch sink is used. For other sinks, it seemingly works but not really - checking df schema shows that nullable changed from false to true, but somehow the column still behaves as if nullable=false. To give a specific example, calling `to_avro` on the changed column (without providing the schema to `to_avro`, which is the only way for spark 2.4) yields avro bytes of the value directly, without the initial int indicating position in the avro union (i.e. behavior for nullable=false instead of behavior for nullable=true). – oskarryn Nov 02 '22 at 13:33
17

This is a late answer, but I wanted to give an alternative solution for people who come here. You can automatically make a DataFrame column nullable from the start by the following modification to your code:

case class input(id:Option[Long], var1:Option[Int], var2:Int, var3:Double)
val inputDF = sqlContext
  .createDataFrame(List(input(Some(1110),Some(0),1001,-10.00),
    input(Some(1111),Some(1),1001,10.00),
    input(Some(1111),Some(0),1002,10.00)))
inputDF.printSchema

This will yield:

root
 |-- id: long (nullable = true)
 |-- var1: integer (nullable = true)
 |-- var2: integer (nullable = false)
 |-- var3: double (nullable = false)

defined class input
inputDF: org.apache.spark.sql.DataFrame = [id: bigint, var1: int, var2: int, var3: double]

Essentially, if you declare a field as an Option, using Some([element]) or None as the actual inputs, then that field will be nullable. Otherwise, the field will not be nullable. I hope this helps!

Sidd Singal
10

A more compact version of setting the nullable parameter for all columns

Instead of case StructField( c, t, _, m) ⇒ StructField( c, t, nullable = nullable, m) one can use _.copy(nullable = nullable). Then the whole function can be written as:

def setNullableStateForAllColumns( df: DataFrame, nullable: Boolean) : DataFrame = {
  df.sqlContext.createDataFrame(df.rdd, StructType(df.schema.map(_.copy(nullable = nullable))))
}
matemaciek
4

Thanks Martin Senne. Just a little addition. In the case of nested struct types, you may need to set nullable recursively, like this:

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
    def set(st: StructType): StructType = {
      StructType(st.map {
        case StructField(name, dataType, _, metadata) =>
          val newDataType = dataType match {
            case t: StructType => set(t)
            case _ => dataType
          }
          StructField(name, newDataType, nullable = nullable, metadata)
      })
    }

    df.sqlContext.createDataFrame(df.rdd, set(df.schema))
  }
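
A small illustration of the recursive behaviour, assuming the inputDF from the question; the nested column name vars is made up for the example:

import org.apache.spark.sql.functions.{col, struct}

// Build a DataFrame with a nested struct column, then relax nullability everywhere.
val nested = inputDF.withColumn("vars", struct(col("var1"), col("var2")))
val relaxed = setNullableStateForAllColumns(nested, nullable = true)
relaxed.printSchema   // the fields inside `vars` are reported as nullable = true as well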
skotlov
2

Just use java.lang.Integer instead of scala.Int in your case class.

case class input(id: Long, var1: java.lang.Integer, var2: java.lang.Integer, var3: java.lang.Double)
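
An untested sketch of what that gives you, assuming the same createDataFrame call as in the question:

val inputDF = sqlCtx.createDataFrame(List(input(1110, 0, 1001, -10.00)))
inputDF.printSchema
// var1, var2 and var3 should now report nullable = true (boxed Java types can hold null);
// id stays nullable = false because it is still a primitive Long.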
echo
1

As I came here searching for a pyspark solution and did not find one, here it is:

from pyspark.sql.types import StructType, StructField

df = spark.createDataFrame(
    [(1, "a", 4), (3, "B", 5)], ("col1", "col2", "col3"))

df.show()
df.schema

+----+----+----+  
|col1|col2|col3|  
+----+----+----+  
|   1|   a|   4|  
|   3|   B|   5|  
+----+----+----+  

StructType(
    List(
        StructField(col1,LongType,true),
        StructField(col2,StringType,true),
        StructField(col3,LongType,true)
    )
)

schema = StructType()
for field in df.schema.fields:
    schema.add(StructField(field.name, field.dataType, False))
newdf = spark.createDataFrame(df.rdd, schema)

newdf.schema
StructType(
    List(
        StructField(col1,LongType,false),
        StructField(col2,StringType,false),
        StructField(col3,LongType,false)
    )
)
jugi
0

When you want to drop a column and create a new column in a Spark dataframe, you can create a nullable column like this:

  1. df.withColumn("Employee_Name", when(lit('') == '', '').otherwise(lit(None)))

NOTE: The above code works if you want to create a column of type string and also make it nullable

  1. df.withColumn("Employee_Name", when(lit('') == '', 0).otherwise(lit(None)))

NOTE: The above code works if you want to create a column of type integer and also make it nullable

Hemanth Vatti
  • 1
    Problem statement is to change schema of existing and not creating new columns – devilpreet Feb 23 '22 at 09:10
  • 1
    Sometimes its easy to drop the column and create a new column with the updated schema, I was searching for this answer but nobody posted this. So I thought this will be helpful for someone. – Hemanth Vatti May 05 '22 at 23:34