Map<File, Dataset<Row>> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", ???, false, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);

for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    Integer fileIndex = files.indexOf(entry.getKey());
    allFilesWords = allFilesWords.unionAll(
            allWords.get(entry.getKey()).withColumn("files", ???)
    );
}

In the code above, allWords maps each file to its word counts (Row: (string, integer)). I want to aggregate the results for all files into one DataFrame while keeping track of which file each word was mentioned in. Since each word might end up being mentioned in multiple files, the files column is designed to hold a set of integers (assuming files are mapped to integer indices). So I'm trying to add a new column to each of the allWords DataFrames and then merge them all with unionAll.

But I don't know how to define and initialize the new column (named files here) with a set holding only one item, fileIndex.

Thanks to the link provided in the comments, I know I should be using functions.typedLit, but that function asks for a second parameter and I don't know what to pass for it. I also don't know how to define the column itself. One last thing: the linked example is in Python and I'm looking for the Java API.
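
For reference, the Scala declaration of typedLit in org.apache.spark.sql.functions looks roughly like this:

def typedLit[T : TypeTag](literal: T): Column

so the extra parameter it asks for on the Java side is the scala.reflect TypeTag for the literal's type, and that is the part I don't know how to supply from Java.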

Mehran
    Possible duplicate of [How to add a constant column in a Spark DataFrame?](https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe) – 10465355 Oct 27 '18 at 23:02
  • @user10465355 True but that's not in Java – Mehran Oct 27 '18 at 23:45

1 Answer


I've found the solution myself (with some help):

Map<File, Dataset<Row>> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);
for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    Integer fileIndex = files.indexOf(entry.getKey());
    // seq is a Scala Seq<Integer> containing just fileIndex (see the sketch below)
    allFilesWords = allFilesWords.unionAll(
            allWords.get(entry.getKey())
                    .withColumn("files", functions.typedLit(seq, MyTypeTags.SeqInteger()))
    );
}

The problem was that TypeTag is a compile-time artifact from Scala and, based on what I've got in this other question, it needs to be generated by the Scala compiler and there's no way to generate one in Java. So I had to define the TypeTag for my custom data structure in a Scala file and add it to my Maven Java project. For that, I followed this article.

And here's my MyTypeTags.scala file:

import scala.reflect.runtime.universe._

object MyTypeTags {
  val SeqInteger = typeTag[Seq[Integer]]
}
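
In case it is not obvious where seq comes from, here is an untested sketch of one way to build it on the Java side (the FilesColumnSketch class and filesColumn method are just for illustration; it assumes Scala's JavaConverters is on the classpath, which it is with Spark 2.x):

import java.util.Collections;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class FilesColumnSketch {
    // Builds the constant "files" column for a single file index.
    static Column filesColumn(Integer fileIndex) {
        // Wrap the single index in a Scala Seq<Integer> so the runtime value matches
        // the Seq[Integer] TypeTag declared in MyTypeTags.scala.
        Seq<Integer> seq = JavaConverters
                .asScalaBufferConverter(Collections.singletonList(fileIndex))
                .asScala()
                .toSeq();
        // The TypeTag is what lets typedLit derive the Catalyst array type for the literal.
        return functions.typedLit(seq, MyTypeTags.SeqInteger());
    }
}

As a side note, if a plain array column is acceptable, functions.array(functions.lit(fileIndex)) might produce a similar single-element files column without needing a TypeTag at all, though I haven't checked that its type lines up exactly with the ArrayType in the schema above.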
Mehran