
I'm using GraphX on Spark for some experiments, and the current step is to get a subgraph of a generated graph. I've checked that the original graph has been generated successfully: not only does the lazy lineage build without problems, but when I try graph.vertices.first() the result is displayed correctly. Now my subgraph code is:

// keep only the vertices whose ID (as a string) ends with "11"
val reg = "(\\d*)11".r
val graphUSA_subgraph = graphUSA.subgraph(
  vpred = (id, user) => {
    id.toString match {
      case reg(x) => true
      case _      => false
    }
  }
)
graphUSA_subgraph.vertices.first()
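
(For reference, graphUSA itself comes from my own generator; a minimal stand-in that reproduces the setup in the spark shell would look roughly like this, with made-up vertex IDs and attributes:)

import org.apache.spark.graphx.{Edge, Graph}

// toy stand-in for graphUSA: a handful of vertices, some with IDs ending in "11"
val vertices = sc.parallelize(Seq((111L, "a"), (211L, "b"), (100L, "c"), (42L, "d")))
val edges    = sc.parallelize(Seq(Edge(111L, 211L, 1), Edge(100L, 42L, 1)))
val graphUSA = Graph(vertices, edges)
graphUSA.vertices.first()   // sanity check: returns one (VertexId, attr) pair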

I meant to get a subgraph containing only the vertices whose ID ends with "11". I've checked the Boolean block in `vpred = (id, user) => Boolean` and the logic is correct (a standalone check is shown right after the log). What confuses me is that when I run the code in the spark shell it raises an error; the log is as follows:

Exception in task * in stage *...
java.io.InvalidClassException:...
unable to create instance
at java.io.ObjectInputStream. ...
...
Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM ... The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:123)
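
To show that the predicate logic itself is fine, it can be verified on its own in a plain Scala REPL, outside of any RDD operation (the sample IDs below are made up):

// checking the predicate by itself, with no RDD or closure involved
val reg = "(\\d*)11".r
for (s <- Seq("12311", "4511", "12300")) {
  val keep = s match {
    case reg(_) => true
    case _      => false
  }
  println(s"$s -> $keep")   // 12311 -> true, 4511 -> true, 12300 -> false
}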

The error is not caused by Graph.subgraph() itself, because when I ran a simpler version:

val graph_subgraph_1 = graph.subgraph(
  vpred = (id, user) => id.toString.endsWith("00")
)
graph_subgraph_1.vertices.first()

Everything went fine.

And then I tried another version, which doesn't refer to the `reg` val defined outside the closure:

val graphUSA_subgraph_0 = graphUSA.subgraph(
  vpred = (id, user) => {
    id.toString.drop(id.toString.length - 2) match {
      case "11" => true
      case _    => false
    }
  }
)
graphUSA_subgraph_0.vertices.first()

Everything went fine too.

I'm wondering at which step a new SparkContext is implicitly created in this pipeline. It seems quite possible that referring to a val (`reg`) defined outside the function is what causes it.
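
One variant I haven't tried yet, sketched here untested, would be to define the regex inside the predicate so that the closure doesn't capture the shell-level `reg` at all:

// untested sketch: keep the regex local to the closure instead of capturing `reg`
val graphUSA_subgraph_2 = graphUSA.subgraph(
  vpred = (id, user) => {
    val localReg = "(\\d*)11".r   // defined inside the predicate
    id.toString match {
      case localReg(_) => true
      case _           => false
    }
  }
)
graphUSA_subgraph_2.vertices.first()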

I've been struggling with this block for quite some time and would be grateful if anyone could shed some light on it. Thanks in advance!

Mon.Ma
  • Why are you stopping and starting a new `SparkContext`? Also, why does using `id` in `map` have anything to do with `SparkContext` being Serializable? – Yuval Itzchakov May 21 '16 at 12:56
  • @YuvalItzchakov Because there's only one SparkContext allowed and along with the shell there's a default opening one, and I need it to be Serializable. As for Serializable plz check this question: [link](http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou). But I just found that `map` is not relevant to this question. I've updated the question just now, plz check and offer opinions, thx! – Mon.Ma May 21 '16 at 13:49
  • Why aren't you using the `SparkContext` provided to you? If you need to serialize your context, you're most likely doing something wrong – Yuval Itzchakov May 21 '16 at 13:52
  • @YuvalItzchakov well previously when I needed to refer to `id` in a `map` without serializing `sc`, an ERROR was raised... I don't have any class structure and everything refers to `sc` directly. Tuning on it is a little beyond my scope right now :/ – Mon.Ma May 21 '16 at 14:05
  • What type is `id`? Where is it coming from? – Yuval Itzchakov May 21 '16 at 15:10
  • @YuvalItzchakov id is the vertexId in vertexRDD, the signature of Graph.subgraph is `def subgraph( epred: EdgeTriplet[VD, ED] => Boolean = x => true, vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED]` – Mon.Ma May 21 '16 at 15:51
