
I am loading a file of JSON objects as a PySpark SchemaRDD. I want to change the "shape" of the objects (basically, I'm flattening them) and then insert into a Hive table.

The problem I have is that the following returns a PipelinedRDD not a SchemaRDD:

log_json.map(flatten_function)

(Where log_json is a SchemaRDD).

Is there either a way to preserve type, cast back to the desired type, or efficiently insert from the new type?
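For concreteness, a hypothetical flatten_function might look something like this (this is not the original, which isn't shown, and it assumes rows expose asDict()):

def flatten_function(row):
    """Hypothetical sketch: turn a nested Row into a flat dict.
    The result is a plain Python object, which is why map()
    returns a PipelinedRDD and the schema is lost."""
    flat = {}
    def _flatten(value, prefix):
        for key, val in value.asDict().items():
            name = "{0}_{1}".format(prefix, key) if prefix else key
            if hasattr(val, "asDict"):  # nested struct
                _flatten(val, name)
            else:
                flat[name] = val
    _flatten(row, "")
    return flat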

– Marcin
  • Could you provide some details? You've mentioned `SchemaRDD` so I guess it is Spark < 1.3, am I right? How complex is input and output schema? – zero323 Jul 20 '15 at 16:35
  • @zero323 The output is flat; the input is not. We have spark 1.2. – Marcin Jul 20 '15 at 16:46

4 Answers


More an idea than a real solution. Let's assume your data looks like this:

data = [
    {"foobar":
        {"foo": 1, "bar": 2, "fozbaz": {
            "foz": 0, "baz": {"b": -1, "a": -1, "z": -1}
        }}}]

import json
with open("foobar.json", "w") as fw:
    for record in data:
        fw.write(json.dumps(record) + "\n")  # jsonFile expects one object per line

First, let's load it and check the schema:

>>> srdd = sqlContext.jsonFile("foobar.json")
>>> srdd.printSchema()
root
 |-- foobar: struct (nullable = true)
 |    |-- bar: integer (nullable = true)
 |    |-- foo: integer (nullable = true)
 |    |-- fozbaz: struct (nullable = true)
 |    |    |-- baz: struct (nullable = true)
 |    |    |    |-- a: integer (nullable = true)
 |    |    |    |-- b: integer (nullable = true)
 |    |    |    |-- z: integer (nullable = true)
 |    |    |-- foz: integer (nullable = true)

Now we register the table, as suggested by Justin Pihony, and extract the schema:

srdd.registerTempTable("srdd")
schema = srdd.schema().jsonValue()
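For reference, jsonValue() returns the schema as a plain dict, roughly shaped like this (some keys omitted):

{"type": "struct",
 "fields": [
     {"name": "foobar", "nullable": True,
      "type": {"type": "struct",
               "fields": [
                   {"name": "bar", "nullable": True, "type": "integer"},
                   ...]}}]}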

Instead of flattening the data we can flatten the schema, using something like this:

def flatten_schema(schema):
    """Take schema as returned from schema().jsonValue()
    and return list of field names with full path"""
    def _flatten(schema, path="", accum=None):
        # Extract name of the current element
        name = schema.get("name")
        # If there is a name extend path
        if name is not None:
            path = "{0}.{1}".format(path, name) if path else name
        # It is some kind of struct
        if isinstance(schema.get("fields"), list):
            for field in schema.get("fields"):
                _flatten(field, path, accum)
        elif isinstance(schema.get("type"), dict):
            _flatten(schema.get("type"), path, accum)
        # It is an atomic type
        else:
            accum.append(path)
    accum = []
    _flatten(schema, "", accum)
    return accum
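A quick sanity check against the sample schema should give:

>>> flatten_schema(schema)
['foobar.bar', 'foobar.foo', 'foobar.fozbaz.baz.a',
 'foobar.fozbaz.baz.b', 'foobar.fozbaz.baz.z', 'foobar.fozbaz.foz']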

Add a small helper to format the query string:

def build_query(schema, df):
    select = ", ".join(
            "{0} AS {1}".format(field, field.replace(".", "_"))
            for field in flatten_schema(schema))
    return "SELECT {0} FROM {1}".format(select, df)
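For the sample schema this should produce a query like the following (wrapped here for readability):

>>> print(build_query(schema, "srdd"))
SELECT foobar.bar AS foobar_bar, foobar.foo AS foobar_foo,
    foobar.fozbaz.baz.a AS foobar_fozbaz_baz_a, foobar.fozbaz.baz.b AS foobar_fozbaz_baz_b,
    foobar.fozbaz.baz.z AS foobar_fozbaz_baz_z, foobar.fozbaz.foz AS foobar_fozbaz_foz FROM srdd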

and finally the results:

>>> sqlContext.sql(build_query(schema, "srdd")).printSchema()
root
 |-- foobar_bar: integer (nullable = true)
 |-- foobar_foo: integer (nullable = true)
 |-- foobar_fozbaz_baz_a: integer (nullable = true)
 |-- foobar_fozbaz_baz_b: integer (nullable = true)
 |-- foobar_fozbaz_baz_z: integer (nullable = true)
 |-- foobar_fozbaz_foz: integer (nullable = true)

Disclaimer: I didn't try to get very deep into the schema structure, so most likely there are some cases not covered by flatten_schema.

– zero323
  • My question is not how to flatten the schema. My question is how to keep my RDD as a SchemaRDD. – Marcin Jul 20 '15 at 22:20
  • I understand, and I believe it actually solves the problem without the need to specify a schema manually. The output is flat, types are preserved and the schema is already applied. – zero323 Jul 20 '15 at 23:54

It looks like `select` is not available in Python, so you will have to `registerTempTable` and write it as a SQL statement, like

`SELECT flatten(*) FROM TABLE`

after setting up the function for use in SQL:

sqlCtx.registerFunction("flatten", lambda x: flatten_function(x))

As @zero323 brought up, a function against * is probably not supported... so you can just create a function that takes in your data types and pass all of them in, as sketched below.
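A sketch of that fallback, assuming the Spark 1.2 import layout (the types moved to pyspark.sql.types in 1.3) and a hypothetical per-field UDF:

from pyspark.sql import IntegerType  # pyspark.sql.types in Spark 1.3+

# Hypothetical: register one UDF per output column and pass the
# nested fields it needs explicitly instead of *
sqlCtx.registerFunction("flatten_baz_a", lambda a: a, IntegerType())
sqlCtx.sql("SELECT flatten_baz_a(foobar.fozbaz.baz.a) AS baz_a FROM srdd")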

– Justin Pihony
  • I am pretty sure that calling udf on * is not allowed. Do you have any working example? Regarding select it is available from 1.3. – zero323 Jul 20 '15 at 17:24

The solution is applySchema:

mapped = log_json.map(flatten_function)
hive_context.applySchema(mapped, flat_schema).insertInto(name)

Where flat_schema is a StructType representing the schema in the same way as you would obtain from log_json.schema() (but flattened, obviously).
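A minimal end-to-end sketch, assuming the Spark 1.2 import layout, two integer fields, and a flatten_function that emits matching tuples (all of this is hypothetical; adjust names and types to your data):

from pyspark.sql import StructType, StructField, IntegerType

# Hypothetical flat schema; the fields must line up with the
# tuples produced by flatten_function
flat_schema = StructType([
    StructField("foobar_foo", IntegerType(), True),
    StructField("foobar_bar", IntegerType(), True),
])

mapped = log_json.map(flatten_function)
hive_context.applySchema(mapped, flat_schema).insertInto(name)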

– Marcin

You can try this one... it's a bit long, but it works:

def flat_table(df, table_name):
    """Register df as a temp table and return a SQL query that
    flattens nested structs and explodes array columns."""
    field_list = []

    def rec(l, in_array, name):
        for v in l:
            if isinstance(v['type'], dict):
                if 'fields' in v['type']:
                    # Nested struct: recurse into its fields
                    rec(l=v['type']['fields'], in_array=False,
                        name=name + [v['name']])
                if 'elementType' in v['type']:
                    # Array of structs: recurse and mark the path as an array
                    rec(l=v['type']['elementType']['fields'], in_array=True,
                        name=name + [v['name']])
            else:  # recursion stop rule
                # If this field sits inside an array we need to explode
                # every element of the array
                if in_array:
                    field_list.append('{node}{subnode}.array'.format(
                        node=".".join(name) + '.' if name else '',
                        subnode=v['name']))
                else:
                    field_list.append('{node}{subnode}'.format(
                        node=".".join(name) + '.' if name else '',
                        subnode=v['name']))

    l = df.schema.jsonValue()['fields']
    df.registerTempTable(table_name)
    rec(l, in_array=False, name=[table_name])

    # Create the select statement
    inner_fields = []
    outer_fields = []
    flag = True

    for x in field_list:
        f = x.split('.')
        if f[-1] != 'array':
            inner_fields.append('{field} as {name}'.format(
                field=".".join(f), name=f[-1]))
            of = ['a'] + f[-1:]
            outer_fields.append('{field} as {name}'.format(
                field=".".join(of), name=of[-1]))
        else:
            if flag:
                # Add the array to the inner query for explosion,
                # only once per array field
                inner_fields.append('explode({field}) as {name}'.format(
                    field=".".join(f[:-2]), name=f[-3]))
                flag = False
            of = ['a'] + f[-3:-1]
            outer_fields.append('{field} as {name}'.format(
                field=".".join(of), name=of[-1]))

    q = """select {outer_fields}
from (select {inner_fields}
from {table_name}) a""".format(
        outer_fields=',\n'.join(outer_fields),
        inner_fields=',\n'.join(inner_fields),
        table_name=table_name)
    return q
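Usage would look something like this (hypothetical names):

q = flat_table(df, "events")
flat_df = sqlContext.sql(q)
flat_df.printSchema()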
– Lior Baber