65

When I create a DataFrame from a JSON file in Spark SQL, how can I tell if a given column exists before calling .select

Example JSON schema:

{
  "a": {
    "b": 1,
    "c": 2
  }
}

This is what I want to do:

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

but I can't find a good function for hasColumn. The closest I've gotten is to test if the column is in this somewhat awkward array:

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)
Community
  • 1
  • 1
ben
  • 653
  • 1
  • 5
  • 7

11 Answers11

124

Just assume it exists and let it fail with Try. Plain and simple and supports an arbitrary nesting:

import scala.util.Try
import org.apache.spark.sql.DataFrame

def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

val df = sqlContext.read.json(sc.parallelize(
  """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))

hasColumn(df, "foobar")
// Boolean = false

hasColumn(df, "foo")
// Boolean = true

hasColumn(df, "foo.bar")
// Boolean = true

hasColumn(df, "foo.bar.foobar")
// Boolean = true

hasColumn(df, "foo.bar.foobaz")
// Boolean = false

Or even simpler:

val columns = Seq(
  "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")

columns.flatMap(c => Try(df(c)).toOption)
// Seq[org.apache.spark.sql.Column] = List(
//   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)

Python equivalent:

from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False

df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()

has_column(df, "foobar")
## False

has_column(df, "foo")
## True

has_column(df, "foo.bar")
## True

has_column(df, "foo.bar.foobar")
## True

has_column(df, "foo.bar.foobaz")
## False
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 4
    This works with structured field as well. The solutions that uses `contains` function does not! +1 – Andrea May 24 '17 at 09:28
  • On first glance the `df(path)` or `df[col]` looks like it would be a very costly test but this is all lazy dag building so it's cheap, is that correct? – Davos Nov 21 '19 at 05:00
  • @Davos That has little to do with laziness. `Columns` are not data containers, but components of query description model. In broader context to know anything about `Dataset` you have process and check its logical `QueryExectution.analyzed` plan, that applies to `col` / `apply` (both `resolve`) or `schema` / `columns` alike. – 10465355 Nov 21 '19 at 11:34
  • @10465355saysReinstateMonica Thanks that's exactly what I meant. I am not referring to `lazy` directives in Scala but rather the phased approach of how Spark creates a logical plan, transforms it to a physical plan and then executes the tasks on the cluster. If this is _resolved_ at the logical plan stage then it's cheap. – Davos Nov 21 '19 at 11:40
  • @zero323 The python equivalent does not work on struct columns which are inside an array. e.g. '{ "name": "test", "address":[{"houseNumber":"1234"}]}' should df.select be used instead of df[col] – puneet goyal Jul 06 '21 at 12:23
  • @zero323 how will you check of nested array.. in df(path) it does not accept array elements like property[0].property[1].number.. – amarnath harish Dec 08 '21 at 13:11
73

Another option which I normally use is

df.columns.contains("column-name-to-check")

This returns a boolean

marc_s
  • 732,580
  • 175
  • 1,330
  • 1,459
Jai Prakash
  • 2,461
  • 1
  • 26
  • 27
13

Actually you don't even need to call select in order to use columns, you can just call it on the dataframe itself

// define test data
case class Test(a: Int, b: Int)
val testList = List(Test(1,2), Test(3,4))
val testDF = sqlContext.createDataFrame(testList)

// define the hasColumn function
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)

// then you can just use it on the DF with a given column name
hasColumn(testDF, "a")  // <-- true
hasColumn(testDF, "c")  // <-- false

Alternatively you can define an implicit class using the pimp my library pattern so that the hasColumn method is available on your dataframes directly

implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
    def hasColumn(colName: String) = df.columns.contains(colName)
}

Then you can use it as:

testDF.hasColumn("a") // <-- true
testDF.hasColumn("c") // <-- false
Daniel B.
  • 929
  • 4
  • 8
11

Try is not optimal as the it will evaluate the expression inside Try before it takes the decision.

For large data sets, use the below in Scala:

df.schema.fieldNames.contains("column_name")
Satan Pandeya
  • 3,747
  • 4
  • 27
  • 53
Nitin Mathur
  • 185
  • 1
  • 3
  • I agree `Try` is not optimal. What I do is: I create a column with the array of fields and then test with `array_contains`: `val fields = df.schema.fieldNames; df.withColumn("fields",lit(fields)).withColumn("has_column", when(array_contains($"fields","field1"),lit(true)))` ``` – pedromorfeu Jul 14 '20 at 09:29
8

For those who stumble across this looking for a Python solution, I use:

if 'column_name_to_check' in df.columns:
    # do something

When I tried @Jai Prakash's answer of df.columns.contains('column-name-to-check') using Python, I got AttributeError: 'list' object has no attribute 'contains'.

mefryar
  • 199
  • 4
  • 7
  • Does this way of checking col in the data-frame slow down the spark processing? – vasista Sep 09 '20 at 13:09
  • @vasista Can you help me better understand your question? Is there some other approach against which you want to compare performance? – mefryar Sep 11 '20 at 01:58
4

Your other option for this would be to do some array manipulation (in this case an intersect) on the df.columns and your potential_columns.

// Loading some data (so you can just copy & paste right into spark-shell)
case class Document( a: String, b: String, c: String)
val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF

// The columns we want to extract
val potential_columns = Seq("b", "c", "d")

// Get the intersect of the potential columns and the actual columns, 
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show

Alas this will not work for you inner object scenario above. You will need to look at the schema for that.

I'm going to change your potential_columns to fully qualified column names

val potential_columns = Seq("a.b", "a.c", "a.d")

// Our object model
case class Document( a: String, b: String, c: String)
case class Document2( a: Document, b: String, c: String)

// And some data...
val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF

// We go through each of the fields in the schema.
// For StructTypes we return an array of parentName.fieldName
// For everything else we return an array containing just the field name
// We then flatten the complete list of field names
// Then we intersect that with our potential_columns leaving us just a list of column we want
// we turn the array of strings into column objects
// Finally turn the result into a vararg (: _*)
df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show

This only goes one level deep, so to make it generic you would have to do more work.

Michael Lloyd Lee mlk
  • 14,561
  • 3
  • 44
  • 81
2

in pyspark you can simply run

'field' in df.columns

1

If you shred your json using a schema definition when you load it then you don't need to check for the column. if it's not in the json source it will appear as a null column.

        val schemaJson = """
  {
      "type": "struct",
      "fields": [
          {
            "name": field1
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": field2
            "type": "string",
            "nullable": true,
            "metadata": {}
          }
      ]
  }
        """
    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]

    val djson = sqlContext.read
    .schema(schema )
    .option("badRecordsPath", readExceptionPath)
    .json(dataPath)
Shaun Ryan
  • 1,458
  • 12
  • 16
0
def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
  Try(df.select(colName)).isSuccess

Use the above mentioned function to check the existence of column including nested column name.

Ömer Erden
  • 7,680
  • 5
  • 36
  • 45
0

In PySpark, df.columns gives you a list of columns in the dataframe, so "colName" in df.columns would return a True or False. Give a try on that. Good luck!

Jie
  • 1,107
  • 1
  • 14
  • 18
  • df1.columns displays ['bankData', 'reference', 'submissionTime']; but df1['bankData']['userAddress'].columns displays Column<'bankData[userAddress][columns]'>, doesnt show me the struct, am I missing something? – Manza Mar 23 '21 at 01:58
0

For nested columns you can use

df.schema.simpleString().find('column_name')
Ardent Coder
  • 3,777
  • 9
  • 27
  • 53
  • 3
    This does not seem reliable to me if you have a Column named similarly to the extraneous text in the Schema string. – Jerry Nixon Aug 18 '21 at 23:06