Implementing parallel breadth-first search using .NET 4.0 TPL
Introduction
Over the last few days I worked on a small project for the parallel computing course at my university. The task was to take any serial algorithm we liked, parallelize it and write an article about our results.
Because of my graph addiction, I chose the standard breadth-first search (BFS) in its single-source shortest path (SSSP) variant. But first, some background on the topic. A graph G(V,E) is a structure that consists of a set of vertices (nodes) V and a set of edges E. Each edge connects a pair (u,v) of vertices. A graph can be either directed or undirected.
In a directed graph (figure 1), each edge has a source and a target vertex; a connection from vertex A to vertex B does not imply a connection from vertex B to vertex A.
In an undirected graph (figure 2), an edge implies a connection in both directions. The number of edges connected to a vertex u is called the degree of u. Another property of graphs is the weight of vertices or edges: in a weighted graph, each vertex or edge may carry a weight that defines the cost of traversing it. In a real network, an edge weight could represent the distance between cities or the importance of a social relationship („friend of“ > „knows“).
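To make these definitions concrete, here is a minimal sketch of an adjacency-list representation. It is not the IGraph/IVertex implementation used for the benchmarks; the integer vertex IDs and the helper names NewGraph, AddDirected and AddUndirected are made up for illustration. It is written as a modern C# top-level program (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal representation: vertices are numbered 0..n-1 and
// adj[u] lists the targets of u's outgoing edges.
List<int>[] NewGraph(int n)
{
    var lists = new List<int>[n];
    for (int k = 0; k < n; k++) lists[k] = new List<int>();
    return lists;
}

// Directed edge: only u -> v is stored.
void AddDirected(List<int>[] adj, int u, int v) => adj[u].Add(v);

// Undirected edge: stored as the two directed edges u -> v and v -> u.
void AddUndirected(List<int>[] adj, int u, int v)
{
    adj[u].Add(v);
    adj[v].Add(u);
}

// Vertices A = 0, B = 1, C = 2 with the connections A-B, A-C and C-B.
var directed = NewGraph(3);
AddDirected(directed, 0, 1);
AddDirected(directed, 0, 2);
AddDirected(directed, 2, 1);

var undirected = NewGraph(3);
AddUndirected(undirected, 0, 1);
AddUndirected(undirected, 0, 2);
AddUndirected(undirected, 2, 1);

// The (out-)degree of a vertex is simply the length of its list.
Console.WriteLine($"out-degree of B (directed): {directed[1].Count}");   // 0
Console.WriteLine($"degree of B (undirected):   {undirected[1].Count}"); // 2
```

In the directed version B has no outgoing edges, while in the undirected version both of its connections count toward its degree.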
Graphs are an abstract way to model relationships in real networks such as social, technological or biological networks. Graph theory itself is a part of discrete mathematics and offers algorithms for many kinds of measurements on networks.
Common examples of real networks on today's web are social networks like Facebook, LinkedIn or, in the German-speaking region, the VZ network. Interesting questions concerning this kind of network are:
„How many users are between User A and User B?“
„How important is User A in this part of the network?“
„How many friends of User A also know each other?“
„How many shortest path connections between two users contain User A?“
Some of these questions can be answered by algorithms that extend BFS. Based on this algorithm, we can find the shortest path between two vertices [1], calculate the betweenness centrality [2] or check how many vertices are reachable from vertex A and how far they are from A [3].
The latter example is called a breadth-first tree, which is a spanning tree of the subgraph reachable from some root vertex s. Each vertex of the breadth-first tree carries two extra pieces of information: its distance to the root vertex s (the source) and a reference to its predecessor. Using the predecessor references, any shortest path from s can be reconstructed after the tree has been built. The following figure shows an example of a breadth-first tree in a directed graph.
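Reconstructing a path from the predecessor references only requires walking backwards from the target to the root and reversing the result. A minimal sketch, assuming the tree is stored as an int array with -1 for "no predecessor" (PathTo is a hypothetical helper, not part of the benchmarked code):

```csharp
using System;
using System.Collections.Generic;

// Walks the predecessor references from `target` back to the root and
// reverses the result. pred[v] == -1 marks "no predecessor" (the root or an
// unreached vertex); this encoding is an assumption of the sketch.
List<int> PathTo(int[] pred, int root, int target)
{
    var path = new List<int>();
    for (int v = target; v != -1; v = pred[v])
        path.Add(v);
    path.Reverse();
    // If the walk did not end at the root, the target was never reached.
    return path[0] == root ? path : new List<int>();
}

// Hand-made breadth-first tree rooted at 0 with tree edges
// 0 -> 1, 0 -> 2, 2 -> 3, 3 -> 4; vertex 5 was not reached.
int[] predecessors = { -1, 0, 0, 2, 3, -1 };
var route = PathTo(predecessors, 0, 4);
Console.WriteLine(string.Join(" -> ", route)); // 0 -> 2 -> 3 -> 4
```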
But why is the algorithm called “breadth-first”? To answer this question, let's look at how it works in a bit more detail…
The algorithm
Starting at a given root vertex s, the algorithm looks up all neighbors of s. In a directed graph these are the targets of all outgoing edges (edges that have s as their source vertex); in an undirected graph all neighbors are considered. All these vertices have the same distance to s, because they are just one step away. In the next iteration, all these “one-step-away” vertices are treated as if they were source nodes. So the algorithm checks all nodes at distance k from s before it starts checking the nodes at distance k + 1.
To realize this behavior, the algorithm uses a data structure called a “queue”. A queue is a first-in-first-out data structure where elements are added at the end (enqueue) and removed from the front (dequeue). Nodes at distance k + 1 are only enqueued while nodes at distance k are being processed, so they always end up behind all remaining nodes of level k; this guarantees that each level of the breadth-first tree is completed before the next one is started. The following listing shows the algorithm:
using System.Collections.Generic;

public static void Search(IGraph myGraph, IVertex mySource)
{
    #region Init

    // IGraph, IVertex and InitGraph belong to my sample graph implementation (not shown)
    InitGraph(myGraph);

    // init source vertex
    mySource.IsVisited = true;
    mySource.Distance = 0;

    // a plain FIFO queue is sufficient for the serial version
    var queue = new Queue<IVertex>();
    IVertex u = null;

    #endregion

    #region BFS

    // enqueue the source vertex
    queue.Enqueue(mySource);

    while (queue.Count > 0)
    {
        // take front element of the queue
        u = queue.Dequeue();

        // process neighbours of u
        foreach (var outEdge in u.OutgoingEdges)
        {
            // neighbour node
            var v = outEdge.Target;

            if (!v.IsVisited) // unexplored node
            {
                // set as visited
                v.IsVisited = true;
                // set the predecessor
                v.Predecessor = u;
                // increment the distance
                v.Distance = u.Distance + 1;
                // and enqueue that node
                queue.Enqueue(v);
            }
        }
    }

    #endregion
}
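Because IGraph, IVertex and InitGraph are not shown here, the listing above does not compile on its own. The following self-contained sketch implements the same serial algorithm on integer-indexed adjacency lists and uses a distance of -1 as the "not visited" flag instead of IsVisited. The names Bfs, dist and pred are assumptions of this sketch, and Array.Fill requires .NET Core 2.0 or later.

```csharp
using System;
using System.Collections.Generic;

// Serial BFS on adjacency lists (vertices 0..n-1). dist[v] == -1 doubles as
// the "not visited" flag; pred[v] == -1 means "no predecessor".
(int[] Dist, int[] Pred) Bfs(List<int>[] adj, int source)
{
    var dist = new int[adj.Length];
    var pred = new int[adj.Length];
    Array.Fill(dist, -1);
    Array.Fill(pred, -1);

    var queue = new Queue<int>();
    dist[source] = 0;
    queue.Enqueue(source);

    while (queue.Count > 0)
    {
        int u = queue.Dequeue();       // front element of the FIFO queue
        foreach (int v in adj[u])      // neighbours at outgoing edges
        {
            if (dist[v] == -1)         // unexplored node
            {
                dist[v] = dist[u] + 1;
                pred[v] = u;
                queue.Enqueue(v);
            }
        }
    }
    return (dist, pred);
}

// Directed example: 0->1, 0->2, 1->3, 2->3, 3->4; vertex 5 is unreachable.
var graph = new List<int>[6];
for (int i = 0; i < graph.Length; i++) graph[i] = new List<int>();
graph[0].Add(1); graph[0].Add(2);
graph[1].Add(3); graph[2].Add(3);
graph[3].Add(4);

var (distances, predecessors) = Bfs(graph, 0);
Console.WriteLine(string.Join(" ", distances)); // 0 1 1 2 3 -1
```

Vertex 3 is reachable via both 1 and 2, but it is claimed by whichever of them is dequeued first (here 1), so it is enqueued only once.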
The idea for parallelization
If you think of social graphs like Facebook, you will notice that these graphs are really large-scale. Facebook itself has about 0.5 billion active users, and the average user has about 130 relationships to other users [4]. Furthermore, there are about 0.9 billion objects like groups, pages or events that a user can be connected to. And think about how many times you have clicked the “Like” button… ;)
Such large-scale graphs can't be processed by a single processor or a single machine (maybe by a mainframe? :). So we need ways to distribute the work across multiple processors or multiple machines in a cluster. Since I don't own a cluster of more than one PC, I decided to distribute the work across the cores of a single CPU (more on that later).
Thinking of Facebook again, with an average degree of 130 friends, it's easy to see that the farther we get from s, the more nodes we have to check. In the first level there are just 130 nodes; in the second level each of these 130 nodes again has about 130 neighbors (assuming that few of these neighbors are shared, i.e. there are few cycles). So at a given distance d from s there are roughly 130^d nodes with the same distance to s.
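Writing N(d) for the number of vertices at distance d (notation introduced here), the rough estimate grows as follows. It ignores shared neighbors, so real levels are smaller:

```latex
N(d) \approx 130^{d}:\qquad
N(1) = 130,\quad
N(2) = 16\,900,\quad
N(3) = 2\,197\,000,\quad
N(4) = 285\,610\,000,\quad
N(5) \approx 3.7 \times 10^{10}
```

At d = 5 the estimate already exceeds the roughly 0.5 billion users mentioned above, so under this idealized assumption the levels become very large after only a few steps.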
My idea was to distribute these 130^d nodes across the available processors, so that each level can be processed in parallel.
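The level-by-level structure that this idea exploits becomes explicit if the single queue is replaced by two lists, the current frontier and the next one. Here is a serial sketch under the same assumptions as before (integer vertex IDs; LevelSizes is a hypothetical helper) that also reports how many vertices each level contains:

```csharp
using System;
using System.Collections.Generic;

// Level-synchronous BFS: keep the current frontier (all vertices at distance
// k) and build the next frontier (distance k + 1) from it. Returns the size
// of every level. Apart from the shared visited check, expanding one frontier
// vertex is independent of the others, so this is the loop to parallelize.
List<int> LevelSizes(List<int>[] adj, int source)
{
    var dist = new int[adj.Length];
    Array.Fill(dist, -1);
    dist[source] = 0;

    var sizes = new List<int>();
    var frontier = new List<int> { source };
    while (frontier.Count > 0)
    {
        sizes.Add(frontier.Count);
        var next = new List<int>();
        foreach (int u in frontier)          // the loop a parallel version distributes
            foreach (int v in adj[u])
                if (dist[v] == -1)
                {
                    dist[v] = dist[u] + 1;
                    next.Add(v);
                }
        frontier = next;                     // level k + 1 becomes the current level
    }
    return sizes;
}

// Binary tree with 7 vertices: 0 -> 1, 2; 1 -> 3, 4; 2 -> 5, 6
var tree = new List<int>[7];
for (int i = 0; i < tree.Length; i++) tree[i] = new List<int>();
tree[0].AddRange(new[] { 1, 2 });
tree[1].AddRange(new[] { 3, 4 });
tree[2].AddRange(new[] { 5, 6 });

var levels = LevelSizes(tree, 0);
Console.WriteLine(string.Join(",", levels)); // 1,2,4
```

In the tree the level sizes only double; with an average degree around 130 they would grow far faster, which is what makes each level worth splitting across cores.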
The implementation
I decided to try two parallel programming approaches that I learned from the book Parallel Programming with Microsoft .NET [5]: the “while-not-empty” pattern using two concurrent queues, and the “parallel aggregate” pattern, which performs thread-local calculations and then merges the results of all threads.
while-not-empty
ConcurrentQueue<T> is a generic, thread-safe FIFO data structure. Thread A can enqueue new items while another thread B dequeues items at the front. Another important fact, which might sound weird, is that you can modify the queue while iterating over its elements. This is possible because the enumerator iterates over a snapshot of the queue's state taken when the enumeration starts. The algorithm is shown in the following listing.
There are two concurrent queues: the read queue, which is used for parallel reading, and the write queue, which is used for parallel writing. I also tried the pattern with just one queue for both reading and writing, and there was no difference in performance, so I chose the two-queue version because of its “not-weird” bonus.
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static void Search(IGraph myGraph, IVertex mySource)
{
    #region Init

    InitGraph(myGraph);

    // init source vertex
    mySource.IsVisited = true;
    mySource.Distance = 0;

    // first queue is the read queue
    var from = new ConcurrentQueue<IVertex>();
    // second queue is the write queue
    var to = new ConcurrentQueue<IVertex>();

    #endregion

    #region BFS

    // enqueue the source vertex
    from.Enqueue(mySource);

    while (from.Count > 0)
    {
        // re-initialize the write queue
        to = new ConcurrentQueue<IVertex>();

        // process current BFS tree level in parallel
        Parallel.ForEach(from, u =>
        {
            if (from.TryDequeue(out u))
            {
                // process neighbours of u
                foreach (var outEdge in u.OutgoingEdges)
                {
                    // neighbour node
                    var v = outEdge.Target;

                    if (!v.IsVisited) // unexplored node
                    {
                        // set as visited (check and set are not atomic, so two
                        // threads may both claim v and enqueue it twice)
                        v.IsVisited = true;
                        // set the predecessor
                        v.Predecessor = u;
                        // increment the distance
                        v.Distance = u.Distance + 1;
                        // add to the queue
                        to.Enqueue(v);
                    }
                }
            }
        });

        // switch queues
        from = to;
    }

    #endregion
}
First, the source vertex s is enqueued into the read queue; it represents level 0 of the breadth-first tree. The read queue is then processed in parallel to retrieve all neighbors of the currently enqueued nodes, and these neighbors are enqueued in the write queue. After the whole level (all nodes with the same distance to s) has been processed, the write queue becomes the new read queue and a fresh write queue is created. All nodes with distance k + 1 are now in the read queue and can be processed in parallel during the next iteration. This repeats until all reachable nodes have been processed.
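The while-not-empty listing relies on the snapshot behavior of ConcurrentQueue<T> described above: Parallel.ForEach enumerates `from` while the loop body dequeues from it. Here is a small single-threaded check of that behavior, contrasted with Queue<T>, whose enumerator rejects modifications:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// ConcurrentQueue<T>: the foreach works on a snapshot, so enqueuing inside
// the loop is allowed and the newly added items are not visited.
var concurrent = new ConcurrentQueue<int>(new[] { 1, 2, 3 });
int visited = 0;
foreach (int item in concurrent)
{
    concurrent.Enqueue(item * 10);   // changes the live queue, not the snapshot
    visited++;
}
Console.WriteLine($"visited {visited}, queue holds {concurrent.Count}"); // visited 3, queue holds 6

// Queue<T>: its enumerator detects the modification and throws.
var plain = new Queue<int>(new[] { 1, 2, 3 });
bool threw = false;
try
{
    foreach (int item in plain)
        plain.Enqueue(item * 10);
}
catch (InvalidOperationException)
{
    threw = true;
}
Console.WriteLine($"Queue<T> threw: {threw}"); // Queue<T> threw: True
```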
parallel aggregate
Another really interesting pattern introduced in [5] is the parallel aggregate pattern. It can be used when there is no “natural” data parallelism because all loop iterations contribute to a single final result, which can be accumulated from intermediate results. A typical example is computing a sum: if you have n summands, you can distribute them to p processors, which calculate intermediate (partial) results. Once all partial results have been calculated, they are merged into one final result. It sounds like a kind of MapReduce, and in my opinion it is a very similar pattern.
Figure 4 shows an input set distributed to three tasks. Each task calculates an intermediate result (subtotal), which is merged into the final result (total). The calculations of the intermediate results are independent, so they can run in parallel. The important point is that merging the intermediate results into the final result has to be done serially.
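The subtotal/total scheme maps directly onto the Parallel.ForEach overload with localInit and localFinally delegates, which the BFS listing below also uses. Here is a minimal sketch for the sum example (ParallelSum is a hypothetical name):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Parallel aggregate: every task sums into its own subtotal (no sharing),
// and only the final merge of each subtotal into the total is serialized.
long ParallelSum(int[] values)
{
    long total = 0;
    var mergeLock = new object();

    Parallel.ForEach(
        values,
        () => 0L,                                   // localInit: subtotal of one task
        (x, loopState, subtotal) => subtotal + x,   // body: touches only the subtotal
        subtotal =>                                 // localFinally: merge, one task at a time
        {
            lock (mergeLock) { total += subtotal; }
        });

    return total;
}

int[] numbers = Enumerable.Range(1, 1_000_000).ToArray();
Console.WriteLine(ParallelSum(numbers)); // 500000500000
```

The lock is taken once per task rather than once per element, which is why the serialized merge stays cheap as long as each task processes many elements.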
Like the while-not-empty pattern, the following implementation also uses two data structures, one for reading and one for writing. In this case there is no need for concurrent queues, because write operations are serialized and the structure won't change during iteration. I used an overload of the Parallel.ForEach method with four parameters. The first parameter is the collection to read from in parallel; in our case these are all vertices with the same distance to the root vertex. The second parameter (localInit) is a function that is executed once per task before that task runs its loop iterations; here it initializes the thread-local (partial) result. I used an ordinary generic List, because it needs no thread safety. The third parameter is the loop body itself, which receives the vertex u, the loop state (unused here) and the partial result that stores the neighbors of u. The fourth parameter (localFinally) is an action, which I call a post-processor because it runs after a task has finished its loop iterations. In this action the thread-local partial result is merged into the final result (all nodes with distance k + 1). This step has to be serialized using the lock object.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static void Search(IGraph myGraph, IVertex mySource)
{
    #region Init

    InitGraph(myGraph);

    // init source vertex
    mySource.IsVisited = true;
    mySource.Distance = 0;

    // use simple lists for r/w
    var from = new List<IVertex>();
    var to = new List<IVertex>();

    // need this for the merging part
    var lockObj = new Object();

    #endregion

    #region BFS

    // add the source vertex
    from.Add(mySource);

    while (from.Count > 0)
    {
        to = new List<IVertex>();

        Parallel.ForEach(
            // the values to be aggregated
            from,
            // local initial partial result
            () => new List<IVertex>(),
            // loop body
            (u, loopState, partialResult) =>
            {
                foreach (var outEdge in u.OutgoingEdges)
                {
                    // neighbour node
                    var v = outEdge.Target;

                    // check and set are not atomic, so v may end up in
                    // more than one partial result (with the same distance)
                    if (!v.IsVisited) // unexplored node
                    {
                        // set as visited
                        v.IsVisited = true;
                        // set the predecessor
                        v.Predecessor = u;
                        // increment the distance
                        v.Distance = u.Distance + 1;
                        // add this vertex to the partial result
                        partialResult.Add(v);
                    }
                }
                return partialResult;
            },
            // the final step of each local context
            (localPartialSet) =>
            {
                // this has to be done serially
                lock (lockObj)
                {
                    to.AddRange(localPartialSet);
                }
            });

        // switch lists
        from = to;
    }

    #endregion
}
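In both parallel listings, the test `!v.IsVisited` and the assignment `v.IsVisited = true` are two separate steps, so two threads expanding different vertices of the same level can both claim a shared neighbor. The distance is still correct, because both threads are on the same level, but the vertex may appear twice in the next level and its predecessor is whichever write came last. One way to close this gap is to claim each vertex with Interlocked.CompareExchange on its distance slot. The sketch below does this on integer-indexed arrays rather than IVertex objects (an assumption of the sketch, not the implementation that was benchmarked), and it only enumerates the read queue instead of also dequeuing from it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Level-synchronous parallel BFS on adjacency lists. A vertex is claimed
// with an atomic compare-and-swap on its distance slot, so exactly one
// thread records its predecessor and enqueues it.
(int[] Dist, int[] Pred) ParallelBfs(List<int>[] adj, int source)
{
    var dist = new int[adj.Length];
    var pred = new int[adj.Length];
    Array.Fill(dist, -1);              // -1 = not visited yet
    Array.Fill(pred, -1);
    dist[source] = 0;

    var frontier = new ConcurrentQueue<int>();
    frontier.Enqueue(source);

    while (!frontier.IsEmpty)
    {
        var next = new ConcurrentQueue<int>();   // write queue for level k + 1
        Parallel.ForEach(frontier, u =>          // read queue is only enumerated
        {
            foreach (int v in adj[u])
            {
                // Only the first thread that still sees -1 wins the swap.
                if (Interlocked.CompareExchange(ref dist[v], dist[u] + 1, -1) == -1)
                {
                    pred[v] = u;
                    next.Enqueue(v);
                }
            }
        });
        frontier = next;                          // switch queues
    }
    return (dist, pred);
}

// Layered graph: source 0, then 5 layers of 100 vertices; every vertex of a
// layer points to every vertex of the next layer, so threads compete heavily.
const int Width = 100, Layers = 5;
int total = 1 + Width * Layers;
var layered = new List<int>[total];
for (int i = 0; i < total; i++) layered[i] = new List<int>();
for (int j = 1; j <= Width; j++) layered[0].Add(j);
for (int layer = 0; layer < Layers - 1; layer++)
    for (int a = 1 + layer * Width; a <= (layer + 1) * Width; a++)
        for (int b = 1 + (layer + 1) * Width; b <= (layer + 2) * Width; b++)
            layered[a].Add(b);

var (d, p) = ParallelBfs(layered, 0);
Console.WriteLine($"deepest distance: {d[total - 1]}"); // deepest distance: 5
```

The compare-and-swap adds one interlocked operation per inspected edge, so it is not free; whether it beats the benign duplicates of the original listings would have to be measured.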
evaluation
The implementations were tested on nine example graphs. Five of them are real graphs taken from [6]. As a representative of social networks, I chose the Slashdot community graph of 2009 (slash_soc). For the category of information networks, there is a web graph from Google (google_web), which was published in 2002 for a programming contest. As a communication network, I used the Wikipedia talk network (wiki_talk). The road network of Texas (texas_road) represents transportation networks, and the last real network is a snapshot of the Gnutella peer-to-peer network taken in 2002 (gnut_p2p).
The additional four graphs were generated using two graph models: two random graphs based on the Erdős-Rényi model [8] and two scale-free graphs based on the model of Barabási and Albert [9]. They were generated using igraph, a graph library for R. If you want me to explain these models, just leave me a comment.
Except for the texas_road network, all graphs are directed. All real networks except gnut_p2p are disconnected.
The test platform is an Intel Core 2 Quad CPU (4 × 2.83 GHz) with 16 GB RAM, running Windows 7 x64. This machine is one of the test and profiling machines at sones. All measurements were performed in memory.
For each graph, the benchmark chose 10 random source vertices and built the BFS tree 20 times. The durations were averaged, and from these average values the overall average duration of each algorithm was computed. All time values are given in milliseconds [ms]. The speedup S_p is the ratio of the serial execution time to the parallel execution time on p processors. Since the test machine has four cores, we look at S_4.
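As a formula, with t_p denoting the duration on p processors (notation introduced here), and checked against two entries of table 1:

```latex
S_p = \frac{t_{\mathrm{serial}}}{t_p},
\qquad
S_4^{\mathrm{la}}(\text{erdos\_1}) = \frac{3161\ \mathrm{ms}}{1225\ \mathrm{ms}} \approx 2.58,
\qquad
S_4^{\mathrm{wne}}(\text{wiki\_talk}) = \frac{1747\ \mathrm{ms}}{2443\ \mathrm{ms}} \approx 0.72
```

The ideal value on four cores is S_4 = 4; a value below 1 means the parallel version is slower than the serial one.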
The results are shown in table 1. |V| and |E| are the numbers of vertices and edges, d(G) is the diameter of the graph (not given for the generated graphs), t_serial denotes the duration of the serial algorithm, t_wne that of the while-not-empty implementation and t_la that of the local aggregation (parallel aggregate) implementation; the speedups S_4 are given in parentheses.
In general, the parallel implementations need less time than the serial one. The average speedup of the while-not-empty implementation is S_4 = 1.45, and that of the local aggregation is S_4 = 2.17. The maximum value, S_4 = 2.65, was achieved by the local aggregation on the random graph erdos_2. The while-not-empty implementation had its worst result, S_4 = 0.72, on the wiki_talk network, where it was actually slower than the serial algorithm.
| Graph | \|V\| | \|E\| | d(G) | t_serial [ms] | t_wne [ms] (S_4) | t_la [ms] (S_4) |
|---|---|---|---|---|---|---|
| slash_soc | 77360 | 905468 | 12 | 157 | 94 (1.67) | 72 (2.18) |
| google_web | 875713 | 5105039 | 22 | 727 | 557 (1.31) | 356 (2.04) |
| wiki_talk | 2394385 | 5021410 | 9 | 1747 | 2443 (0.72) | 853 (2.09) |
| texas_road | 1379917 | 3843320 | 1049 | 989 | 872 (1.13) | 535 (1.84) |
| gnut_p2p | 62586 | 147892 | 11 | 44 | 41 (1.07) | 21 (2.1) |
| erdos_1 | 1000000 | 8332480 | - | 3161 | 1948 (1.62) | 1225 (2.58) |
| erdos_2 | 100000 | 3993968 | - | 1063 | 435 (2.44) | 401 (2.65) |
| barabasi_1 | 1000000 | 3999964 | - | 1787 | 1137 (1.57) | 943 (1.89) |
| barabasi_2 | 100000 | 399970 | - | 132 | 88 (1.5) | 60 (2.2) |
conclusion
Given a maximum speedup of S_4 = 4, the average speedup of S_4 = 1.45 for the while-not-empty implementation falls short of expectations. One reason for this poor result is the parallel reading and writing on both concurrent queues. These operations require internal synchronization, which increases the running time. I validated this assumption by comparing the regular queue and the concurrent queue using a single thread.
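A crude single-threaded version of that comparison might look like the sketch below. It has no warm-up phase and only a single run, so the printed times are indicative at best and will differ between machines and runtime versions. The element count N is arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

// Single-threaded comparison: both queues do the same enqueue/dequeue work,
// so the difference approximates the cost of ConcurrentQueue's thread safety.
const int N = 5_000_000;
var watch = Stopwatch.StartNew();

var queue = new Queue<int>();
for (int i = 0; i < N; i++) queue.Enqueue(i);
long plainSum = 0;
while (queue.Count > 0) plainSum += queue.Dequeue();
Console.WriteLine($"Queue<T>:           {watch.ElapsedMilliseconds} ms");

watch.Restart();
var concurrentQueue = new ConcurrentQueue<int>();
for (int i = 0; i < N; i++) concurrentQueue.Enqueue(i);
long concurrentSum = 0;
while (concurrentQueue.TryDequeue(out int item)) concurrentSum += item;
Console.WriteLine($"ConcurrentQueue<T>: {watch.ElapsedMilliseconds} ms");
```

The sums are computed only so that both loops do observable work and can be checked against each other.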
The local aggregation, on the other hand, achieved a clear improvement with an average speedup of S_4 = 2.17. It's obvious, though, that the serial merge of the local results into the final result still causes performance losses.
In addition to the reasons mentioned above, the Visual Studio 2010 Concurrency Profiler showed that the garbage collector blocked running tasks while memory was being allocated or freed. This also has a negative effect on the speedup.
future work
So far I have only tried BFS in a multicore environment, but it would be even more interesting to see how the algorithm performs on a cluster. I found a paper in which BFS was implemented on a BlueGene/L with 32,768 CPUs [7]. Maybe I can use my EC2 voucher to play around with clusters a little :)
Another thing I'd like to test is local vertex processing. Currently each thread only looks up the neighbors of each vertex. One idea is to implement something like matching or filter functions that are evaluated at each vertex. Two examples: “Give me the shortest path between Berlin and Munich but don't pass Leipzig or Nuremberg” or “Give me the shortest path between vertex A and vertex B containing only vertices with a local clustering coefficient of 0.5 or greater”.
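A filter of the first kind fits naturally into BFS: a vertex that fails the predicate is simply never enqueued. Here is a minimal sketch on a made-up toy map; the city connections are illustrative, not real road data, and FilteredPath is a hypothetical helper. Note that BFS minimizes the number of hops, so a real route planner working with edge weights such as distances would need Dijkstra's algorithm [1] instead.

```csharp
using System;
using System.Collections.Generic;

// BFS that only enters vertices accepted by `allowed`, e.g. "don't pass
// Leipzig or Nuremberg". Returns source..target, or an empty list.
List<string> FilteredPath(Dictionary<string, List<string>> adj,
                          string source, string target,
                          Func<string, bool> allowed)
{
    var pred = new Dictionary<string, string> { [source] = source }; // root points to itself
    var queue = new Queue<string>();
    queue.Enqueue(source);

    while (queue.Count > 0 && !pred.ContainsKey(target))
    {
        string u = queue.Dequeue();
        if (!adj.TryGetValue(u, out var neighbours)) continue;
        foreach (string v in neighbours)
        {
            if (pred.ContainsKey(v) || !allowed(v)) continue;  // visited or filtered out
            pred[v] = u;
            queue.Enqueue(v);
        }
    }

    var path = new List<string>();
    if (!pred.ContainsKey(target)) return path;
    for (string step = target; step != source; step = pred[step]) path.Add(step);
    path.Add(source);
    path.Reverse();
    return path;
}

// Toy undirected map (hop counts only).
var roads = new Dictionary<string, List<string>>();
void Road(string a, string b)
{
    if (!roads.ContainsKey(a)) roads[a] = new List<string>();
    if (!roads.ContainsKey(b)) roads[b] = new List<string>();
    roads[a].Add(b);
    roads[b].Add(a);
}
Road("Berlin", "Leipzig");  Road("Leipzig", "Nuremberg");  Road("Nuremberg", "Munich");
Road("Berlin", "Hannover"); Road("Hannover", "Frankfurt"); Road("Frankfurt", "Stuttgart");
Road("Stuttgart", "Munich");

var avoid = new HashSet<string> { "Leipzig", "Nuremberg" };
var direct = FilteredPath(roads, "Berlin", "Munich", _ => true);
var detour = FilteredPath(roads, "Berlin", "Munich", city => !avoid.Contains(city));
Console.WriteLine(string.Join(" -> ", direct)); // Berlin -> Leipzig -> Nuremberg -> Munich
Console.WriteLine(string.Join(" -> ", detour)); // Berlin -> Hannover -> Frankfurt -> Stuttgart -> Munich
```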
finally…
This is my first longer blog article, so thank you for reading, and I hope to get some feedback on my work :)
If you have any ideas for improving my implementations, or a totally different approach, then let's discuss it.
[1] E. W. Dijkstra. A note on two problems in connexion with graphs. Numerische Mathematik, 1(1):269–271, 1959.
[2] M. E. J. Newman. Networks: An Introduction. Oxford University Press, 2010.
[3] R. C. Prim. Shortest connection networks and some generalizations. Bell System Technical Journal, 36:1389–1401, 1957.
[4] Facebook Statistics (accessed 2011-02-18).
[5] Parallel Programming with Microsoft .NET, at CodePlex (accessed 2011-02-18).
[6] Stanford Large Network Dataset Collection (accessed 2011-02-18).
[7] A. Yoo, E. Chow, K. Henderson, W. McLendon, B. Hendrickson, and U. Catalyurek. A scalable distributed parallel breadth-first search algorithm on BlueGene/L. IEEE Computer Society, Washington, DC, USA, 2005.
[8] P. Erdős and A. Rényi. On random graphs I. Publ. Math. Debrecen, 6:290–297, 1959.
[9] A.-L. Barabási and E. Bonabeau. Skalenfreie Netze [Scale-free networks]. Spektrum der Wissenschaft, July 2004, pp. 64–68.




Hi Martin aka s1ckboy :-)
thank you for this great article. I just want to share my thoughts :-)
Is there a reason why you dequeue the items from the “from” queue in the while-not-empty pattern?
The Parallel.ForEach implementation itself already iterates over the items, and removing items from the queue just adds locking. Maybe this is a reason why the results of the while-not-empty pattern are below your expectations.
A further step would be to make the “from” queue a simple List. But to do this, you need to copy the results in the last assignment, which could cost performance again.
Finally, I thought about using only one concurrent queue. I do not know if Parallel.ForEach can handle growing queues. What I do know is that initializing Parallel.ForEach, or any parallel task, takes time. So maybe a pattern is needed that starts a fixed number of tasks (equal to the number of cores). Each of them reads from the front of the queue and writes to its end. A task must not stop just because no element is currently available, but it must stop once none of the tasks can read an element anymore.
Fair enough, maybe it helps. I am looking forward to reading another interesting article here.
Regards,
Timo
@Timo
thx for your comment!
You're totally right, the TryDequeue() call is unnecessary. This could be the cause of the performance loss. I'll modify the source and run the benchmark again.
I also tried using only one ConcurrentQueue, but there were no differences in performance, so I chose the described implementation. But maybe this behaviour also changes if I remove the TryDequeue() call.
Thx again for your comment and your advice ;)
Hi Martin..
thank you for your hard work, but I just want to give some comments on your tests ;) :
- You make heavy use of scale-free networks, therefore each calculation of a mean might lead to wrong interpretations, especially if you don't include the variances.
- Using only 20 runs per calculation might also lead to wrong interpretations, as this number is too small for statistical proofs. Most publications use 30 runs, as the real maths guys told them so ;)
- Using only 10 starting vertices per graph is a _much_ too small number of random samples, especially as your primary proposition when using multiple graphs is that the parallel speedup may depend on the topology of a graph.
- There might be effects called “transient recovery time” (ordinary people might call this problem “cold caches” ;) ) at the start of your measurements, which might also lead to wrong interpretations. So it might be necessary to find ways to detect when these phases are over and not include them in your calculations.
- I think you should check all direct and hidden lock() calls, as they might have a significant influence on the achievable parallel speedup, especially on smaller networks.
Cheers…
Achim
Hey Achim,
thx for your comment on my test execution. In the original paper I included the variances, but I thought that for a blog article it wouldn’t be that interesting.
But thinking about your comment, I could explain to myself why the variances for scale-free graphs are greater than for graphs without a power-law degree distribution… it's just logical :) so thanks for this “hint” :)
After implementing Timo's advice, I'll run the benchmark again with 30+ iterations and will also add the variances to the results. And there will be many more source nodes ;)
I understand the “cold caches” point, but I don't use any object caching in my sample graph implementation, so I don't think there's a noticeable performance difference. But I'll consider this point when doing some benchmarks with graph databases.
greetings,
Martin