
This question is a "sequel" to a previous one. I am new to Spark GraphX and Scala, and I was wondering how I can perform the operation below.

How can I merge two graphs into a new graph so that the new graph has the following property:

The attributes of the edges common to both graphs are averaged (or, more generally, an averaging function is applied to the edge attributes, which are of type Double).

We consider a common edge to be one with the same srcId and the same dstId; within each graph, vertices and edges are unique.
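For example, the two graphs could look something like this (purely illustrative data; sc is assumed to be an existing SparkContext):

import org.apache.spark.graphx.{Edge, Graph}

val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))

// graph1 and graph2 share the same vertices; the edge 1 -> 2 is common to both
val graph1 = Graph(vertices, sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0))))
val graph2 = Graph(vertices, sc.parallelize(Seq(Edge(1L, 2L, 3.0), Edge(1L, 3L, 4.0))))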

eliasah
Al Jenssen

1 Answer


Assuming you have only two graphs, and both contain the same set of vertices without duplicate edges, you can simply combine the edges and use the groupEdges method on the new graph:

import org.apache.spark.graphx.{Graph, PartitionStrategy}

val graph1: Graph[T, Double] = ???
val graph2: Graph[T, Double] = ???

// groupEdges merges only edges that are co-located in the same partition,
// so the combined graph has to be partitioned first
Graph(graph1.vertices, graph1.edges.union(graph2.edges))
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((val1, val2) => (val1 + val2) / 2.0)
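Assuming graph1 and graph2 are the small illustrative graphs sketched in the question, the common edge 1 -> 2 should end up with the average of its two attributes:

val merged = Graph(graph1.vertices, graph1.edges.union(graph2.edges))
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((val1, val2) => (val1 + val2) / 2.0)

// Edge 1 -> 2 should now carry (1.0 + 3.0) / 2 = 2.0,
// while edges present in only one graph keep their original attribute
merged.edges.collect().foreach(println)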

or, a bit more generally (this version computes a correct average even when an edge occurs more than twice, for example when merging more than two graphs):

Graph(graph1.vertices, graph1.edges.union(graph2.edges))
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .mapEdges(e => (e.attr, 1.0))
  .groupEdges((val1, val2) => (val1._1 + val2._1, val1._2 + val2._2))
  .mapEdges(e => e.attr._1 / e.attr._2)

If that is not enough, you can combine the values yourself and create a new graph from scratch:

def edgeToPair(e: Edge[Double]) = ((e.srcId, e.dstId), e.attr)
val pairs1 = graph1.edges.map(edgeToPair)
val pairs2 = graph2.edges.map(edgeToPair)

// Combine edges
val newEdges = pairs1.union(pairs2)
  .aggregateByKey((0.0, 0.0))(
    (acc, e) => (acc._1 + e, acc._2 + 1.0),
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).map{case ((srcId, dstId), (acc, count)) => Edge(srcId, dstId, acc / count)}

// Combine vertices assuming there are no conflicts
// like different labels
val newVertices = graph1.vertices.union(graph2.vertices).distinct

// Create new graph
val newGraph = Graph(newVertices, newEdges)

Here aggregateByKey can be replaced by groupByKey followed by a mapping, which is necessary when the combining function needs all values at once, such as a median.
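For instance, a median could be computed along these lines (a sketch only, reusing pairs1 and pairs2 from above; it collects all attribute values of each edge into memory, which is fine as long as the number of duplicates per edge is small):

val medianEdges = pairs1.union(pairs2)
  .groupByKey()
  .map { case ((srcId, dstId), attrs) =>
    // The median needs all values at once, hence groupByKey instead of aggregateByKey
    val sorted = attrs.toSeq.sorted
    val n = sorted.size
    val median =
      if (n % 2 == 1) sorted(n / 2)
      else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
    Edge(srcId, dstId, median)
  }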

zero323
  • As much as I like this answer, wouldn't it be more appropriate to consider `combineByKey` instead, for performance reasons? What do you think? – eliasah Aug 28 '15 at 19:39
  • Instead of `aggregateByKey`? – zero323 Aug 28 '15 at 19:43
  • No, instead of `groupByKey` – eliasah Aug 28 '15 at 19:50
  • To compute a median? I don't think so. One way or another we have to see all the values at once. If the number of unique values is low, then aggregating counts could reduce the traffic, but it is unlikely to be helpful otherwise. – zero323 Aug 28 '15 at 20:13
  • Dude, you are awesome, that works, but only if I make the class where I do this operation extend Serializable. Do you know why that is? Is there a method to get the uncommon edges? Thanks a lot!!!! – Al Jenssen Aug 29 '15 at 09:13
  • You should take a look at [this answer](http://stackoverflow.com/a/22596875/1560062) by [Grega Kešpret](http://stackoverflow.com/users/587408/grega-ke%c5%a1pret). _Is there a method to get the uncommon edges?_ - I don't see any reason why not (see the sketch after these comments). Depending on requirements (like label handling) you can take a different approach, though. – zero323 Aug 29 '15 at 14:04
  • The link you provided really helped. As for the uncommon edges question, Spark has enough methods for RDD handling and transformations; it just takes some training to get used to Apache Spark and Scala if you are new to both. Thank you a lot for your answers and your quick responses!!!! – Al Jenssen Aug 29 '15 at 14:15
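A minimal sketch of the uncommon-edges idea mentioned above, reusing the pairs1/pairs2 RDDs from the answer (subtractByKey keeps the pairs whose (srcId, dstId) key is absent from the other RDD):

// Edges that appear in exactly one of the two graphs
val onlyInGraph1 = pairs1.subtractByKey(pairs2)
val onlyInGraph2 = pairs2.subtractByKey(pairs1)

val uncommonEdges = onlyInGraph1.union(onlyInGraph2)
  .map { case ((srcId, dstId), attr) => Edge(srcId, dstId, attr) }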