
While moving data from S3 to MongoDB via the spark-mongo connector, using SparkSQL for transformations, I'm stuck on converting a column from string to UUID. The column is stored as a string in S3, and I'm looking for the appropriate transformation function to call so that the value is stored as a UUID when saving to Mongo.

I tried using a UDF, but I was not able to read the specific column from the data frame and convert its string value into a UUID. Any advice on how to write such a Spark UDF?

Sample input from the S3 file: key1 string, key2 string, key2_type int

Expected output in Mongo: key1 UUID, key2 string, key2_type int

Currently we use a SparkSQL transformation, reading from S3 and saving into Mongo:

sourceMap = sourceMap ++ jsonObjectPropertiesToMap(List("s3path", "fileformat", "awsaccesskeyid", "awssecretaccesskey"), source)
sparkSession.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
setAWSCredentials(sparkSession, sourceMap)
df = s3ToDataFrame(sourceMap("s3path"), sourceMap("fileformat"), sparkSession)

val dft = sparkSession.sql(mappingsToTransformedSQL(mappings))

destinationMap = destinationMap ++ jsonObjectPropertiesToMap(List("cluster", "database", "authenticationdatabase", "collection", "login", "password"), destination)
dataFrameToMongodb(destinationMap("cluster"), destinationMap("database"), destinationMap("authenticationdatabase"), destinationMap("collection"), destinationMap("login"), destinationMap("password"), dft)

Here is the stringToUUID function, as recommended in the answer below:

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid.replaceFirst(
      "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
    )
  ).toString
}

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))

dft.withColumn("key1", stringToUUIDUdf(df("key1")))

Here is the error we get:

17/07/01 17:51:05 INFO SparkSqlParser: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp
org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator !Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22];;
!Project [UDF(key1#1) AS key1#30, key1_type_id#18, key2#19, key2_type_id#20, site#21, updated#22]
+- Project [key1#1 AS key1#17, key1_type_id#2 AS key1_type_id#18, key2#3 AS key2#19, key2_type_id#4 AS key2_type_id#20, site#5 AS site#21, updated#6 AS updated#22]
   +- SubqueryAlias tmp, `tmp`
      +- Relation[key1#1,key1_type_id#2,key2#3,key2_type_id#4,site#5,updated#6,pdateid#7] parquet
dilsingi
  • How do you want to convert the string to a UUID? Do you mean formatting the string into UUID format with dashes? If so, please look at this: https://stackoverflow.com/questions/18986712/creating-a-uuid-from-a-string-with-no-dashes – Piotr Kalański Jul 01 '17 at 21:21
  • @PiotrKalański I'm trying to do this within the Apache Spark framework. I agree that I will have to use UUID.fromString, but how do I apply it against a Spark dataframe column? – dilsingi Jul 01 '17 at 21:42
  • It seems that column `key` is not available. Can you show output of command `dft.show()`? Probably your function `mappingsToTransformedSQL` is generating wrong query. – Piotr Kalański Jul 02 '17 at 06:47
  • @PiotrKalański without the UDF function added, everything executes perfectly – dilsingi Jul 02 '17 at 14:41

2 Answers


Start by defining a Scala function:

def stringToUUID(uuid: String): String = {
  java.util.UUID.fromString(
    uuid
      .replaceFirst(
        "(\\p{XDigit}{8})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}{4})(\\p{XDigit}+)", "$1-$2-$3-$4-$5"
      )
  ).toString
}

Create a UDF based on the above function:

import org.apache.spark.sql.functions.udf

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))

Add a new uuid column using the withColumn transformation:

df.withColumn("uuid", stringToUUIDUdf(df("text")))

You can also use the select transformation:

df.select(stringToUUIDUdf(df("text")).alias("uuid"))

Example:

import session.implicits._

val df = session.createDataset(Seq(
  "7158e7a4c1284697bcab58dfb8c80e66",
  "cf251f4c667c46b3a9f67681f3be2338",
  "42d3ee515d8c4268b47b579170c88e4c",
  "6b7e3222292d4dc5a8a369f7fede7dc4",
  "b371896d39d04fbb8a8646a176e60d17",
  "e2b57f1677154c5bbe181a575aba4684",
  "2a2e11c4cc604673bbd13b22f029dabb",
  "fcad3f649a114336a721fc3eaefd6ce1",
  "f3f6fcfd16394e1e9c98aae0bd062432",
  "8b0e1929e335489997bfca20bb021d62"
)).toDF("text")

df.withColumn("uuid", stringToUUIDUdf(df("text"))).show(false)

Result:

+--------------------------------+------------------------------------+
|text                            |uuid                                |
+--------------------------------+------------------------------------+
|7158e7a4c1284697bcab58dfb8c80e66|7158e7a4-c128-4697-bcab-58dfb8c80e66|
|cf251f4c667c46b3a9f67681f3be2338|cf251f4c-667c-46b3-a9f6-7681f3be2338|
|42d3ee515d8c4268b47b579170c88e4c|42d3ee51-5d8c-4268-b47b-579170c88e4c|
|6b7e3222292d4dc5a8a369f7fede7dc4|6b7e3222-292d-4dc5-a8a3-69f7fede7dc4|
|b371896d39d04fbb8a8646a176e60d17|b371896d-39d0-4fbb-8a86-46a176e60d17|
|e2b57f1677154c5bbe181a575aba4684|e2b57f16-7715-4c5b-be18-1a575aba4684|
|2a2e11c4cc604673bbd13b22f029dabb|2a2e11c4-cc60-4673-bbd1-3b22f029dabb|
|fcad3f649a114336a721fc3eaefd6ce1|fcad3f64-9a11-4336-a721-fc3eaefd6ce1|
|f3f6fcfd16394e1e9c98aae0bd062432|f3f6fcfd-1639-4e1e-9c98-aae0bd062432|
|8b0e1929e335489997bfca20bb021d62|8b0e1929-e335-4899-97bf-ca20bb021d62|
+--------------------------------+------------------------------------+

Piotr Kalański
  • The problem is that we read from S3, apply a SparkSQL transform, and write back to Mongo. We get the data from S3 and pass it to val dft = sparkSession.sql(mappingsToTransformedSQL(mappings)). Now calling dft.withColumn("key1", stringToUUIDUdf(df("key1"))) results in an error: Parsing command: Select key1 AS key1,key1_type_id AS key1_type_id,key2 AS key2,key2_type_id AS key2_type_id,site AS site,updated AS updated FROM tmp org.apache.spark.sql.AnalysisException: resolved attribute(s) key1#1 missing from key2#19,updated#22,site#21,key1#17,key1_type_id#18,key2_type_id#20 in operator – dilsingi Jul 01 '17 at 22:53
  • Added the function and the error I get into my original post. – dilsingi Jul 01 '17 at 23:04

Use the logic below to get it working.

Dependency:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>3.4.2</version>
</dependency>
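
If the project builds with sbt rather than Maven (an assumption on my part; only the Maven snippet is shown here), the equivalent coordinate would presumably be:

libraryDependencies += "org.mongodb" % "bson" % "3.4.2"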

Function:

import java.util.UUID

import org.bson.{BsonDocument, BsonDocumentWriter, UuidRepresentation}
import org.bson.codecs.{EncoderContext, UuidCodec}
import org.bson.types.Binary

def test(uuids: String): Binary = {
  val uuid = UUID.fromString(uuids)
  // Encode the UUID into a throwaway BSON document so the codec produces
  // the standard (subtype 4) binary representation used by MongoDB.
  val holder = new BsonDocument
  val writer = new BsonDocumentWriter(holder)
  writer.writeStartDocument()
  writer.writeName("uuid")
  new UuidCodec(UuidRepresentation.STANDARD).encode(writer, uuid, EncoderContext.builder().build())
  writer.writeEndDocument()
  // Wrap the encoded bytes in an org.bson.types.Binary for use outside the document.
  val bsonBinary = holder.getBinary("uuid")
  new Binary(bsonBinary.getType(), bsonBinary.getData())
}
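
A quick sanity check of the helper on a single value, as a minimal sketch (the UUID string is taken from the other answer's output; note that UUID.fromString expects the dashed form, so a dashless string from S3 would first need the replaceFirst reformatting shown there):

val bin = test("7158e7a4-c128-4697-bcab-58dfb8c80e66")
// Subtype 4 is BsonBinarySubType.UUID_STANDARD; the payload is the 16 UUID bytes.
println(bin.getType())        // 4
println(bin.getData().length) // 16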

knowledge_seeker