I have a JDBC connection between Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?
-
Do you have any special requirements? Data type, consecutive values, something else? – zero323 Oct 13 '15 at 13:35
-
nope, just old good unique integers – Nhor Oct 13 '15 at 14:08
4 Answers
Scala:
If all you need is unique numbers, you can use zipWithUniqueId and recreate the DataFrame. First some imports and dummy data:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Extract schema for further usage:
val schema = df.schema
Add id field:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Create DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
The same thing in Python:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
# Build the indexed row type only after df exists, because it needs df.columns
row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        # Put the generated id first, then the original values in column order
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
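If you prefer consecutive numbers you can replace zipWithUniqueId with zipWithIndex, but it is a little bit more expensive: when the RDD has more than one partition, zipWithIndex has to run an extra Spark job to count the rows in each partition first.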
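To make the difference concrete, here is a minimal pure-Python sketch of the two numbering schemes. It does not use Spark at all: the nested lists stand in for partitions, and the function names and the sample layout are made up for illustration. It relies on the documented behavior that zipWithUniqueId gives items in the k-th of n partitions the ids k, n+k, 2n+k and so on.

```python
def zip_with_unique_id(partitions):
    """Mimic RDD.zipWithUniqueId: item i of partition k gets id i * n + k."""
    n = len(partitions)
    return [[(item, i * n + k) for i, item in enumerate(part)]
            for k, part in enumerate(partitions)]


def zip_with_index(partitions):
    """Mimic RDD.zipWithIndex: consecutive ids, which needs every partition size first."""
    sizes = [len(part) for part in partitions]  # the extra pass over the data
    offsets = [sum(sizes[:k]) for k in range(len(partitions))]
    return [[(item, offsets[k] + i) for i, item in enumerate(part)]
            for k, part in enumerate(partitions)]


# Hypothetical layout: three rows spread unevenly over two partitions.
partitions = [["a", "b"], ["c"]]
unique_ids = zip_with_unique_id(partitions)
index_ids = zip_with_index(partitions)
print(unique_ids)  # [[('a', 0), ('b', 2)], [('c', 1)]]
print(index_ids)   # [[('a', 0), ('b', 1)], [('c', 2)]]
```

Each partition can compute its unique ids on its own, while the consecutive variant cannot start numbering a partition until the sizes of all earlier partitions are known.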
Directly with DataFrame API:
(universal: Scala, Python, Java, R with pretty much the same syntax)
Previously I missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
While useful, monotonicallyIncreasingId is non-deterministic. Not only may the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
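The large gaps between the ids in the output above come from how the values are laid out: according to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. A small pure-Python sketch (the helper name split_id is made up for illustration) decodes the three ids shown above:

```python
RECORD_BITS = 33  # lower 33 bits hold the record number within a partition


def split_id(generated_id):
    """Return (partition_id, record_number) encoded in a generated id."""
    partition_id = generated_id >> RECORD_BITS
    record_number = generated_id & ((1 << RECORD_BITS) - 1)
    return partition_id, record_number


ids = [17179869184, 42949672960, 60129542144]
decoded = [split_id(i) for i in ids]
print(decoded)  # [(2, 0), (5, 0), (7, 0)]
```

So in that run each row was the first record of partition 2, 5, and 7 respectively, which also shows why the ids change whenever the partitioning changes.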
Note:
It is also possible to use the rowNumber window function:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Unfortunately:
WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at this moment.

-
will this only work with R? i know you used scala above, but all i can find about this `zipWithUniqueId` is only in SparkR docs – Nhor Oct 13 '15 at 14:25
-
no no, i can understand your code, I was just asking if there is anything in pyspark docs about `zipWithUniqueId`, but it seems like I was just lazy, because eventually I found it, thanks a lot for your solution! – Nhor Oct 13 '15 at 14:31
-
Sure. I've added Python code as well and a short note about window functions. – zero323 Oct 13 '15 at 14:49
-
The code in python above is incorrect as it is referencing df (df.columns) before defining it. – Sameer Mahajan Feb 22 '18 at 09:10
-
Also the empty orderBy clause is not allowed in latest spark 2.2+ and gives an error of: *'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'* – Sameer Mahajan Feb 22 '18 at 09:12
-
@SameerMahajan you can solve that by passing a `lit(0)` as the `orderBy` argument – TMichel Apr 06 '18 at 15:02
from pyspark.sql.functions import monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id()).show()
Note that the second argument of df.withColumn is monotonically_increasing_id(), with parentheses, not monotonically_increasing_id.

I found the following solution to be relatively straightforward for the case where zipWithIndex() is the desired behavior, i.e. for those desiring consecutive integers.
In this case, we're using pyspark and relying on a dictionary comprehension to map the original row object to a new dictionary which fits a new schema including the unique index.
from pyspark.sql.types import StructType, StructField, IntegerType

# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer
# First create a new schema with the uuid field prepended
# (IntegerType is 32-bit; use LongType if the index can exceed that range)
newSchema = StructType([StructField("uuid", IntegerType(), False)]
                       + dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes new field
# (written without tuple-unpacking lambdas so it also runs on Python 3)
df = dfNoIndex.rdd.zipWithIndex()\
    .map(lambda pair: {k: v
                       for k, v
                       in list(pair[0].asDict().items()) + [("uuid", pair[1])]})\
    .toDF(newSchema)

For anyone else who doesn't require integer types, concatenating the values of several columns whose combinations are unique across the data can be a simple alternative. You have to handle nulls since concat/concat_ws won't do that for you. You can also hash the output if the concatenated values are long:
import pyspark.sql.functions as sf

unique_id_sub_cols = ["a", "b", "c"]
df = df.withColumn(
    "UniqueId",
    sf.md5(
        sf.concat_ws(
            "-",
            *[
                sf.when(sf.col(sub_col).isNull(), sf.lit("Missing")).otherwise(
                    sf.col(sub_col)
                )
                for sub_col in unique_id_sub_cols
            ]
        )
    ),
)
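To see why the null handling matters, here is a minimal pure-Python sketch that imitates the expression above without Spark. The helper names and the two sample rows are made up for illustration; the sketch assumes only that concat_ws skips null values rather than propagating them, and it uses hashlib.md5 as a stand-in for Spark's md5.

```python
import hashlib


def concat_ws(sep, *values):
    """Mimic Spark's concat_ws, which skips nulls instead of propagating them."""
    return sep.join(str(v) for v in values if v is not None)


def unique_id(values, placeholder="Missing"):
    """Mimic the expression above: replace nulls, join with '-', then md5."""
    filled = [placeholder if v is None else v for v in values]
    return hashlib.md5(concat_ws("-", *filled).encode("utf-8")).hexdigest()


# Two hypothetical rows that differ only in where the null sits.
row_1 = ("x", None, "y")
row_2 = ("x", "y", None)

naive_collision = concat_ws("-", *row_1) == concat_ws("-", *row_2)
ids_differ = unique_id(row_1) != unique_id(row_2)
print(naive_collision, ids_differ)  # True True
```

Without the placeholder both rows collapse to the same string, so they would get the same id. Note that values which themselves contain the separator or the literal placeholder text could still collide, so pick both with the data in mind.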
