
Parallelizing Tasks with Dependencies – An Example on .NET

by admin

Hello colleagues!
This week we sent an ambitious book out for translation: "Concurrency in .NET" from Manning.
The author has kindly posted an excerpt from Chapter 13 on Medium, and we invite you to read it well ahead of the release.
Enjoy reading!
Suppose you need to write a tool that runs a number of asynchronous tasks, each with a different set of dependencies that affect the order in which operations execute. Such problems can be solved with sequential, imperative execution, but if you want to maximize performance, sequential operations won't do. Instead, you need to arrange for the tasks to run in parallel. Many concurrency problems can be treated as static collections of atomic operations with dependencies between their inputs and outputs. Once an operation completes, its output is used as input for other, dependent operations. To optimize performance, tasks should be scheduled according to their dependencies, and the algorithm should be tuned so that dependent tasks run as sequentially as necessary and as concurrently as possible.
You want to make a reusable component that executes a series of tasks in parallel, and ensure that any dependencies that might affect the order in which the operations are performed are taken into account. How do you build a programming model that provides basic parallelism for a collection of operations that run efficiently, either in parallel or sequentially, depending on what dependencies occur between a given operation and others?
Solution: we implement a dependency graph with the MailboxProcessor class from F# and expose its methods as standard tasks, so they can be consumed from C# too.
This solution is based on a directed acyclic graph (DAG): operations are split into sequences of atomic tasks with well-defined dependencies, and those tasks form a graph. The acyclic nature of the graph matters because it rules out deadlocks between tasks, provided the tasks really are atomic. When specifying a graph, it is important to understand all dependencies between tasks, especially implicit dependencies that can lead to deadlocks or race conditions. Below is a typical example of a graph-like data structure that can be used to represent the constraints that arise when scheduling interactions between operations in a graph.
A graph is an exceptionally powerful data structure, and you can write strong algorithms based on it.
Figure 1 A graph is a collection of nodes connected by edges. In this representation of a directed graph, node 1 depends on nodes 4 and 5, node 2 depends on node 5, node 3 depends on nodes 5 and 6, and so on.
The DAG structure can be used as a strategy for running tasks in parallel while respecting the order of their dependencies, which improves performance. The structure of such a graph can be defined with the MailboxProcessor class from F#; the agent keeps, as its internal state, the tasks registered for execution together with their dependency edges.
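To make the idea of an agent that owns its state more concrete, here is a minimal sketch of a MailboxProcessor that only records tasks and their edges. The RegistryMessage type and its cases are hypothetical names introduced for this illustration; they are not part of the article's listings, and an immutable Map stands in for the dictionaries used later.

```fsharp
// Hypothetical message type for this sketch; not part of the article's listings.
type RegistryMessage =
    | Register of id : int * edges : int list
    | CountEdges of AsyncReplyChannel<int>

let registry =
    MailboxProcessor<RegistryMessage>.Start(fun inbox ->
        // The state is threaded through the recursive loop, so only the agent touches it.
        let rec loop (tasks : Map<int, int list>) = async {
            let! msg = inbox.Receive()
            match msg with
            | Register (id, edges) ->
                return! loop (Map.add id edges tasks)
            | CountEdges reply ->
                let total =
                    tasks |> Map.toSeq |> Seq.sumBy (fun (_, edges) -> List.length edges)
                reply.Reply total
                return! loop tasks }
        loop Map.empty)

// Messages are processed one at a time, in the order they were posted.
registry.Post (Register (2, [5]))
registry.Post (Register (1, [4; 5]))
let edgeCount = registry.PostAndReply (fun channel -> CountEdges channel)
printfn "Registered edges: %d" edgeCount
```

Because the agent handles one message at a time, the state needs no locks, which is the property the DAG agent relies on.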
Validating a directed acyclic graph
When working with any graph data structure, such as a DAG, we have to make sure the edges are registered correctly. For example, going back to Figure 1: what happens if node 2 is registered with a dependency on node 5, but node 5 does not exist? It can also happen that some edges depend on each other, producing a directed cycle. A directed cycle must be detected before tasks are run in parallel; otherwise, some tasks may wait forever for others to complete, resulting in a deadlock.
The problem is solved by topological sorting: we order all vertices of the graph so that every edge leads from a vertex with a smaller number to a vertex with a larger number. If task A must complete before task B, task B before task C, and task C in turn before task A, there is a circular reference, and the system reports the error by throwing an exception. If the ordering contains a directed cycle, there is no solution. This check is called "directed cycle detection". If a directed graph passes these checks, it is a directed acyclic graph, well suited to running multiple tasks in parallel with dependencies between them.
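The two checks described above (unregistered dependencies and directed cycles) can be sketched with Kahn's algorithm for topological sorting. This is an illustration under assumptions, not the validation code from the book's source: the function name validateDag and the Map-based representation are hypothetical, and it returns a Result instead of throwing an exception.

```fsharp
open System.Collections.Generic

/// Validates a dependency map (task id -> ids it depends on).
/// Returns Ok with one valid execution order, or Error with a description.
let validateDag (dependencies : Map<int, int list>) : Result<int list, string> =
    // 1. Every referenced dependency must itself be registered.
    let missing =
        [ for KeyValue(id, deps) in dependencies do
            for d in deps do
                if not (dependencies.ContainsKey d) then yield (id, d) ]
    match missing with
    | (id, d) :: _ ->
        Error (sprintf "Task %d depends on unregistered task %d" id d)
    | [] ->
        // 2. Kahn's algorithm: repeatedly take tasks whose dependencies are all done.
        let remaining = Dictionary<int, int>()
        for KeyValue(id, deps) in dependencies do
            remaining.[id] <- List.length (List.distinct deps)
        let ready = Queue<int>()
        for KeyValue(id, deps) in dependencies do
            if List.isEmpty deps then ready.Enqueue id
        let order = ResizeArray<int>()
        while ready.Count > 0 do
            let finished = ready.Dequeue()
            order.Add finished
            for KeyValue(id, deps) in dependencies do
                if List.contains finished deps then
                    remaining.[id] <- remaining.[id] - 1
                    if remaining.[id] = 0 then ready.Enqueue id
        // 3. Tasks that were never reached sit on, or behind, a directed cycle.
        if order.Count = dependencies.Count then Ok (List.ofSeq order)
        else Error "The graph contains a directed cycle"

// The dependencies of Figure 1, as registered in the C# example.
let figure1 =
    Map [ (1, [4; 5]); (2, [5]); (3, [6; 5]); (4, [6])
          (5, [7; 8]); (6, [7]); (7, []); (8, []) ]

// A -> B -> C -> A, encoded as 1 -> 2 -> 3 -> 1.
let cyclic = Map [ (1, [3]); (2, [1]); (3, [2]) ]

printfn "%A" (validateDag figure1)
printfn "%A" (validateDag cyclic)
```

The scan over all tasks inside the loop keeps the sketch short; a production version would precompute the reverse edges, as the agent does with its fromTo dictionary.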
The full version of Listing 2, which contains the DAG validation code, is in the source code posted online.
In the following listing, the MailboxProcessor class from F# is used as an ideal candidate for implementing a DAG that runs operations linked by dependencies in parallel. First, let's define a discriminated union that we will use to manage tasks and resolve their dependencies.
Listing 1 Message type and data structure for coordinating tasks according to their dependencies

open System
open System.Threading.Tasks

type TaskMessage = // #A
    | AddTask of int * TaskInfo
    | QueueTask of TaskInfo
    | ExecuteTasks
and TaskInfo = // #B
    { Context : System.Threading.ExecutionContext
      Edges : int array
      Id : int
      Task : Func<Task>
      EdgesLeft : int option
      Start : DateTimeOffset option
      End : DateTimeOffset option }

#A Messages sent to dagAgent, the underlying agent of ParallelTasksDAG, which is responsible for coordinating the execution of tasks
#B Wraps the details of each task to be performed
The TaskMessage type represents the message cases sent to the underlying agent of ParallelTasksDAG. These messages are used to coordinate tasks and synchronize dependencies. The TaskInfo type holds and tracks the details of the tasks registered in the DAG at run time, including the dependency edges. The execution context (https://msdn.microsoft.com/en-us/library/system.threading.executioncontext(v=vs.110).aspx) is captured so that information such as the current user, any state associated with the logical thread of execution, and code access security information is available when the deferred execution takes place. The start and end times of the execution are published when the completion event fires.
Listing 2 DAG agent in F# for parallelizing the execution of operations connected by dependencies
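The effect of capturing the execution context at registration time can be sketched in isolation. In this illustration, an AsyncLocal value stands in for ambient state such as the current user; the names currentUser and runInCapturedContext are hypothetical and do not appear in the article's code.

```fsharp
open System.Threading

// Hypothetical ambient value standing in for "current user" state.
let currentUser = AsyncLocal<string>()

let runInCapturedContext () =
    currentUser.Value <- "alice"
    // Snapshot of the ambient state, as AddTask takes one at registration time.
    let captured = ExecutionContext.Capture()
    // The ambient state changes before the task actually runs.
    currentUser.Value <- "bob"
    let seenInside = ref ""
    match captured with
    | null ->
        // Capture returns null when context flow is suppressed:
        // fall back to the current context.
        seenInside.Value <- currentUser.Value
    | ctx ->
        ExecutionContext.Run(
            ctx,
            ContextCallback(fun _ -> seenInside.Value <- currentUser.Value),
            null)
    seenInside.Value, currentUser.Value

let insideValue, outsideValue = runInCapturedContext ()
printfn "inside: %s, outside: %s" insideValue outsideValue
```

The callback sees the value that was current when the context was captured, while the caller keeps the later value once Run returns.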

open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

type ParallelTasksDAG() =
    let onTaskCompleted = new Event<TaskInfo>() // #A

    let dagAgent = new MailboxProcessor<TaskMessage>(fun inbox ->
        let rec loop (tasks : Dictionary<int, TaskInfo>) // #B
                     (edges : Dictionary<int, int list>) = async { // #B
            let! msg = inbox.Receive() // #C
            match msg with
            | ExecuteTasks -> // #D
                let fromTo = new Dictionary<int, int list>()
                let ops = new Dictionary<int, TaskInfo>() // #E
                for KeyValue(key, value) in tasks do // #F
                    let operation =
                        { value with EdgesLeft = Some(value.Edges.Length) }
                    for from in operation.Edges do
                        let exists, lstDependencies = fromTo.TryGetValue(from)
                        if not exists then
                            fromTo.Add(from, [ operation.Id ])
                        else
                            fromTo.[from] <- (operation.Id :: lstDependencies)
                    ops.Add(key, operation)
                ops |> Seq.iter (fun kv -> // #F
                    match kv.Value.EdgesLeft with
                    | Some(n) when n = 0 -> inbox.Post(QueueTask(kv.Value))
                    | _ -> ())
                return! loop ops fromTo
            | QueueTask(op) -> // #G
                Async.Start <| async { // #G
                    let start = DateTimeOffset.Now
                    match op.Context with // #H
                    | null -> do! op.Task.Invoke() |> Async.AwaitTask
                    | ctx ->
                        // Start the task inside the captured context, then await it.
                        let pending = ref Unchecked.defaultof<Task>
                        ExecutionContext.Run(ctx.CreateCopy(), // #I
                            (fun state ->
                                let opCtx = (state :?> TaskInfo)
                                pending.Value <- opCtx.Task.Invoke()), op)
                        do! pending.Value |> Async.AwaitTask
                    let end' = DateTimeOffset.Now
                    onTaskCompleted.Trigger(
                        { op with Start = Some(start); End = Some(end') }) // #L
                    let exists, deps = edges.TryGetValue(op.Id)
                    if exists && deps.Length > 0 then
                        // getDependentOperation is defined in the full source code
                        // posted online; it is not shown in this excerpt.
                        let depOps = getDependentOperation deps tasks []
                        edges.Remove(op.Id) |> ignore
                        depOps |> Seq.iter (fun nestedOp ->
                            inbox.Post(QueueTask(nestedOp))) }
                return! loop tasks edges
            | AddTask(id, op) -> // #M
                tasks.Add(id, op)
                return! loop tasks edges }
        loop (new Dictionary<int, TaskInfo>(HashIdentity.Structural))
             (new Dictionary<int, int list>(HashIdentity.Structural)))

    // The agent must be started; otherwise posted messages are never processed.
    do dagAgent.Start()

    [<CLIEventAttribute>]
    member this.OnTaskCompleted = onTaskCompleted.Publish // #L
    member this.ExecuteTasks() = dagAgent.Post ExecuteTasks // #N
    member this.AddTask(id, task, [<ParamArray>] edges : int array) =
        let data = { Context = ExecutionContext.Capture()
                     Edges = edges; Id = id; Task = task
                     EdgesLeft = None; Start = None; End = None }
        dagAgent.Post(AddTask(id, data)) // #O

#A Instance of the onTaskCompleted event, used to notify when a task completes
#B The agent's internal state, which tracks the registered tasks and their dependencies. The collections are mutable because the state changes while ParallelTasksDAG runs; they inherit thread safety from living inside the agent
#C Asynchronously waits for a message
#D Message case that starts ParallelTasksDAG
#E Collection that maps a monotonically increasing index to a task to run
#F The process iterates over the list of tasks, analyzing the dependencies between them to build a topological structure that represents the order in which the tasks are performed
#G Message case that queues a task, executes it, and eventually removes it from the agent state as an active dependency once it completes
#H If the captured ExecutionContext is null, the task runs in the current context; otherwise, go to #I
#I Runs the task in the captured ExecutionContext
#L Triggers and publishes the onTaskCompleted event to notify that a task has completed. The event carries information about the task
#M Message case that adds a task to run according to its dependencies, if any
#N Starts the execution of the registered tasks
#O Adds a task, its dependencies, and the current ExecutionContext for the DAG execution
The purpose of the AddTask function is to register a task with arbitrary dependency edges. The function takes a unique ID, the task to be performed, and a set of edges representing the IDs of all other registered tasks that must complete before this task can start. If the array is empty, the task has no dependencies. The MailboxProcessor instance named dagAgent keeps the registered tasks in its current state, "tasks", a dictionary (tasks : Dictionary<int, TaskInfo>) that maps each task ID to its details. The agent also keeps the edge dependencies for each task ID (edges : Dictionary<int, int list>). When the agent is told to start execution, it verifies that all edge dependencies have been registered and that the graph contains no cycles. This verification step is available in the full implementation of ParallelTasksDAG in the online code. The following C# example references the F# library and uses ParallelTasksDAG. The registered tasks mirror the dependencies shown above in Figure 1.

using System;
using System.Threading;
using System.Threading.Tasks;

Func<int, int, Func<Task>> action = (id, delay) => async () =>
{
    Console.WriteLine($"Starting operation {id} in Thread Id {Thread.CurrentThread.ManagedThreadId}…");
    await Task.Delay(delay);
};

var dagAsync = new DAG.ParallelTasksDAG();
dagAsync.OnTaskCompleted.Subscribe(op =>
    Console.WriteLine($"Operation {op.Id} completed in Thread Id {Thread.CurrentThread.ManagedThreadId}"));

dagAsync.AddTask(1, action(1, 600), 4, 5);
dagAsync.AddTask(2, action(2, 200), 5);
dagAsync.AddTask(3, action(3, 800), 6, 5);
dagAsync.AddTask(4, action(4, 500), 6);
dagAsync.AddTask(5, action(5, 450), 7, 8);
dagAsync.AddTask(6, action(6, 100), 7);
dagAsync.AddTask(7, action(7, 900));
dagAsync.AddTask(8, action(8, 700));
dagAsync.ExecuteTasks();

The helper function prints a message saying that the task has started, including the ID of the current thread to confirm multithreading. The OnTaskCompleted event, in turn, is subscribed to so that the completion of each task is reported to the console with the task ID and the ID of the current thread. This is the output we get when we call the ExecuteTasks method:

Starting operation 8 in Thread Id 23…
Starting operation 7 in Thread Id 24…
Operation 8 Completed in Thread Id 23
Operation 7 Completed in Thread Id 24
Starting operation 5 in Thread Id 23…
Starting operation 6 in Thread Id 25…
Operation 6 Completed in Thread Id 25
Starting operation 4 in Thread Id 24…
Operation 5 Completed in Thread Id 23
Starting operation 2 in Thread Id 27…
Starting operation 3 in Thread Id 30…
Operation 4 Completed in Thread Id 24
Starting operation 1 in Thread Id 28…
Operation 2 Completed in Thread Id 27
Operation 1 Completed in Thread Id 28
Operation 3 Completed in Thread Id 30

As you can see, the tasks run in parallel on different threads (the thread IDs differ), and the order imposed by their dependencies is preserved.
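The claim about dependency order can be checked mechanically against the console output. The following sketch transcribes the log above into a list of events and confirms that no task started before all of its dependencies had completed. The LogEvent type and the respectsDependencies function are hypothetical helpers written for this check, not part of the library.

```fsharp
// Hypothetical event type used to transcribe the console output.
type LogEvent =
    | Started of int
    | Completed of int

// Dependencies as registered through AddTask in the C# example.
let dependencies =
    Map [ (1, [4; 5]); (2, [5]); (3, [6; 5]); (4, [6])
          (5, [7; 8]); (6, [7]); (7, []); (8, []) ]

/// True when every task starts only after all of its dependencies completed.
let respectsDependencies (deps : Map<int, int list>) (log : LogEvent list) =
    let folder (completed : Set<int>, ok : bool) event =
        match event with
        | Completed id -> (Set.add id completed, ok)
        | Started id ->
            let ready =
                deps
                |> Map.tryFind id
                |> Option.defaultValue []
                |> List.forall (fun dep -> Set.contains dep completed)
            (completed, ok && ready)
    log |> List.fold folder (Set.empty, true) |> snd

// The console output above, in order.
let observed =
    [ Started 8; Started 7; Completed 8; Completed 7
      Started 5; Started 6; Completed 6; Started 4
      Completed 5; Started 2; Started 3; Completed 4
      Started 1; Completed 2; Completed 1; Completed 3 ]

printfn "Order respected: %b" (respectsDependencies dependencies observed)
```

A log in which, say, operation 1 started first would fail the check, because operations 4 and 5 would not yet have completed.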
In essence, this is how you parallelize tasks that have dependencies. Read more in Concurrency in .NET.
