0

Here is my RDD[String]

M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4

M2 module2
PIP a I n4
PIP b O D
PIP c O n5

and so on. Basically, I need a RDD of key (containing the second word on line1) and values of the subsequent PIP lines that can be iterated upon.

I've tried the following

val usgPairRDD = usgRDD.map(x => (x.split("\\n")(0), x))

but this gives me the following output

(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)

Instead, I'd like the output to be

module1, (PIP a Z A, PIP b Z B, PIP b Z B)
module2, (PIP a I n4,PIP b O D, PIP c O n5)

What am I doing wrong? I am quite new to Spark APIs. Thanks

Hi @zero323

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

yields...

%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%

and so on

Hi @zero323 and @Daniel Darabos My input is very very large set of many many files (spanning in TBs). Here is sample..

BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B 
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
   PIP a Z A
   PIP b Z B
   PIP c Y n4
DEF M2 module2
   PIP a I n4
   PIP b O D
   PIP c O n5

I need all the BINS, PIT and DEF (including PIP lines below) in 3 different RDDS. Here is how I am doing this currently (from the discussion, I sense usgRDD below is wrongly computed)

val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

I need 3 types (at the moment) of RDDs because I need to perform validation later on. For example, "n4" under "DEF M2 module2" can only exist if n4 is a BIN element. From the RDDs, I hope to derive relationships using GraphX APIs (I have obviously not come upto this point). It would be ideal if each usgPairRDD (computed from usgRDD or otherwise) prints the following

module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%

I hope I am making sense. Apologies to the Spark Gods, if I am not.

zero323
  • 322,348
  • 103
  • 959
  • 935
user1384205
  • 1,231
  • 3
  • 20
  • 39
  • What is the input here? It looks like you're using `textFile` to read data, right? – zero323 Dec 08 '15 at 13:56
  • The input is a huge file that has been transformed into the above mentioned RDD - usgRDD. I'd like to transform the usgRDD into usgPairRDD (paired RDD) – user1384205 Dec 08 '15 at 14:00
  • Could you add an exact output form `usgRDD.take(9)`? – zero323 Dec 08 '15 at 14:14
  • This is quite hard because as the data is split across multiple machines it may be that one machine has the `M` line and another has the corresponding `PIP` lines. Nevertheless, I'm sure @zero323 will have a great answer :). – Daniel Darabos Dec 08 '15 at 14:32
  • If you can change the initial input read, then you can use wholeTextFiles instead, and parse the file in a non-distributed way at first. – Justin Pihony Dec 08 '15 at 18:04
  • Thanks @Daniel Darabos. I now understand what you are saying. Please see the above edits to my original question on the nature of the input – user1384205 Dec 09 '15 at 15:29

1 Answers1

4

By default Spark creates a single element per line. It means that in your case every record is spread over multiple elements which, as stated by Daniel Darabos in the comments, can be processed by different workers.

Since it looks like your data is relatively regular and separated by an empty line you should be able to use newAPIHadoopFile with custom delimiter:

import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path: String = ???

val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration
conf.set("textinputformat.record.delimiter", "\n\n")

val usgRDD = sc.newAPIHadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map{ case (_, v) => v.toString }

val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match {
  case Array(x, xs @ _*) => (x, xs)
})

In Spark 2.4 or later data loading part can be also achieved using Dataset API:

val ds: Dataset[String] = spark.read.option("lineSep", "\n\n").text(path)
user10938362
  • 3,991
  • 2
  • 12
  • 29
zero323
  • 322,348
  • 103
  • 959
  • 935