I am building up a schema to accept some data streaming in. It has an ArrayType with some elements. Here is my StructType with the ArrayType:

import org.apache.spark.sql.types._

val innerBody = StructType(
    StructField("value", LongType, false) ::
    StructField("spent", BooleanType, false) ::
    StructField("tx_index", LongType, false) :: Nil)

val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)

val body = StructType(
    StructField("inputs", ArrayType(prev_out, false), false) ::
    StructField("out", ArrayType(innerBody, false), false) :: Nil)

val schema = StructType(StructField("x",  body, false) :: Nil)

This builds a schema like:

root
 |-- bit: struct (nullable = true)
 |    |-- x: struct (nullable = false)
 |    |    |-- inputs: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- prev_out: struct (nullable = false)
 |    |    |    |    |    |-- value: long (nullable = false)
 |    |    |    |    |    |-- spent: boolean (nullable = false)
 |    |    |    |    |    |-- tx_index: long (nullable = false)
 |    |    |-- out: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- value: long (nullable = false)
 |    |    |    |    |-- spent: boolean (nullable = false)
 |    |    |    |    |-- tx_index: long (nullable = false)

I am trying to select the "value" field nested under prev_out in this schema as the data streams in. I am using a writeStream sink.

val parsed = df.select("bit.x.inputs.element.prev_out.value")
    .writeStream.format("console").start()

I have the code above, but it gives an error.

Message: cannot resolve 'bit.x.inputs.element.prev_out.value' given input columns: [key, value, timestamp, partition, offset, timestampType, topic];;

How can I access the "value" element in this schema?

1 Answer

If your DataFrame has a schema like this, first explode the inputs array and then select the nested field.

df.printSchema()
//root
//|-- bit: struct (nullable = true)
//|    |-- x: struct (nullable = true)
//|    |    |-- inputs: array (nullable = true)
//|    |    |    |-- element: struct (containsNull = true)
//|    |    |    |    |-- prev_out: struct (nullable = true)
//|    |    |    |    |    |-- spent: boolean (nullable = true)
//|    |    |    |    |    |-- tx_index: long (nullable = true)
//|    |    |    |    |    |-- value: long (nullable = true)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
val intermediateDf: DataFrame = df.select(explode(col("bit.x.inputs")).as("interCol"))
intermediateDf.printSchema()

//root
//|-- interCol: struct (nullable = true)
//|    |-- prev_out: struct (nullable = true)
//|    |    |-- spent: boolean (nullable = true)
//|    |    |-- tx_index: long (nullable = true)
//|    |    |-- value: long (nullable = true)

val finalDf: DataFrame = intermediateDf.select(col("interCol.prev_out.value").as("value"))
finalDf.printSchema()
//root
//|-- value: long (nullable = true)


finalDf.show()
//+-----------+
//|      value|
//+-----------+
//|12347628746|
//|12347628746|
//+-----------+
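
Note that the error in the question lists only the Kafka source columns (key, value, topic, ...), which suggests the JSON payload had not been parsed into the bit struct when the select ran. Below is a minimal sketch of parsing first and then applying the explode and select steps above. It assumes the payload is a JSON string in a column named value and that Spark SQL is on the classpath; the sample record, the local SparkSession, and the names raw and values are made up for illustration, and a batch Dataset stands in for the Kafka stream.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

// Assumption: a local session, only for trying the sketch out
val spark = SparkSession.builder().master("local[*]").appName("nested-value").getOrCreate()

// Schema from the question
val innerBody = StructType(
  StructField("value", LongType, false) ::
  StructField("spent", BooleanType, false) ::
  StructField("tx_index", LongType, false) :: Nil)
val prev_out = StructType(StructField("prev_out", innerBody, false) :: Nil)
val body = StructType(
  StructField("inputs", ArrayType(prev_out, false), false) ::
  StructField("out", ArrayType(innerBody, false), false) :: Nil)
val schema = StructType(StructField("x", body, false) :: Nil)

// Hypothetical sample record standing in for one Kafka message
val json =
  """{"x":{"inputs":[{"prev_out":{"value":100,"spent":true,"tx_index":1}},
    |{"prev_out":{"value":250,"spent":false,"tx_index":2}}],"out":[]}}""".stripMargin
val raw = spark.createDataset(Seq(json))(Encoders.STRING).toDF("value")

val values = raw
  // Parse the JSON text into a struct column named "bit"
  .select(from_json(col("value"), schema).as("bit"))
  // One output row per element of the inputs array
  .select(explode(col("bit.x.inputs")).as("interCol"))
  // Pull the nested long out of each element
  .select(col("interCol.prev_out.value").as("value"))

values.show()
```

For the streaming case, the same three select calls would go on the Kafka DataFrame before .writeStream, with col("value").cast("string") in place of col("value"), since the Kafka source delivers value as binary.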
Rumesh Krishnan