
I'm trying to work with JSON data from the Binance websocket.

Right now my schema looks like this:

val schema = new StructType()
  .add("e",StringType)
  .add("E",StringType)
  .add("s",StringType)
  .add("k",StringType)
    .add("t",IntegerType)
    .add("T",IntegerType)
    .add("s",StringType)
    .add("i",StringType)
    .add("f",StringType)
    .add("L",StringType)
    .add("o",DoubleType)
    .add("c",DoubleType)
    .add("h",DoubleType)
    .add("l",DoubleType)
    .add("v",DoubleType)
    .add("n",IntegerType)
    .add("x",StringType)
    .add("q",DoubleType)
    .add("V",DoubleType)
    .add("Q",DoubleType)
    .add("B",StringType)

And I get this message from my Kafka topic:

{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}

As you can see the message is nested under the "k" key.

My output in Spark currently looks like this:

 root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


-------------------------------------------

https://imgur.com/a/9LPu9z6 

This is an image of the DataFrame, since I was unable to paste it here without breaking its layout.

1 Answer


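In your schema, "k" needs to be a nested StructType() that contains the inner fields, not a StringType followed by more top-level fields.

  • Change DoubleType to StringType for the price and volume fields, because those values are enclosed in "" (they are JSON strings) in your sample data.
  • Use LongType instead of IntegerType for "t" and "T"; values such as 1583595120000 do not fit in a 32-bit integer.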

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for jsn.toDS when running outside spark-shell

//schema for json
val schema = new StructType().
add("e",StringType).
add("E",StringType).
add("s",StringType).
add("k",new StructType().
    add("t",LongType).
    add("T",LongType).
    add("s",StringType).
    add("i",StringType).
    add("f",StringType).
    add("L",StringType).
    add("o",StringType).
    add("c",StringType).
    add("h",StringType).
    add("l",StringType).
    add("v",StringType).
    add("n",IntegerType).
    add("x",BooleanType).
    add("q",StringType).
    add("V",StringType).
    add("Q",StringType).
    add("B",StringType)
    )

//sample data
val jsn=Seq("""{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}""")


spark.read.schema(schema).json(jsn.toDS).select("*","k.*").drop("k").show()

//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
//|    e|            E|      s|            t|            T|      s|  i|       f|       L|      o|      c|      h|      l|     v|  n|    x|           q|     V|           Q|  B|
//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
//|kline|1583595170076|BTCUSDT|1583595120000|1583595179999|BTCUSDT| 1m|47069029|47069101|9111.22|9114.90|9114.91|9109.65|30.297| 73|false|276055.09390|11.517|104946.56519|  0|
//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
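
The question's printSchema output shows the raw Kafka columns, where the JSON sits in the binary "value" column. A minimal sketch of applying a nested schema to that column with from_json follows. It assumes a local SparkSession and uses a small static DataFrame as a stand-in for the Kafka source; the names klineSchema, kafkaLike and symbol are hypothetical, and the schema is trimmed to a few fields for brevity. With a real Kafka source the same select should apply to the DataFrame returned by spark.readStream.format("kafka").

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("kline-from-json").getOrCreate()
import spark.implicits._

// Trimmed schema (assumption: only a few of the fields are kept for brevity).
val klineSchema = new StructType()
  .add("e", StringType)
  .add("E", LongType)
  .add("s", StringType)
  .add("k", new StructType()
    .add("t", LongType)
    .add("s", StringType)
    .add("i", StringType)
    .add("o", StringType)
    .add("c", StringType)
    .add("x", BooleanType))

val sample =
  """{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"s":"BTCUSDT","i":"1m","o":"9111.22","c":"9114.90","x":false}}"""

// Stand-in for the Kafka DataFrame: a single binary "value" column.
val kafkaLike = Seq(sample.getBytes("UTF-8")).toDF("value")

val parsed = kafkaLike
  // Kafka delivers bytes, so cast to string before parsing the JSON.
  .select(from_json(col("value").cast("string"), klineSchema).as("data"))
  .select("data.*")
  // Rename the outer "s" so it does not clash with the inner k.s after flattening.
  .select(col("e"), col("E"), col("s").as("symbol"), col("k.*"))

parsed.printSchema()
```

Renaming the outer "s" is a design choice of this sketch: flattening with "k.*" otherwise leaves two columns called "s", which makes later references to that column ambiguous.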

You can then cast the required fields to numeric types such as DoubleType or FloatType by using Column.cast on the DataFrame columns.
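
As a rough illustration of that casting step, the sketch below converts string-typed price columns to DoubleType. It assumes a local SparkSession and a small hand-built DataFrame (flat) standing in for the flattened result; the names flat, priceCols and typed are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DoubleType

val spark = SparkSession.builder().master("local[*]").appName("kline-cast").getOrCreate()
import spark.implicits._

// Stand-in for the flattened DataFrame: prices and volume are still strings.
val flat = Seq(("9111.22", "9114.90", "30.297", 73)).toDF("o", "c", "v", "n")

// Columns to convert; extend this list with h, l, q, V, Q as needed.
val priceCols = Seq("o", "c", "v")

// Replace each listed column with its DoubleType cast, leaving the others untouched.
val typed = priceCols.foldLeft(flat) { (df, name) =>
  df.withColumn(name, col(name).cast(DoubleType))
}

typed.printSchema()
```

A string that cannot be parsed as a number becomes null after the cast, so it may be worth checking for unexpected nulls afterwards.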

notNull