I've been googling this all day and couldn't find a straight answer, so I ended up posting a question here.

I have a file containing line-delimited JSON objects:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "103b", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "103b", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

My goal is to parse this file with Apache Spark in Java. I referenced How to Parsing CSV or JSON File with Apache Spark, and so far I've successfully parsed each line of JSON into a JavaRDD<JsonObject> using Gson.

JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> data = sc.textFile("fileName");
JavaRDD<JsonObject> records = data.map(new Function<String, JsonObject>() {
    public JsonObject call(String line) throws Exception {
        Gson gson = new Gson();
        JsonObject json = gson.fromJson(line, JsonObject.class);
        return json;
    }
});

Where I'm really stuck is deserializing the "rooms" array so that each element maps onto my Event class.

public class Event implements Serializable {
    private static final long serialVersionUID = 42L;
    private String deviceId;
    private int timestamp;
    private String room;
    // constructor, getters, and setters
}

In other words, from this line:

{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}

I want to create two Event objects in Spark:

obj1: deviceId = "103b", timestamp = 1436941050, room = "Office"
obj2: deviceId = "103b", timestamp = 1436941050, room = "Foyer"

I did a little searching and tried flatMapValue, but no luck: it doesn't compile, because JavaRDD has no such method (flatMapValues exists, but only on JavaPairRDD).

JavaRDD<Event> events = records.flatMapValue(new Function<JsonObject, Iterable<Event>>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        JsonArray rooms = json.get("rooms").getAsJsonArray();
        List<Event> data = new LinkedList<Event>();
        for (JsonElement room : rooms) {
            data.add(new Event(json.get("device_id").getAsString(), json.get("timestamp").getAsInt(), room.toString()));
        }
        return data;
    }
});
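
For reference, flatMapValues is only defined on JavaPairRDD; a plain JavaRDD exposes flatMap instead. A minimal sketch of that route (assuming Spark 1.x, where FlatMapFunction.call returns an Iterable; Spark 2.x changed it to return an Iterator):

import java.util.LinkedList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

// flatMap emits zero or more outputs per input record, so one JSON
// line with two rooms becomes two Event objects
JavaRDD<Event> events = records.flatMap(new FlatMapFunction<JsonObject, Event>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        List<Event> out = new LinkedList<Event>();
        for (JsonElement room : json.get("rooms").getAsJsonArray()) {
            // getAsString() strips the JSON quotes; room.toString()
            // would keep them ("\"Office\"" instead of Office)
            out.add(new Event(json.get("device_id").getAsString(),
                              json.get("timestamp").getAsInt(),
                              room.getAsString()));
        }
        return out;
    }
});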

I'm very new to Spark and MapReduce. I would be grateful if you could help me out. Thanks in advance!


2 Answers


If you load the JSON data into a DataFrame:

DataFrame df = sqlContext.read().json("/path/to/json");

You can do this easily with explode:

DataFrame exploded = df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room")
);
exploded.show();

For input:

{"device_id": "1", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "2", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "3", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}

You will get:

+---------+------+----------+
|device_id|  room| timestamp|
+---------+------+----------+
|        1|Office|1436941050|
|        1| Foyer|1436941050|
|        2|Office|1435677490|
|        2|   Lab|1435677490|
|        3|Office|1436673850|
|        3| Foyer|1436673850|
+---------+------+----------+
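
If you still need a JavaRDD<Event> rather than a DataFrame, you can map the exploded rows back into the POJO. A sketch, assuming Spark 1.4+ (note that Spark's JSON reader infers JSON numbers as longs, hence the narrowing to int):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;

JavaRDD<Event> events = exploded.javaRDD().map(new Function<Row, Event>() {
    public Event call(Row row) throws Exception {
        // look up each column by name; timestamp comes back as a boxed
        // Long in the inferred schema and is narrowed to int for Event
        return new Event(row.<String>getAs("device_id"),
                         row.<Long>getAs("timestamp").intValue(),
                         row.<String>getAs("room"));
    }
});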
val formatrecord = records.map(fromJson[mapClass](_))

mapClass should be a case class that maps the fields of each JSON record in records.
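
In Java, the analogue of that mapping class would be a small POJO mirroring the JSON fields, deserialized with Gson instead of walking a generic JsonObject by hand. A sketch along those lines (RawEvent is a hypothetical name, not something from the question; data is the JavaRDD<String> loaded earlier):

import java.io.Serializable;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

import com.google.gson.Gson;

// Hypothetical intermediate class whose fields mirror the JSON keys,
// so Gson can bind them by name without any manual extraction
public class RawEvent implements Serializable {
    public String device_id;
    public long timestamp;
    public List<String> rooms;
}

JavaRDD<RawEvent> raw = data.map(new Function<String, RawEvent>() {
    public RawEvent call(String line) throws Exception {
        return new Gson().fromJson(line, RawEvent.class);
    }
});

From there, the same flatMap shown in the question would split each RawEvent into one Event per room.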
