While moving data from S3 to MongoDB via the Spark-Mongo connector, using SparkSQL for transformations, I'm stuck on converting a column from string to UUID. The column is stored as a string in S3, and I'm looking for the appropriate transformation to apply so that it is stored as a UUID when saved to Mongo.
I tried using a UDF, but I'm not able to read the specific column from the DataFrame and convert its string value into a UUID. Any advice on how to write such a Spark UDF?
Sample input from the S3 file: key1 string, key2 string, key2_type int
Expected output in Mongo: key1 UUID, key2 string, key2_type int
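To make the schema concrete, here is a minimal, self-contained sketch with one made-up row (the values are purely illustrative; the real job reads Parquet from S3 via the helpers shown below):

import org.apache.spark.sql.SparkSession

// Illustrative only: one hypothetical row with the sample schema above.
val spark = SparkSession.builder().master("local[*]").appName("uuid-question").getOrCreate()
import spark.implicits._

val sample = Seq(
  ("1b4db7eb4057468fb573da2ff2dd10d1", "some-key", 1)  // key1 string, key2 string, key2_type int
).toDF("key1", "key2", "key2_type")

sample.printSchema()
// Desired result in Mongo: key1 stored as a UUID,
// e.g. 1b4db7eb-4057-468f-b573-da2ff2dd10d1, with key2 and key2_type unchanged.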
Currently we use a SparkSQL transformation, reading from S3 and saving into Mongo:
sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
setAWSCredentials(sparkSession, sourceMap)
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)
val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))
destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"), destinationMap("collection"), destinationMap("login"), destinationMap("password"), dft)
Here is the stringToUUID function, as recommended below:
def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid.replaceFirst(
      "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)",
      "$1-$2-$3-$4-$5"
    )
  ).toString
}
val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
dft.withColumn("key1", stringToUUIDUdf(df("key1")))
Here is the error we get:
17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
+- SubqueryAlias tmp, `tmp`
+- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet
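For completeness, I'm also wondering whether registering the UDF and calling it inside the SparkSQL statement would be a better route. An untested sketch (column and table names taken from the parsed SQL in the log above):

// Untested sketch: register the string-to-UUID conversion as a SQL function
// and apply it in the SELECT itself, instead of withColumn on a second DataFrame.
sparkSession.udf.register("string_to_uuid", (s: String) => stringToUUID(s))

val dft = sparkSession.sql(
  """SELECT string_to_uuid(key1) AS key1,
    |       key1_type_id, key2, key2_type_id, site, updated
    |FROM tmp""".stripMargin)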