I need to implement an auto-increment column in my Spark SQL table. How could I do that? Kindly guide me. I am using PySpark 2.0.

Thank you, Kalyan

  • Check this: http://stackoverflow.com/questions/31955309/add-column-sum-as-new-column-in-pyspark-dataframe – Arunakiran Nulu Oct 25 '16 at 04:42
  • @MRSrinivas thanks for your detailed reply, I will try it. Recently I tried `from pyspark.sql.functions import monotonically_increasing_id` for solving the problem and it worked: it gives ids for every row, indexing from 0. Thank you very much – Kalyan Nov 15 '16 at 08:12
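
For reference, a minimal PySpark sketch of the approach mentioned in the last comment (the DataFrame here is illustrative, not from the question): monotonically_increasing_id gives unique, increasing ids, but they are only guaranteed to be consecutive and start at 0 within a single partition; a row_number over a global window yields a true consecutive sequence, at the cost of shuffling all rows through one partition.

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("auto-increment-sketch").getOrCreate()

# Illustrative data; replace with your own table.
df = spark.createDataFrame([("John",), ("Tim",)], ["name"])

# Unique, increasing (not necessarily consecutive) 64-bit ids.
with_ids = df.withColumn("id", monotonically_increasing_id())

# Consecutive 1-based sequence: row_number over a global window
# (this pulls all rows through a single partition).
w = Window.orderBy(monotonically_increasing_id())
with_seq = df.withColumn("id", row_number().over(w))
with_seq.show()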

1 Answer

I would write/reuse a stateful Hive UDF and register it with PySpark, since Spark SQL has good support for Hive.

Note the line `@UDFType(deterministic = false, stateful = true)` in the code below; it is what makes the UDF stateful.

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
  // Holds the current sequence value for this UDF instance.
  private LongWritable result = new LongWritable();

  public UDFRowSequence() {
    result.set(0);
  }

  // Increment and return the sequence number on each call.
  public LongWritable evaluate() {
    result.set(result.get() + 1);
    return result;
  }
}

// End UDFRowSequence.java

Now build the JAR and pass its location when PySpark starts:

$ pyspark --jars your_jar_name.jar

Then register the function with sqlContext:

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

Now use `row_seq()` in a SELECT query:

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

Project to use Hive UDFs in pySpark

  • I have built the jar as you specified and created the temporary function as well. Then I created a table `sqlContext.sql("Create table abc(id int, name string)")` and ran `sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'John'")` and `sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'Tim'")`. When I do a SELECT * I am getting both `id`s as `1` instead of `1` and `2`. –  May 25 '17 at 15:55
  • Are you setting `stateful = true` inside the `@UDFType` tag in your code? – mrsrinivas May 26 '17 at 04:42
  • I need something like this, but the question is: will it scale to 200 million rows? Actually I want to break a big file containing 200 million rows into smaller files of exactly 10K rows each. I thought to add an auto-increment number to each row and read in batches with the help of something like (id > 10,001 and id < 20,000), as in the sketch after these comments. Will this work at that scale? Please suggest. – Hrishikesh Mishra Mar 03 '18 at 20:08
  • Is it possible to do this UDF in Python, and register it in sqlContext as well? – dim_user Jul 06 '19 at 04:38
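
A rough sketch of the range-based batching idea from the third comment (the id column, batch size, and output path are all assumptions; df is a DataFrame that already carries a consecutive 1-based id, e.g. produced as in the answer above):

# Hedged sketch: split a table into ~10K-row files by id range.
batch_size = 10000
total = df.count()
for start in range(1, total + 1, batch_size):
    end = start + batch_size - 1
    batch = df.filter((df.id >= start) & (df.id <= end))
    # Illustrative output path; each directory holds one ~10K-row batch.
    batch.write.mode("overwrite").csv("/tmp/batches/{0}_{1}".format(start, end))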