Say we need to build such a system using Spark Structured Streaming (SSS). The input is a Kafka topic. A Kafka message is sent to the topic whenever a user performs a search. Each Kafka message contains the user's search keyword and user id, like:
{
"user-id": "76471849572",
"search-keyword": "guitar",
"ip": "1.2.3.4"
}
The SSS application consumes the data and tries to attach some suggested items to the search result for the user. The structure of the input DataFrame mirrors the message:
user-id | search-keyword | ip
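For context, here is a minimal sketch of the Kafka source side that yields a DataFrame with exactly these columns (the topic name and broker address are placeholder assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("search-suggestions").getOrCreate()

// Schema of one search event message.
val searchEventSchema = new StructType()
  .add("user-id", StringType)
  .add("search-keyword", StringType)
  .add("ip", StringType)

val inputDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
  .option("subscribe", "search-events")              // placeholder topic
  .load()
  .select(from_json(col("value").cast(StringType), searchEventSchema).as("event"))
  .select("event.*") // columns: user-id | search-keyword | ip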
A natural idea is to use the user's historical order list in the subsequent calculation. Order data is stored in a sharded DB: many DB servers hold the same table. So:
- Spark can use a JDBC link to create an RDD/DataFrame, but the Order table is spread across many DB instances and cannot be covered by a single JDBC link, so an existing DAO must be used to query the DB servers.
- There is too much data to fully load it into Spark as an RDD/DataFrame.
So the first thing the SSS application should do when it gets a message is query the user's historical order list by user-id, then create a new DataFrame that contains both the input data and the order list data.
Schema of Order is like:
Order
  userId: String
  items: List[Item]
  orderedTimeStamp: Long
  amount: Double
And schema of Item is like:
Item
  sku: String
  props: Map[String, String]
  count: Int
Ideally, after the data is queried, the DataFrame should look like:
user-id | search-keyword | ip | order-list
And the newly added order-list column is a list; each element of the list has the same structure as the original Order above.
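For concreteness, printSchema() on that target DataFrame should print roughly this tree (types derived from the Order and Item definitions above):

root
 |-- user-id: string (nullable = true)
 |-- search-keyword: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- order-list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- userId: string (nullable = true)
 |    |    |-- items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- sku: string (nullable = true)
 |    |    |    |    |-- props: map (nullable = true)
 |    |    |    |    |    |-- key: string
 |    |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |    |    |-- count: integer (nullable = true)
 |    |    |-- orderedTimeStamp: long (nullable = true)
 |    |    |-- amount: double (nullable = true)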
I checked the withColumn API mentioned in "Create new column with function in Spark Dataframe". It is good enough for adding a column of a simple type, but a complex type like a list does not seem to be well supported (for some reason, a case class should not be used to represent Order).
Another idea is using from_json together with withColumn, but it feels very ugly and slow:
val loadOrderListAsJSONStringUDF = udf(code) // `code`: a function that calls the DAO and serializes the orders to a JSON string
val newDF = inputDF.withColumn("order-list",
  from_json(loadOrderListAsJSONStringUDF(inputDF.col("user-id")), buildStructTypeForOrderList()))
And another thing is buildStructTypeForOrderList: there is no ListType to use with StructType (Spark SQL models list columns as ArrayType).
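Given that, a minimal sketch of what buildStructTypeForOrderList could look like, with field names taken from the Order and Item definitions above:

import org.apache.spark.sql.types._

def buildStructTypeForOrderList(): ArrayType = {
  val itemType = new StructType()
    .add("sku", StringType)
    .add("props", MapType(StringType, StringType))
    .add("count", IntegerType)
  val orderType = new StructType()
    .add("userId", StringType)
    .add("items", ArrayType(itemType))
    .add("orderedTimeStamp", LongType)
    .add("amount", DoubleType)
  // The column itself is an array of Order structs.
  ArrayType(orderType)
}

Recent Spark versions accept such an ArrayType directly as the schema argument of from_json.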
So, is there a best practice in SSS for loading external, schematized data into the original DataFrame as a new column?
Using a join to create the expected DataFrame is the natural way. The issues are:
- The Order table is sharded across many DB servers, so it is not easy to use Spark's JDBC source to load the data. For now, the original Order DAO has to be used to load Order data.
- The Order table is big; it is not practical to load all of it into Spark. Order data should be queried from the DB servers through the DAO by user-id, as sketched below.
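Given those two constraints, here is a sketch (under the stated assumptions, not a definitive implementation) that skips both the join and the JSON round trip: call the DAO inside mapPartitions and build Rows against an explicit schema. OrderDao.create() and findOrdersByUserId are hypothetical stand-ins for the existing DAO:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// Enriched schema = input columns + the order-list column built above.
val enrichedSchema = new StructType()
  .add("user-id", StringType)
  .add("search-keyword", StringType)
  .add("ip", StringType)
  .add("order-list", buildStructTypeForOrderList())

val enrichedDF = inputDF.mapPartitions { rows =>
  // One DAO per partition: DB connections are reused across the
  // partition's rows instead of being opened once per row.
  val dao = OrderDao.create() // hypothetical factory for the existing DAO
  rows.map { row =>
    val userId = row.getAs[String]("user-id")
    // Query only this user's orders, never the whole table.
    val orders = dao.findOrdersByUserId(userId).map { o => // hypothetical DAO method
      Row(o.userId,
          o.items.map(i => Row(i.sku, i.props, i.count)),
          o.orderedTimeStamp,
          o.amount)
    }
    Row(userId, row.getAs[String]("search-keyword"), row.getAs[String]("ip"), orders)
  }
}(RowEncoder(enrichedSchema))

Building Rows by hand against RowEncoder(enrichedSchema) keeps the per-user lookups but avoids serializing every order list to a JSON string only to parse it back with from_json.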