I have trouble using `concat_list` on a Spark DataFrame in Spark 1.6, so I am planning to write a UDAF that concatenates the values from multiple rows into a single row of comma-separated values.
Here is the link to the original post. I need the same code in Java SE 7 with Apache Spark 1.6.
Scala code (from the original post):
/**
 * Aggregates the non-null string values of a group into one comma-separated string.
 *
 * The aggregation buffer holds the values collected so far as an array of strings;
 * `evaluate` joins them with ",".
 */
object GroupConcat extends UserDefinedAggregateFunction {
  // A single string input column.
  def inputSchema = new StructType().add("x", StringType)

  // Running state: the list of non-null input values seen so far.
  def bufferSchema = new StructType().add("buff", ArrayType(StringType))

  def dataType = StringType

  def deterministic = true

  // Start every group with an empty list of values.
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer.update(0, ArrayBuffer.empty[String])
  }

  // Append the input value; null inputs are skipped.
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
  }

  // Combine two partial results by appending buffer2's values to buffer1's.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
  }

  // Return a plain String for the StringType result; Spark converts it to its
  // internal representation, so the internal UTF8String class is not needed here.
  def evaluate(buffer: Row): String = buffer.getSeq[String](0).mkString(",")
}
Java code (my attempt):
Please correct me if I am wrong:
public static class GroupConcat extends UserDefinedAggregateFunction {
private StructType inputSchema;
private StructType bufferSchema;
public GroupConcat() {
List<StructField> inputFields = new ArrayList<>();
inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.StringType, true));
inputSchema = DataTypes.createStructType(inputFields);
List<StructField> bufferFields = new ArrayList<>();
bufferFields.add(DataTypes.createStructField("list", DataTypes.createArrayType(DataTypes.StringType), true));
bufferSchema = DataTypes.createStructType(bufferFields);
}
// The data type of the returned value
public DataType dataType() {
return DataTypes.DoubleType;
}
// Whether this function always returns the same output on the identical input
public boolean deterministic() {
return true;
}
// Data types of input arguments of this aggregate function
public StructType inputSchema() {
return inputSchema;
}
// Data types of values in the aggregation buffer
public StructType bufferSchema() {
return bufferSchema;
}
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0,null);
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
public void update(MutableAggregationBuffer buffer, Row input) {
if (!input.isNullAt(0)) {
// buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
buffer.update(0, buffer.getSeq(0)+input.getString(0));
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
buffer1.update(0, buffer1.getSeq(0).toString()+buffer2.getSeq(0).toString());
}
// Calculates the final result
public String evaluate(Row buffer) {
// return ((double) buffer.getLong(0)) / buffer.getLong(1);
return buffer.getSeq(0).mkString(",");
}
}
Thanks.