
I have a column of a Spark dataset (in Java), and I want all the values of this column to become the column names of new columns (the new columns can be filled with a constant value).

For example I have:
+------------+
|    Column  | 
+------------+
| a          | 
| b          |
| c          |
+------------+

And I want: 
+------+----+----+---+
|Column| a  |  b | c |
+------+----+----+---+
| a    | 0  | 0  |0  |
| b    | 0  | 0  |0  |
| c    | 0  | 0  |0  |
+------+----+----+---+

What I tried is:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

public class test {

    static SparkSession spark = SparkSession.builder().appName("Java")
            .config("spark.master", "local").getOrCreate();
    static Dataset<Row> dataset = spark.emptyDataFrame();

    public Dataset<Row> test(Dataset<Row> ds, SparkSession spark) {
        spark.udf().register("addSubstrings", addSubstrings, DataTypes.createArrayType(DataTypes.StringType));
        ds = ds.withColumn("substrings", functions.callUDF("addSubstrings", ds.col("Column")));
        return ds;
    }

    private static UDF1<String, String[]> addSubstrings = new UDF1<String, String[]>() {
        public String[] call(String str) throws Exception {
            // try to add a new column to the (static) dataset from inside the UDF
            dataset = dataset.withColumn(str, functions.lit(0));
            String[] a = {"placeholder"};
            return a;
        }
    };
}

My problem is that sometimes I get the right result and sometimes not (the columns are not added), and I do not really understand why. I was searching for a way to pass the dataset to the UDF, but I don't know how.

At the moment I'm solving it by calling collectAsList() on the column, iterating over the resulting ArrayList and adding the new columns one by one. But that is really inefficient, since I have a lot of data.

    Calling a dataset inside a function / UDF / map / group of that dataset is not supported. Is what you are trying to achieve a simple pivot ? https://stackoverflow.com/questions/30244910/how-to-pivot-spark-dataframe – GPI Aug 08 '19 at 09:18

3 Answers


For this use-case, you could use pivot:

ds
 .withColumn("pivot_column", $"first_column")
 .groupBy($"first_column")
 .pivot("pivot_column")
 .count

If you want better performance, you can provide the possible values explicitly, e.g. pivot("pivot_column", Seq("a", "b", "c")); this spares Spark an extra pass over the data to compute the distinct pivot values.

I used count for aggregation but you can do any aggregation you want.

From
+------------+
|first_column| 
+------------+
| a          | 
| b          |
| c          |
+------------+

To

+------------+---+---+---+
|first_column| a | b | c |
+------------+---+---+---+
| a          | 1 | 0 | 0 |
| b          | 0 | 1 | 0 |
| c          | 0 | 0 | 1 |
+------------+---+---+---+
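
For reference, a rough Java equivalent of the same pivot (just a sketch, assuming the Dataset<Row> is called ds and the column is named first_column):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// Duplicate the column so we can group by one copy and pivot on the other.
// Listing the pivot values up front spares Spark the extra pass it would
// otherwise need to compute the distinct values.
Dataset<Row> pivoted = ds
        .withColumn("pivot_column", col("first_column"))
        .groupBy(col("first_column"))
        .pivot("pivot_column", Arrays.<Object>asList("a", "b", "c"))
        .count();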
Rafaël

If the Column has only a few distinct values, you can try the code below.

df.show
+------+
|Column|
+------+
|     A|
|     B|
|     C|
+------+

// If the dataset has multiple columns, select only the required column
val names = df.select($"Column").as[String].collect 
val df1 = names.foldLeft(df)((df,n) => df.withColumn(n, lit(0)))
df1.show()
+------+---+---+---+
|Column|  A|  B|  C|
+------+---+---+---+
|     A|  0|  0|  0|
|     B|  0|  0|  0|
|     C|  0|  0|  0|
+------+---+---+---+
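
A Java version of the same approach (a sketch, assuming the DataFrame variable is called df) could look like this:

import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.lit;

// Collect the values of "Column" to the driver (only sensible when there are
// few of them), then add one constant column per value.
// Add .distinct() before collecting if the column can contain duplicates.
List<String> names = df.select("Column").as(Encoders.STRING()).collectAsList();
Dataset<Row> df1 = df;
for (String name : names) {
    df1 = df1.withColumn(name, lit(0));
}
df1.show();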
Ravi
  • I think the collect function is the problem, because with the amount of data I have, collectAsList() needs around one hour. But I will try that out. Could you maybe provide a Java implementation instead of Scala? Thank you very much! – Caroline Me Aug 08 '19 at 09:19

I think the nature of Spark (more precisely, its parallelism) prevents you from achieving your goal with a UDF.

When executing your query, Spark distributes your data to the executors, each getting its own chunk of rows. Each chunk of rows has its own list of possible values of the column Column. Hence, each executor would try to add its own list of columns, different from what the other executors add. So, when the driver tries to combine the result sets from the different executors, it would fail (or maybe the executors would fail themselves).

collectAsList does solve your problem, though quite inefficiently.

Also, you could guess the number of columns and invent some function (suited to your actual data) that maps the values of the Column column to those numbers; that way you would be able to keep the column set identical on every executor. This solution is not very general, but it would cover some cases: you would end up with columns like <c01, c02, c03, ..., cNN>.
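
A rough Java sketch of that idea (purely illustrative: the bucket count n and the hash-based mapping are assumptions you would replace with a function that fits your data, since a plain hash can collide):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

int n = 16; // assumed upper bound on the number of distinct values
Dataset<Row> result = ds;
for (int i = 0; i < n; i++) {
    // Every row gets the same fixed set of columns c00..cNN, so all executors
    // agree on the schema; each row marks 1 in the bucket its value maps to.
    result = result.withColumn(
            String.format("c%02d", i),
            when(pmod(hash(col("Column")), lit(n)).equalTo(i), lit(1)).otherwise(lit(0)));
}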

Matvey Zhuravel