Friday, July 15, 2011

Multi-threaded Parallel Processing

You will often need to run the same logic across many clients or batches. Without multi-threading, the records are processed sequentially, one after another, so the whole job takes longer to finish.

You can speed this up by implementing multi-threaded parallel processing in a few lines of code. To keep this example simple, make sure you have distinct batches or sets of records that will not interfere or overlap with each other during processing. Here is a quick way to implement it; make sure your machine has enough processing and threading capacity.

Step 1

Create a property or method so that you can set the number of threads dynamically if needed.

Private ReadOnly Property MaxAllowableThreadCount() As Integer
    Get
        'You can define this value in a config file
        Dim maxThreadCount As Integer = 15
        Return maxThreadCount
    End Get
End Property
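
If the value does come from a config file, it arrives as a string and may be missing or invalid. The following is a minimal sketch of one way to handle that; ResolveThreadCount is a hypothetical helper, not part of the original code. In a real application the raw string might come from ConfigurationManager.AppSettings with a key of your choosing, which is an assumption here and is left out to keep the sketch self-contained.

```vbnet
Imports System

Module ThreadCountConfig
    ' Hypothetical helper: turns a raw setting string into a usable thread count.
    ' Falls back to defaultCount when the value is missing, non-numeric, or less than 1.
    Public Function ResolveThreadCount(rawValue As String, defaultCount As Integer) As Integer
        Dim parsed As Integer
        If Integer.TryParse(rawValue, parsed) AndAlso parsed >= 1 Then
            Return parsed
        End If
        Return defaultCount
    End Function

    Sub Main()
        Console.WriteLine(ResolveThreadCount("8", 15))   ' valid setting is used
        Console.WriteLine(ResolveThreadCount("", 15))    ' empty setting falls back to 15
        Console.WriteLine(ResolveThreadCount("abc", 15)) ' non-numeric setting falls back to 15
    End Sub
End Module
```

The property in Step 1 could then return the result of this helper instead of a hard-coded 15.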

Step 2

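The following is an example of the multi-threading (parallel processing) code. It assumes each client has its own unique set of records. The Task class lives in the System.Threading.Tasks namespace (.NET Framework 4 or later), so import that namespace in your file. ProcessHelper.ClientProcessingLogic stands for your own per-client logic and returns the number of records it processed.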

Private Function ProcessAllClients() As Integer
      Dim clientList As New List(Of String)
      Dim exceptionMessages As New List(Of String)
      Dim totalNoOfRecProcessed As Integer = 0
 
      'Add some Test Clients
      clientList.Add("A1")
      clientList.Add("A2")
      clientList.Add("A3")
 
 
      If clientList IsNot Nothing AndAlso clientList.Count > 0 Then
          'Customize No of Threads you may want to use as per need or you can simply use a value
          Dim noOfThreadCounts As Integer = MaxAllowableThreadCount
          Dim noOfClients As Integer = clientList.Count
          Dim noOfThreadsGroup As Integer = 0
 
          'Adjust No of Loops as per max allowable thread.
          'Integer division rounded up: e.g. 31 clients with 15 threads gives 3 groups
          noOfThreadsGroup = (noOfClients + noOfThreadCounts - 1) \ noOfThreadCounts
 
          'Set clients in each group/loop 
          For threadGroup As Integer = 0 To noOfThreadsGroup - 1
              Dim processingTaskCounter As Integer = noOfThreadCounts
 
              If threadGroup = (noOfThreadsGroup - 1) AndAlso (noOfClients Mod noOfThreadCounts > 0) Then
                  processingTaskCounter = noOfClients Mod noOfThreadCounts
              End If
 
              Dim allProcessingTasks(processingTaskCounter - 1) As Task(Of Integer)
              Dim clientsToProcess As New List(Of String)
              Dim taskCounter As Integer = 0
 
              'Pick the clients that belong to this group
              For i As Integer = 0 To processingTaskCounter - 1
                  clientsToProcess.Add(clientList(threadGroup * noOfThreadCounts + i))
              Next
 
              'Create (but do not start) one task per client.
              'No Try/Catch is needed here: creating a task does not run its code,
              'so errors from the client logic surface only when the task executes.
              For Each clientCode As String In clientsToProcess
                  'Copy the loop variable so each lambda captures its own value
                  Dim currentClient As String = clientCode
                  allProcessingTasks(taskCounter) = New Task(Of Integer)(Function()
                                                                             Dim pHelper As New ProcessHelper
                                                                             Return pHelper.ClientProcessingLogic(currentClient)
                                                                         End Function)
                  taskCounter += 1
              Next
 
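              'Now start all of this group's tasks at once; they run on thread-pool threads
              Array.ForEach(allProcessingTasks, Sub(tx) tx.Start())
 
              'Wait until all of them finish processing.
              'WaitAll throws AggregateException if any task faulted or was canceled,
              'so catch it here to let the status check below run for every task.
              Try
                  Task.WaitAll(allProcessingTasks)
              Catch ex As AggregateException
                  'Nothing to do here: each failed task is reported by the status check below
              End Try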
 
              'Check IsCanceled and IsFaulted first: IsCompleted is also True for those states
              For Each tsk In allProcessingTasks
                  Select Case True
                      Case tsk.IsCanceled
                          exceptionMessages.Add("Task ID " & tsk.Id & " was canceled.")
                      Case tsk.IsFaulted
                          exceptionMessages.Add("Task ID " & tsk.Id & " was faulted: " & tsk.Exception.GetBaseException().Message)
                      Case tsk.IsCompleted
                          totalNoOfRecProcessed += tsk.Result
                      Case Else
                          exceptionMessages.Add("Task ID " & tsk.Id & " was in unknown status.")
                  End Select
              Next
          Next
      End If
      Return totalNoOfRecProcessed
  End Function
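
For comparison, the same idea (process every client, but never more than a fixed number at once) can also be sketched with Parallel.ForEach from the same Task Parallel Library. This is an alternative to the grouping code above, not a replacement for it. ProcessClient is a hypothetical stand-in for ProcessHelper.ClientProcessingLogic and simply pretends every client has 10 records.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks

Module ParallelForEachSketch
    ' Hypothetical stand-in for ProcessHelper.ClientProcessingLogic:
    ' pretends every client has 10 records.
    Function ProcessClient(clientCode As String) As Integer
        Return 10
    End Function

    Function ProcessAllClientsParallel(clientList As List(Of String), maxThreads As Integer) As Integer
        Dim total As Integer = 0

        ' Cap how many clients are processed at the same time
        Dim options As New ParallelOptions()
        options.MaxDegreeOfParallelism = maxThreads

        Parallel.ForEach(clientList, options,
                         Sub(clientCode As String)
                             Dim processed As Integer = ProcessClient(clientCode)
                             ' Several threads update the total, so add atomically
                             Interlocked.Add(total, processed)
                         End Sub)
        Return total
    End Function

    Sub Main()
        Dim clients As New List(Of String) From {"A1", "A2", "A3"}
        Console.WriteLine(ProcessAllClientsParallel(clients, 15)) ' prints 30
    End Sub
End Module
```

One difference in behaviour: the grouping code waits for a whole group to finish before starting the next, while Parallel.ForEach picks up the next client as soon as a worker is free. If the per-client logic throws, Parallel.ForEach raises an AggregateException, so wrap the call in Try/Catch if you want to collect error messages the way Step 2 does.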

Step 3

Call the function from Step 2 wherever you need it.

Dim allItemsProcessedCount As Integer = ProcessAllClients()

You can now compare the processing time and the number of records processed in a single call against the sequential version.
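
One way to measure that difference is with the Stopwatch class. The sketch below is only an illustration: SimulatedWork is a hypothetical workload that sleeps for 200 ms per client and reports one record, so the actual timings you see will depend on your machine and on your real processing logic.

```vbnet
Imports System
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks

Module TimingSketch
    ' Hypothetical workload: simulates 200 ms of work per client and reports 1 record.
    Function SimulatedWork(clientCode As String) As Integer
        Thread.Sleep(200)
        Return 1
    End Function

    Sub Main()
        Dim clients As String() = {"A1", "A2", "A3"}

        ' Sequential run: one client after another
        Dim sw As Stopwatch = Stopwatch.StartNew()
        Dim sequentialTotal As Integer = 0
        For Each c As String In clients
            sequentialTotal += SimulatedWork(c)
        Next
        sw.Stop()
        Console.WriteLine("Sequential: " & sequentialTotal & " records in " & sw.ElapsedMilliseconds & " ms")

        ' Parallel run: one task per client, then wait for all of them
        sw.Restart()
        Dim tasks(clients.Length - 1) As Task(Of Integer)
        For i As Integer = 0 To clients.Length - 1
            Dim current As String = clients(i)
            tasks(i) = Task.Factory.StartNew(Function() SimulatedWork(current))
        Next
        Task.WaitAll(tasks)

        Dim parallelTotal As Integer = 0
        For Each t As Task(Of Integer) In tasks
            parallelTotal += t.Result
        Next
        sw.Stop()
        Console.WriteLine("Parallel: " & parallelTotal & " records in " & sw.ElapsedMilliseconds & " ms")
    End Sub
End Module
```

Both runs should report the same number of records; only the elapsed time should differ.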