
EDIT: Sorry about the previous question's quality, I hope this one is clearer. With my Spark application I'm loading a whole directory of JSON files like the following:

    {
        "type": "some_type",
        "payload": {
            "data1": {
                "id": "1"
            },
            "data2": {
                "id": "1"
            },
            "data3": {
                "id": "1"
            },
            "dataset1": [{
                "data11": {
                    "id": "1"
                },
                "data12": {
                    "id": "1"
                }
            }],
            "masterdata": {
                "md1": [{
                    "id": "1"
                },
                {
                    "id": "2"
                },
                {
                    "id": "3"
                }],
                "md2": [{
                    "id": "1"
                },
                {
                    "id": "2"
                },
                {
                    "id": "3"
                }]
            }
        }
    }

into a DataFrame and save it as a temp table in order to use it later. In this JSON, the fields under the "payload" node are always present, but the subnodes of "masterdata" are optional. The next step is creating a DataFrame for each subnode of the JSON, e.g. the DataFrame data1 contains the data of node "data1" from all files and looks like a regular table with a column "id". After the first processing step my Spark state is as follows: DataFrames: data1(id), data2(id), data3(id), data11(id), data12(id), md1(id), md2(id)
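
For reference, the loading step is roughly like this (a simplified sketch, assuming the standard read.json / registerTempTable route; the directory path is just a placeholder):

    // Simplified sketch: load the whole directory and register it as a temp table.
    val rawJson = hiveContext.read.json("path/to/json/dir")
    rawJson.registerTempTable("jsonData")

    // Then one DataFrame per subnode, e.g. data1(id):
    val data1 = hiveContext.sql("SELECT payload.data1.id FROM jsonData").toDF("id")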

Here comes the problem: if one of the JSON files in the directory doesn't contain the md2 node, I cannot run either show() or collect() on the "md2" DataFrame due to a NullPointerException. I would understand it if all files were missing the "md2" node, so the md2 DataFrame could not be created, but in this case I expect the md2 DataFrame simply not to contain data from the JSON file that is missing the md2 node but contains all the others.

Technical details: To read data from a nested node I'm using rdd.map & rdd.flatMap, then I'm converting the result to a DataFrame with custom column names.

If I run the application when all files in the directory contain all nodes, everything works, but if a single file is missing the md2 node the app fails upon .show() or .collect().

BTW, if the node exists but is empty, everything works fine.

Is there any way to make Spark support optional JSON nodes or to handle missing nodes within rdd.map & flatMap?

I hope this is clearer than the previous question.

On @Beryllium's request, here are the RDD operations that I'm using to get the md2 DataFrame:

    val jsonData = hiveContext.sql("SELECT `payload`.masterdata.md2 FROM jsonData")
    val data = jsonData.rdd
      .flatMap(row => row.getSeq[Row](0))
      .map(row => row.getString(row.fieldIndex("id")))
      .distinct
    val dataDF = data.toDF("id")
  • Possible duplicate of [What is a Null Pointer Exception, and how do I fix it?](http://stackoverflow.com/questions/218384/what-is-a-null-pointer-exception-and-how-do-i-fix-it) – Petter Friberg Nov 27 '15 at 09:19
  • @PetterFriberg with all due respect, it's not because it has an NPE in it that it is a duplicate; it is not in this case. – eliasah Nov 27 '15 at 09:23
  • @Silverrose you need to provide a [MCVE](http://stackoverflow.com/help/mcve) so we can help! – eliasah Nov 27 '15 at 09:23
  • @eliasah Sorry, but as the current question is posted, I think this is the best answer one can give. – Petter Friberg Nov 27 '15 at 09:25
  • @PetterFriberg nothing to be sorry about. I totally agree about the lack of quality of the question, though! – eliasah Nov 27 '15 at 09:32
  • @all I'll try to prepare sth more readable :) – Silverrose Nov 27 '15 at 09:37
  • Still not sufficient; please add the rdd.map/flatMaps and the conversion to DF - cannot reproduce your problem using plain `sqlContext.read.json`. – Beryllium Nov 27 '15 at 10:10

1 Answer


Quick fix

Try to insert a filter() like this:

sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData")
  .rdd
  .filter(_.getSeq[Row](0) != null)
  .flatMap(row => row.getSeq[Row](0))
  .map(row => (row.getString(row.fieldIndex("id"))))
  .distinct
  .toDF("id")
  .show()
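
An alternative sketch of the same fix (assuming the same jsonData temp table and that `import sqlContext.implicits._` is in scope for toDF): wrap the possibly-null sequence in an Option instead of a separate filter():

// Sketch only: Option(...) turns a null md2 array into an empty Seq,
// so flatMap never iterates over null and the NullPointerException disappears.
sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData")
  .rdd
  .flatMap(row => Option(row.getSeq[Row](0)).getOrElse(Seq.empty[Row]))
  .map(row => row.getString(row.fieldIndex("id")))
  .distinct
  .toDF("id")
  .show()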

Using explode()

This removes the null values as early as possible, so it should be faster (and at least it's shorter):

sqlContext
  .sql("select t.a.id from (SELECT explode(payload.masterdata.md2) as a FROM jsonData) t")
  • explode() explodes away the null values.
  • Then the subquery extracts only the id.

Even simpler: Extract ID first, then explode():

sqlContext.sql("SELECT explode(payload.masterdata.md2.id) FROM jsonData").show()
  • @Silverrose This is just a quick hack; please tell me if it returns the expected result - there are probably better ways. – Beryllium Nov 27 '15 at 11:25
  • @up Thanks, that did the job. I thought it would automatically ignore empty values since the schema was "ready" for optional nodes. What is still unclear to me is why the exception is thrown on .show() and not during processing. It allows me to create the DataFrame and use it later on, but throws an exception once I try to display this DF or another one based on it. – Silverrose Nov 27 '15 at 11:37
  • The schema is fine - the map fails. And since an RDD is evaluated lazily, you see the error in `show()`, because that's where the result is eventually materialized. – Beryllium Nov 27 '15 at 11:45
  • I'll keep the explode solution in mind when I run into performance issues; so far rdd.map is fine. Thx a lot! – Silverrose Nov 27 '15 at 11:48
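
To make the lazy-evaluation point above concrete, a minimal sketch (illustrative names, not part of the original thread):

// Sketch: map/flatMap are lazy transformations - they only record the computation.
val md2Rows = sqlContext.sql("SELECT payload.masterdata.md2 FROM jsonData").rdd
val exploded = md2Rows.flatMap(row => row.getSeq[Row](0)) // nothing runs yet, even if md2 is null
exploded.count() // the NullPointerException only surfaces at an action such as count() or show()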