
I have 3 rows in a DataFrame, and in 2 of them the column id has null values. I need to loop through each row and, for that specific column id, replace the null with an epoch time; each value should be unique, and the replacement should happen in the DataFrame itself. How can this be done? For example:

id   | name
1    | a
null | b
null | c

I want a DataFrame in which the nulls are converted to epoch times:

id      | name
1       | a
1435232 | b
1542344 | c
SRIRAM RAMACHANDRAN

2 Answers

df
  .select(
    when($"id").isNull, /*epoch time*/).otherwise($"id").alias("id"),
    $"name"
  )
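
For a concrete fill-in of that placeholder, a minimal sketch (assuming a spark-shell session with df and spark.implicits._ in scope) is the built-in unix_timestamp(), which returns the current epoch seconds. Note that it evaluates to the same value for every row in the query, which is why the EDIT below moves to a UDF:

import org.apache.spark.sql.functions.{when, unix_timestamp}

// unix_timestamp() with no arguments is the current epoch time in seconds;
// it is constant for the whole query, so it does not give per-row uniqueness.
df.select(
  when($"id".isNull, unix_timestamp()).otherwise($"id").alias("id"),
  $"name"
)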

EDIT

You need to make sure the UDF is precise enough - if it only has millisecond resolution you will see duplicate values. See my example below, which illustrates that the approach works:

scala> def rand(s: String): Double = Math.random
rand: (s: String)Double

scala> val udfF = udf(rand(_: String))
udfF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(StringType)))

scala> res11.select(when($"id".isNull, udfF($"id")).otherwise($"id").alias("id"), $"name").collect
res21: Array[org.apache.spark.sql.Row] = Array([0.6668195187088702,a], [0.920625293516218,b])
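
The random UDF above only demonstrates the pattern. A sketch of an actual epoch-based UDF (my own naming, not from the session above) that reads the wall clock at nanosecond granularity to reduce, though not eliminate, collisions:

import java.time.Instant
import org.apache.spark.sql.functions.{udf, when}

// Epoch time in nanoseconds built from the wall clock. Many JVMs only expose
// microsecond (or coarser) clock resolution, so duplicates are still possible.
val epochNanosUdf = udf((_: String) => {
  val now = Instant.now
  now.getEpochSecond * 1000000000L + now.getNano
})

df.select(
  when($"id".isNull, epochNanosUdf($"name")).otherwise($"id").alias("id"),
  $"name"
).show(false)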
Terry Dactyl

Check this out

scala>  val s1:Seq[(Option[Int],String)] = Seq( (Some(1),"a"), (null,"b"), (null,"c"))
s1: Seq[(Option[Int], String)] = List((Some(1),a), (null,b), (null,c))

scala> val df = s1.toDF("id","name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> val epoch = java.time.Instant.now.getEpochSecond
epoch: Long = 1539084285

scala> df.withColumn("id",when( $"id".isNull,epoch).otherwise($"id")).show
+----------+----+
|        id|name|
+----------+----+
|         1|   a|
|1539084285|   b|
|1539084285|   c|
+----------+----+


scala>

EDIT1:

Even when I used milliseconds, I got the same values. Spark doesn't capture nanoseconds in the time portion, so it is possible for many rows to get the same millisecond. Your assumption of getting unique values based on epoch time would therefore not work.

scala> def getEpoch(x:String):Long = java.time.Instant.now.toEpochMilli
getEpoch: (x: String)Long

scala> val myudfepoch = udf( getEpoch(_:String):Long )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(StringType)))

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539087300957|   b|
|1539087300957|   c|
+-------------+----+


scala>

The only other possibility is to use monotonicallyIncreasingId, but those values may not be of the same length all the time.

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)+monotonicallyIncreasingId).otherwise($"id")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+-------------+----+
|           id|name|
+-------------+----+
|            1|   a|
|1539090186541|   b|
|1539090186543|   c|
+-------------+----+


scala>

EDIT2:

I'm able to trick System.nanoTime into providing increasing ids. They will not be sequential, but the length can be maintained. See below:

scala> def getEpoch(x:String):String = System.nanoTime.toString.take(12)
getEpoch: (x: String)String

scala>  val myudfepoch = udf( getEpoch(_:String):String )
myudfepoch: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.withColumn("id",when( $"id".isNull,myudfepoch('name)).otherwise($"id")).show
+------------+----+
|          id|name|
+------------+----+
|           1|   a|
|186127230392|   b|
|186127230399|   c|
+------------+----+


scala>

Try this out when running on a cluster, and adjust the take(12) if you get duplicate values.
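
If you want a guarantee rather than a trick, one variant (my own sketch, not part of the answer above) is to keep the epoch seconds but take per-row uniqueness from monotonically_increasing_id(), which Spark guarantees to be unique within a job because it encodes the partition id and the row's position:

import org.apache.spark.sql.functions.{when, lit, monotonically_increasing_id}

// Epoch seconds captured once on the driver; adding a per-row unique value
// keeps the result unique within this job, though the lengths still vary.
val epochSec = java.time.Instant.now.getEpochSecond

df.withColumn("id",
  when($"id".isNull, lit(epochSec) + monotonically_increasing_id())
    .otherwise($"id")
).show(false)

This avoids UDFs entirely, but as noted above the generated values will not all have the same length.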

stack0114106