
Say we need to build such a system using Spark Structured Streaming. The input is a Kafka topic: a Kafka message is sent to the topic whenever a user does a search. Each Kafka message contains the user's search keyword and user id, like

{
  "user-id": "76471849572",
  "search-keyword": "guitar",
  "ip": "1.2.3.4"
}

The SSS application consumes the data and tries to suggest some items for the user's search result. The structure of the input DataFrame is the same as the input:

user-id | search-keyword | ip

A natural idea is to use the user's order history in the calculation. Order data is stored in a DB using sharding: there are many DB servers holding the same table. So

  • Spark can create an RDD/DataFrame from a JDBC link, but the Order table spans many DB instances and cannot be covered by a single JDBC link. The existing DAO must be used to query data from the DB servers.
  • The data is too large to be fully loaded into Spark as an RDD/DataFrame.

So the first thing the SSS application does when it gets a message is to query the user's order history by user-id, then create a new DataFrame that contains both the input data and the order list.

The schema of Order is like:

Order
    userId: String
    items: List<Item>
    orderedTimeStamp: Long
    amount: Double

And the schema of Item is like:

Item
    sku: String
    props: Map[String, String]
    count: Int

Ideally, after the query, the DataFrame should look like:

user-id | search-keyword | ip | order-list

The newly added order-list column is a list; each element of the list has the same structure as the Order above.

I checked the withColumn API mentioned in Create new column with function in Spark Dataframe

It is good enough for adding a column of a simple type, but a complex type like List does not seem to be well supported (for some reason, a case class should not be used to represent Order).

Another idea is to use from_json together with withColumn, but it feels ugly and slow:

val loadOrderListAsJSONStringUDF = udf(code)  // `code` queries the DAO and returns the order list as a JSON string
val newDF = inputDF.withColumn("order-list",
  from_json(loadOrderListAsJSONStringUDF(inputDF.col("user-id")), buildStructTypeForOrderList()))

Another problem is buildStructTypeForOrderList: there is no ListType to use inside a StructType.
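For what it's worth, Spark's schema DSL expresses a list with ArrayType, so a buildStructTypeForOrderList could be sketched directly from the Order/Item schemas above (the exact Spark types chosen for each field are assumptions):

```scala
import org.apache.spark.sql.types._

// Sketch of buildStructTypeForOrderList: "order-list" becomes an
// array of Order structs, each holding an array of Item structs.
def buildStructTypeForOrderList(): ArrayType = {
  val itemType = StructType(Seq(
    StructField("sku", StringType),
    StructField("props", MapType(StringType, StringType)),
    StructField("count", IntegerType)))
  val orderType = StructType(Seq(
    StructField("userId", StringType),
    StructField("items", ArrayType(itemType)),
    StructField("orderedTimeStamp", LongType),
    StructField("amount", DoubleType)))
  ArrayType(orderType)
}
```

Passed to from_json, this yields an order-list column typed array<struct<...>> without involving any case class.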

So, is there a best practice in SSS for loading external, schema-ful data into the original DataFrame as a new column?

Using a join to create the expected DataFrame would be the natural way. The issues are:

  • The Order table is sharded across many DB servers, so it is not easy to load it with Spark's JDBC source feature. For now, the original Order DAO has to be used to load Order data.
  • The Order table is big, so it is not practical to load all of it into Spark. Order data should be queried from the DB servers through the DAO by user-id.
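Given those constraints, one direction (a sketch, not a vetted answer) is to keep the DAO lookup inside the executors and let from_json impose the schema. OrderDao, its findOrdersAsJson method, and the holder object are all hypothetical stand-ins for the existing DAO:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, udf}
import org.apache.spark.sql.types.ArrayType

// Stub standing in for the existing sharded-DB DAO (hypothetical interface);
// the real one would route each query to the right shard by user id.
class OrderDao extends Serializable {
  def findOrdersAsJson(userId: String): String = "[]" // JSON array of Orders
}

// One DAO per executor JVM: a lazy val in an object is initialized on first
// use inside the executor, so no DB connection is serialized from the driver.
object OrderDaoHolder {
  lazy val dao: OrderDao = new OrderDao()
}

object EnrichWithOrders {
  // Looks up each user's order history as a JSON string, then lets
  // from_json parse it into the array-of-struct order-list schema.
  def enrich(inputDF: DataFrame, orderListSchema: ArrayType): DataFrame = {
    val loadOrders = udf((userId: String) => OrderDaoHolder.dao.findOrdersAsJson(userId))
    inputDF.withColumn("order-list", from_json(loadOrders(col("user-id")), orderListSchema))
  }
}
```

The cost is still one DB round-trip per row; batching the lookups per micro-batch (mapPartitions plus an IN query against each shard) would be the next refinement.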