1 #Region "Microsoft.VisualBasic::feec12d4a6c0940b0cee953ace23b1e7, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\BatchTasks.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Module BatchTasks
35     
36     '         Function: (+2 Overloads) BatchTask
37     
38     '         Sub: BatchTask
39     '         Structure __threadHelper
40     
41     '             Function: __task
42     
43     
44     
45     
46     ' /********************************************************************************/
47
48 #End Region
49
50 Imports System.Runtime.CompilerServices
51 Imports System.Threading
52 Imports Microsoft.VisualBasic.Language
53 Imports Microsoft.VisualBasic.Parallel.Linq
54 Imports Microsoft.VisualBasic.Parallel.Tasks
55 Imports Microsoft.VisualBasic.Linq
56 Imports Microsoft.VisualBasic.ComponentModel.DataStructures
57 Imports Microsoft.VisualBasic.CommandLine
58
59 Namespace Parallel.Threads
60
61     ''' <summary>
62     ''' Parallel batch task tool for processor
63     ''' </summary>
64     Public Module BatchTasks
65
66         ''' <summary>
67         ''' 当所需要进行计算的数据量比较大的时候,建议分块使用本函数生成多个进程进行批量计算以获得较好的计算效率
68         ''' </summary>
69         ''' <typeparam name="T"></typeparam>
70         ''' <param name="source"></param>
71         ''' <param name="getCLI"></param>
72         ''' <param name="getExe"></param>
73         ''' <param name="numThreads">-1表示使用系统自动配置的参数,一次性提交所有的计算任务可能会是计算效率变得很低,所以需要使用这个参数来控制计算的线程数量</param>
74         ''' <param name="TimeInterval">默认的任务提交时间间隔是一秒钟提交一个新的计算任务</param>
75         Public Sub BatchTask(Of T)(source As IEnumerable(Of T),
76                                    getCLI As Func(Of T, String),
77                                    getExe As Func(Of String),
78                                    Optional numThreads As Integer = -1,
79                                    Optional TimeInterval As Integer = 1000)
80
81             Dim srcArray As Func(Of Integer)() =
82                 LinqAPI.Exec(Of Func(Of Integer)) <= From x As T In source
83                                                      Let task As IORedirectFile =
84                                                          New IORedirectFile(getExe(), getCLI(x))
85                                                      Let runTask As Func(Of Integer) = AddressOf task.Run
86                                                      Select runTask
87             Call BatchTask(srcArray, numThreads, TimeInterval)
88         End Sub
89
90         ''' <summary>
91         ''' 
92         ''' </summary>
93         ''' <typeparam name="TIn"></typeparam>
94         ''' <typeparam name="T"></typeparam>
95         ''' <param name="source"></param>
96         ''' <param name="getTask"></param>
97         ''' <param name="numThreads">可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量,如果想要单线程,请将这个参数设置为1</param>
98         ''' <param name="TimeInterval"></param>
99         ''' <returns></returns>
100         <Extension>
101         Public Function BatchTask(Of TIn, T)(source As IEnumerable(Of TIn),
102                                              getTask As Func(Of TIn, T),
103                                              Optional numThreads As Integer = -1,
104                                              Optional TimeInterval As Integer = 1000) As T()
105             Dim taskHelper As New __threadHelper(Of TIn, T) With {
106                 .__invoke = getTask
107             }
108             Return source.Select(AddressOf taskHelper.__task) _
109                 .ToArray _
110                 .BatchTask(numThreads, TimeInterval)
111         End Function
112
113         Private Structure __threadHelper(Of TIn, T)
114
115             Public __invoke As Func(Of TIn, T)
116
117             Public Function __task(obj As TIn) As Func(Of T)
118                 Dim __invoke As Func(Of TIn, T) = Me.__invoke
119                 Return Function() __invoke(obj)
120             End Function
121         End Structure
122
123         ''' <summary>
124         ''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete. 
125         ''' By using this parallel function that you can avoid this problem from parallel linq, and also you can 
126         ''' controls the task thread number manually by using this parallel task function.
127         ''' (由于LINQ是分片段来执行的,当某个片段有一个线程被卡住之后整个进程都会被卡住,所以执行大型的计算任务的时候效率不太好,
128         ''' 使用这个并行化函数可以避免这个问题,同时也可以自己手动控制线程的并发数)
129         ''' </summary>
130         ''' <typeparam name="T"></typeparam>
131         ''' <param name="actions">Tasks collection</param>
132         ''' <param name="numThreads">
133         ''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto 
134         ''' config the thread number, If want single thread, not parallel, set this value to 1, and positive 
135         ''' value greater than 1 will makes the tasks parallel.
136         ''' (可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量, 1为单线程)
137         ''' </param>
138         ''' <param name="TimeInterval">The task run loop sleep time, unit is **ms**</param>
139         ''' <param name="smart">
140         ''' ZERO or negative value will turn off this smart mode, default value is ZERO, mode was turn off.
141         ''' If this parameter value is set to any positive value, that means this smart mode will be turn on.
142         ''' then, if the CPU load is higher than the value of this parameter indicated, then no additional 
143         ''' task thread would be added, if CPU load lower than this parameter value, then some additional 
144         ''' task thread will be added for utilize the CPU resources and save the computing time. 
145         ''' (假若开启smart模式的话,在CPU负载较高的时候会保持在限定的线程数量来执行批量任务,
146         ''' 假若CPU的负载较低的话,则会开启超量的线程,以保持执行效率充分利用计算资源来节省总任务的执行时间
147         ''' 任意正实数都将会开启smart模式
148         ''' 小于等于零的数将不会开启,默认值为零,不开启)
149         ''' </param>
150         <Extension>
151         Public Function BatchTask(Of T)(actions As Func(Of T)(), Optional numThreads% = -1%, Optional TimeInterval% = 1000%, Optional smart# = 0#) As T()
152             Dim taskPool As New List(Of AsyncHandle(Of T))
153             Dim p As New Pointer
154             Dim resultList As New List(Of T)
155             Dim CPU#
156
157             If numThreads <= 0 Then
158                 numThreads = LQuerySchedule.CPU_NUMBER * 2
159             End If
160
161             Do While p <= (actions.Length - 1)
162                 If taskPool.Count < numThreads Then  ' 向任务池里面添加新的并行任务
163                     taskPool += New AsyncHandle(Of T)(actions(++p)).Run ' 任务数量小于指定值的情况下,会直接添加计算任务直到满足数量条件
164                 Else   ' 这里是smart模式
165                     If smart > 0# Then  ' CPU的负载在指定值之内,则smart模式开启的情况下会添加新的额外的计算任务
166                         CPU = Win32.TaskManager.ProcessUsage
167
168                         If CPU <= smart Then
169                             taskPool += New AsyncHandle(Of T)(actions(++p)).Run
170                             Call $"CPU:{CPU}% <= {smart}, join an additional task thread...".__DEBUG_ECHO
171                         End If
172                     End If
173                 End If
174
175                 Dim LQuery As AsyncHandle(Of T)() =
176                     LinqAPI.Exec(Of AsyncHandle(Of T)) <= From task As AsyncHandle(Of T)
177                                                           In taskPool
178                                                           Where task.IsCompleted ' 在这里获得完成的任务
179                                                           Select task
180
181                 For Each completeTask As AsyncHandle(Of T) In LQuery
182                     Call taskPool.Remove(completeTask)
183                     Call resultList.Add(completeTask.GetValue)  '  将完成的任务从任务池之中移除然后获取返回值
184                 Next
185
186                 If TimeInterval > 0 Then
187                     Call Thread.Sleep(TimeInterval)
188                 End If
189             Loop
190
191             Dim WaitForExit As T() =
192                 LinqAPI.Exec(Of T) <= From task As AsyncHandle(Of T)
193                                       In taskPool.AsParallel  ' 等待剩余的计算任务完成计算过程
194                                       Let cli As T = task.GetValue
195                                       Select cli
196             resultList += WaitForExit
197
198             Return resultList.ToArray
199         End Function
200     End Module
201 End Namespace