
Let's say I have a large number of graph files, each with around 500K edges. I have been processing these graph files on Apache Spark and I am wondering how to parallelize the entire graph-processing job efficiently. Since, for now, every graph file is independent of the others, I am looking for parallelism across files: if I have 100 graph files and a 20-node cluster, can I process the files across the nodes, so that each node processes 5 files? What is happening instead is that a single graph is processed in a number of stages, which causes a lot of shuffling.

graphFile = "/mnt/bucket/edges" #This directory has 100 graph files each file with around 500K edges

nodeFile = "/mnt/bucket/nodes" #This directory has node files

graphData = sc.textFile(graphFile).map(lambda line: line.split(" ")).flatMap(lambda edge: [(int(edge[0]),int(edge[1]))])

graphDataFrame = sqlContext.createDataFrame(graphData, ['src', 'dst']).withColumn("relationship", lit('edges')) # Dataframe created so as to work with Graphframes

nodeData = sc.textFile(nodeFile).map(lambda line: line.split("\s")).flatMap(lambda edge: [(int(edge[0]),)])

nodeDataFrame = sqlContext.createDataFrame(nodeData, ['id'])

graphGraphFrame = GraphFrame(nodeDataFrame, graphDataFrame)

connectedComponent = graphGraphFrame.connectedComponents()

The thing is, it's taking a lot of time to process even a couple of files, and I have to process around 20K files, each with 800K edges. Maybe if a data-partitioning strategy can be figured out that ensures all dependent edges are processed on a single node, there will be less shuffling.

Or, what is the best way to solve this efficiently?

  • 2
    If you want to process data using single machine and single thread what is the point of using Spark? – zero323 Aug 17 '16 at 15:13
  • 1
    I might be wrong. I am just wondering and confused how spark handles the data. I have created separate RDD for every graph file and when I run the job, it just keeps staging for ever. I am running it on 8 nodes cluster. – hsuk Aug 17 '16 at 15:16
  • 2
    The main Spark focus is data parallelism and while the core engine is generic enough to use for some variants of task parallelism it is typically an expensive overkill. If you assume that individual graphs can be processed on a single node just use good local library and parallelize tasks using your favorite scheduling / pipeline management tool. – zero323 Aug 17 '16 at 15:28
  • 1
    Thanks, if I create a single big RDD of all those files, how would I consider partition them correctly ? – hsuk Aug 17 '16 at 15:35
  • You could use a custom partitioner and extract file names, for example from [`input_file_name`](http://stackoverflow.com/a/36356253/1560062), but I don't think it will help you much with GraphX. – zero323 Aug 17 '16 at 17:34
  • Well, I am using data frames now, but I am still wondering how to partition them. Hey, if you give me your email address, I can send you my code, which is roughly 15 lines; could you please review it and tell me roughly what to do to optimize it? I can't just post my code here. – hsuk Aug 18 '16 at 16:32
  • Could you please see my updated question... – hsuk Aug 20 '16 at 17:13
  • To be honest I don't have anything more to add here. If you want to process graphs locally, then Spark-based graph processing libraries (GraphX, GraphFrames) will be useless. And even if you process everything on a single node, it will still require a shuffle. – zero323 Aug 20 '16 at 19:17
  • The thing is its taking a lot of time to process even couple of files. And I have to process like 20K files. Each file has 800K edges. May be if data partition strategy can be figured out that ensures every dependent edges will be processed on single node, shuffling will be less. – hsuk Aug 20 '16 at 20:17

1 Answer


TL;DR Apache Spark is not the right tool for the job.

The main focus of Spark is data parallelism, but what you're looking for is task parallelism. Theoretically the core Spark engine is generic enough to achieve limited task parallelism as well, but in practice there are better tools for a job like this, and it is definitely not the goal of libraries like GraphX and GraphFrames.

Since data distribution is the core assumption behind these libraries, their algorithms are implemented using techniques like message passing or joins, which is reflected in the multi-stage job structure and the shuffles you see. If the data fits in main memory (you can easily process graphs with millions of edges on a single node using optimized graph processing libraries), these techniques are useless in practice.

Given the piece of code you've shown, an in-core graph processing library like igraph or NetworkX (better documented and much more comprehensive, but unfortunately memory-hungry and a bit slow) combined with GNU Parallel should be more than enough, and much more efficient in practice. For more complex jobs you may consider using a full-featured workflow management tool like Airflow or Luigi.
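For illustration, here is a minimal per-file sketch of that approach with NetworkX. The script name process_graph.py, the output format, and the GNU Parallel invocation are just assumptions to show the shape of the solution, not something taken from your code:

# process_graph.py -- hypothetical per-file script, run as: python process_graph.py <edge_file>
# To cover all files, e.g.: ls /mnt/bucket/edges/* | parallel -j 20 python process_graph.py {}
import sys

import networkx as nx

def main(edge_file):
    # Each line is assumed to be a whitespace-separated "src dst" pair, as in the question
    g = nx.read_edgelist(edge_file, nodetype=int)

    # connected_components yields one set of node ids per component
    for component_id, nodes in enumerate(nx.connected_components(g)):
        for node in nodes:
            print(node, component_id)

if __name__ == "__main__":
    main(sys.argv[1])

Each invocation handles one file independently, so there is no shuffle at all; GNU Parallel (or Airflow/Luigi for more complex pipelines) only has to keep, say, 20 such processes running at a time.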

zero323
  • 322,348
  • 103
  • 959
  • 935
  • What do you mean by unfortunately less efficient? – hsuk Aug 20 '16 at 22:04
  • If you mean Spark is not the right tool for this job, what could be examples of graph algorithms Spark can handle efficiently? I really don't get why Spark cannot be the right tool for this job. – hsuk Aug 21 '16 at 01:20
  • And can you please explain what kind of data the above connected-components call would work well on? – hsuk Aug 21 '16 at 14:21
  • _why Spark cannot be the right tool for this job_ - because it is not designed for single-machine, low-latency jobs. _what could be examples of graph algorithms Spark can handle efficiently_ - distributed graph processing is a hard problem in general. If you have data that can easily be processed using in-core solutions, there is no reason to bother with a distributed one. Leave that for data that actually requires it. – zero323 Aug 21 '16 at 18:50