
I have a dataframe as follows:

+---------------+--------------------+
|IndexedArtistID|     recommendations|
+---------------+--------------------+
|           1580|[[919, 0.00249262...|
|           4900|[[41749, 7.143963...|
|           5300|[[0, 2.0147272E-4...|
|           6620|[[208780, 9.81092...|
+---------------+--------------------+

I want to split the recommendations column so as to have a dataframe as follows:

+---------------+--------------------+
|IndexedArtistID|     recommendations|
+---------------+--------------------+
|           1580|919                 |
|           1580|0.00249262          |
|           4900|41749               |
|           4900|7.143963            |
|           5300|0                   |
|           5300|2.0147272E-4        |
|           6620|208780              |
|           6620|9.81092             |
+---------------+--------------------+

So basically, I want to split the feature vector into columns and then merge those columns into a single column. The merging part is described in: How to split single row into multiple rows in Spark DataFrame using Java. Now, how do I carry out the splitting part using Java? For Scala, it is explained here: Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)], but I am not able to find a way to do the same in Java as shown in that link.

The schema for the dataframe is below; the values of IndexedUserID (and rating, as shown in the desired output above) are to go into the newly created recommendations column:

root
 |-- IndexedArtistID: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- IndexedUserID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
udit
  • I have provided Scala code without using the explode function. You can write the splitting part in Java similar to my example. I am not using Java much, so I thought this would be helpful. Please see if it works for you. – Salim Jan 15 '20 at 20:08

1 Answer


I tried to find a solution to this question, and I must say that while there is a lot of content available about Spark problems in Python and Scala, very little is available for Java. The solution goes as follows:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;

import java.util.ArrayList;
import java.util.List;

// Collect each row's artist id and its array of (IndexedUserID, rating) structs.
List<ElementStruct> structElements = dataFrameWithFeatures.javaRDD().map(row -> {
    int artistId = row.getInt(0);
    List<Object> recommendations = row.getList(1);
    return new ElementStruct(artistId, recommendations);
}).collect();

List<Recommendation> recommendations = new ArrayList<>();
for (ElementStruct element : structElements) {
    List<Object> features = element.getFeatures();
    int artistId = element.getArtistId();
    for (int i = 0; i < features.size(); i++) {
        GenericRowWithSchema struct = (GenericRowWithSchema) features.get(i);
        int userId = struct.getInt(0);     // IndexedUserID field
        float rating = struct.getFloat(1); // rating field
        // Emit two rows per struct so the user id and the rating each get
        // their own row, as in the desired output.
        recommendations.add(new Recommendation(artistId, userId));
        recommendations.add(new Recommendation(artistId, rating));
    }
}
SparkSession sparkSession = SessionCreator.getOrCreateSparkSession();
Dataset<Row> decomposedDataframe = sparkSession.createDataFrame(recommendations, Recommendation.class);

ElementStruct Class

import java.io.Serializable;
import java.util.List;

public class ElementStruct implements Serializable {
    private int artistId;
    private List<Object> features;

    public ElementStruct(int artistId, List<Object> features) {
        this.artistId = artistId;
        this.features = features;
    }

    public int getArtistId() {
        return artistId;
    }

    public void setArtistId(int artistId) {
        this.artistId = artistId;
    }

    public List<Object> getFeatures() {
        return features;
    }

    public void setFeatures(List<Object> features) {
        this.features = features;
    }
}

Recommendation Class

import java.io.Serializable;

public class Recommendation implements Serializable {
    private int artistId;
    // Holds either a user id or a rating, so both can share one column.
    private double value;

    public Recommendation(int artistId, double value) {
        this.artistId = artistId;
        this.value = value;
    }

    public int getArtistId() {
        return artistId;
    }

    public void setArtistId(int artistId) {
        this.artistId = artistId;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }
}

Explanation:

  1. For each row in the dataframe, get the artist id and the feature list, and store them together as a Java object (ElementStruct in this case) to make further processing easier.

  2. For each ElementStruct, go through its feature list and, for every (user id, rating) struct, add Recommendation objects to a new list.

  3. Finally, create a dataframe out of the list of objects obtained in step 2.
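The flattening in steps 1 and 2 is plain Java, so it can be sanity-checked without Spark. Below is a minimal, self-contained sketch of just that logic; the `FlattenDemo` class and `flatten` helper are hypothetical names, and the two hard-coded recommendations stand in for the struct elements from the dataframe:

```java
import java.util.ArrayList;
import java.util.List;

public class FlattenDemo {

    // For every (userId, rating) pair, emit two rows that share the artistId:
    // one carrying the user id, one carrying the rating, both as double.
    public static void flatten(int artistId, int[] userIds, float[] ratings,
                               List<double[]> out) {
        for (int i = 0; i < userIds.length; i++) {
            out.add(new double[]{artistId, userIds[i]});
            out.add(new double[]{artistId, ratings[i]});
        }
    }

    public static void main(String[] args) {
        List<double[]> rows = new ArrayList<>();
        // Values taken from the first struct of each artist in the question.
        flatten(1580, new int[]{919}, new float[]{0.00249262f}, rows);
        flatten(4900, new int[]{41749}, new float[]{7.143963f}, rows);
        for (double[] r : rows) {
            System.out.println((int) r[0] + " -> " + r[1]);
        }
    }
}
```

Each input struct produces two output rows, which is exactly the shape the final dataframe should have.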

Result:

root
 |-- artistId: integer (nullable = false)
 |-- value: double (nullable = false)

+---------------+----------------+
|       artistId|           value|
+---------------+----------------+
|           1580|           919.0|
|           1580|      0.00249262|
|           4900|         41749.0|
|           4900|        7.143963|
|           5300|             0.0|
|           5300|    2.0147272E-4|
|           6620|        208780.0|
|           6620|         9.81092|
+---------------+----------------+
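Alternatively, on Spark versions that ship the built-in `explode` function and the SQL `stack` generator (an assumption; check your version), the same split can be sketched entirely in the DataFrame API, without collecting to the driver. This is an untested fragment, with `dataFrameWithFeatures` standing in for the input dataframe:

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// One row per (IndexedUserID, rating) struct in the recommendations array.
Dataset<Row> exploded = dataFrameWithFeatures
        .select(col("IndexedArtistID"), explode(col("recommendations")).as("rec"));

// stack(2, x, y) turns the two struct fields into two rows of one column;
// both are cast to double so user ids and ratings share a single type.
Dataset<Row> split = exploded.selectExpr(
        "IndexedArtistID",
        "stack(2, cast(rec.IndexedUserID as double), cast(rec.rating as double)) as recommendations");
```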