
I have a DataFrame with 3 columns as below:

+-------+--------------------+-------------+
|  id   |      reports       |      hash   |
+-------+--------------------+-------------+
|abc    | [[1,2,3], [4,5,6]] |     9q5     |
|def    | [[1,2,3], [4,5,6]] |     9q5     |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |
|lmn    | [[1,2,3], [4,5,6]] |     abc     |
|opq    | [[1,2,3], [4,5,6]] |     abc     |
|rst    | [[1,2,3], [4,5,6]] |     abc     |
+-------+--------------------+-------------+

Now my problem is that I need to limit the number of rows per individual hash.

I was thinking of transforming the hash, e.g. turning 9q5 into 9q5_1 for the first 1k rows, 9q5_2 for the second 1k, and so on, for every value in hash.
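
To make the intended transformation concrete, here is a minimal plain-Python sketch (no Spark involved). The function name `suffix_hashes` and the limit of 2 are assumptions chosen only so the effect is visible on the six sample rows; the real limit would be 1k.

```python
from collections import defaultdict

def suffix_hashes(rows, limit):
    """Return (id, suffixed_hash) pairs; each suffixed hash covers at most `limit` rows."""
    seen = defaultdict(int)  # number of rows already seen per original hash
    out = []
    for row_id, hash_value in rows:
        chunk = seen[hash_value] // limit + 1  # 1-based chunk number, as in 9q5_1, 9q5_2
        out.append((row_id, f"{hash_value}_{chunk}"))
        seen[hash_value] += 1
    return out

# (id, hash) pairs from the sample table; the reports column is omitted for brevity.
rows = [("abc", "9q5"), ("def", "9q5"), ("ghi", "9q5"),
        ("lmn", "abc"), ("opq", "abc"), ("rst", "abc")]
result = suffix_hashes(rows, limit=2)
```

With a limit of 2, the first two rows of each hash get the suffix `_1` and the third gets `_2`.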

There is a similar post, but it solves a different problem: there the DataFrame is split into several, whereas I want to keep a single DataFrame and change the key value.

Any suggestions on how to achieve this? Thanks

SCouto
phcaze

1 Answer


I found a solution. I use a window function to create a new column with an incremental index for each value in the geohash column (the hash column in the question). Then I apply a UDF that builds the new value I need, 'geohash'_X, from the original geohash and the index.

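from pyspark.sql import Window
from pyspark.sql.functions import rank, udf

partition_size_limit = 10
# Build "<geohash>_<bucket>"; rank() starts at 1, so bucket 0 only covers indexes 1 to 9 here.
generate_indexed_geohash_udf = udf(lambda geohash, index: "{0}_{1}".format(geohash, int(index / partition_size_limit)))
# Number the rows inside each geohash, ordered by id.
window = Window.partitionBy(df_split['geohash']).orderBy(df_split['id'])
df_split.select('*', rank().over(window).alias('index')).withColumn("indexed_geohash", generate_indexed_geohash_udf('geohash', 'index'))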

The result is:

+-------+--------------------+-------------+-------------+-----------------+
|  id   |      reports       |      hash   |    index    | indexed_geohash |
+-------+--------------------+-------------+-------------+-----------------+
|abc    | [[1,2,3], [4,5,6]] |     9q5     |      1      |     9q5_0       |
|def    | [[1,2,3], [4,5,6]] |     9q5     |      2      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      3      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      4      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      5      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      6      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      7      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      8      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      9      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      10     |     9q5_1       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      11     |     9q5_1       |
|lmn    | [[1,2,3], [4,5,6]] |     abc     |      1      |     abc_0       |
|opq    | [[1,2,3], [4,5,6]] |     abc     |      2      |     abc_0       |
|rst    | [[1,2,3], [4,5,6]] |     abc     |      3      |     abc_0       |
+-------+--------------------+-------------+-------------+-----------------+
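
In the table above, indexes 1 to 9 map to `_0` and index 10 already maps to `_1`, because the index starts at 1 and is divided by the limit as-is. The plain-Python sketch below reproduces that arithmetic and compares it with a shifted variant; `bucket_zero_based` is a hypothetical helper, not part of the posted solution.

```python
partition_size_limit = 10

def bucket_as_posted(index):
    # Same arithmetic as the UDF: the 1-based index divided by the limit, truncated.
    return int(index / partition_size_limit)

def bucket_zero_based(index):
    # Hypothetical variant: shift the 1-based index down by one before dividing,
    # so the first bucket holds a full `partition_size_limit` rows.
    return (index - 1) // partition_size_limit

# Indexes 1..11, as in the 9q5 rows of the result table.
posted = [bucket_as_posted(i) for i in range(1, 12)]
shifted = [bucket_zero_based(i) for i in range(1, 12)]
```

Here `posted` puts nine indexes in bucket 0, while `shifted` puts ten there. Either way no bucket exceeds the limit, so the posted version is safe; the first bucket is just one row short.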

EDIT: Steven's suggestion in the comments also works perfectly and avoids the UDF:

from pyspark.sql import Window, functions as F

partition_size_limit = 10
window = Window.partitionBy(df_split['geohash']).orderBy(df_split['id'])
df_split.select('*', F.rank().over(window).alias('index')).withColumn("indexed_geohash", F.concat_ws("_", F.col("geohash"), F.floor(F.col("index") / F.lit(partition_size_limit)).cast("string")))
phcaze
    You do not need a UDF for that. Keep it within Spark for better performance. Replace `generate_indexed_geohash_udf` with something like `F.concat_ws("_", F.col("geohash"), F.floor(F.col("index") / F.lit(partition_size_limit)).cast("string"))` – Steven Sep 22 '20 at 10:01