1 #Region "Microsoft.VisualBasic::2bb2b8738c63800c0bc39860550774fc, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\BatchTasks.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Module BatchTasks
35     
36     '         Function: (+2 Overloads) BatchTask
37     
38     '         Sub: BatchTask
39     '         Structure __threadHelper
40     
41     '             Function: __task
42     
43     
44     
45     
46     ' /********************************************************************************/
47
48 #End Region
49
50 Imports System.Runtime.CompilerServices
51 Imports System.Threading
52 Imports Microsoft.VisualBasic.CommandLine
53 Imports Microsoft.VisualBasic.Language
54 Imports Microsoft.VisualBasic.Linq
55 Imports Microsoft.VisualBasic.Parallel.Linq
56 Imports Microsoft.VisualBasic.Parallel.Tasks
57
58 Namespace Parallel.Threads
59
60     ''' <summary>
61     ''' Parallel batch task tool for processor
62     ''' </summary>
63     Public Module BatchTasks
64
65         ''' <summary>
66         ''' 当所需要进行计算的数据量比较大的时候,建议分块使用本函数生成多个进程进行批量计算以获得较好的计算效率
67         ''' </summary>
68         ''' <typeparam name="T"></typeparam>
69         ''' <param name="source"></param>
70         ''' <param name="getCLI"></param>
71         ''' <param name="getExe"></param>
72         ''' <param name="numThreads">-1表示使用系统自动配置的参数,一次性提交所有的计算任务可能会是计算效率变得很低,所以需要使用这个参数来控制计算的线程数量</param>
73         ''' <param name="TimeInterval">默认的任务提交时间间隔是一秒钟提交一个新的计算任务</param>
74         Public Sub BatchTask(Of T)(source As IEnumerable(Of T),
75                                    getCLI As Func(Of T, String),
76                                    getExe As Func(Of String),
77                                    Optional numThreads As Integer = -1,
78                                    Optional TimeInterval As Integer = 1000)
79
80             Dim srcArray As Func(Of Integer)() =
81                 LinqAPI.Exec(Of Func(Of Integer)) <= From x As T In source
82                                                      Let task As IORedirectFile =
83                                                          New IORedirectFile(getExe(), getCLI(x))
84                                                      Let runTask As Func(Of Integer) = AddressOf task.Run
85                                                      Select runTask
86             Call BatchTask(srcArray, numThreads, TimeInterval)
87         End Sub
88
89         ''' <summary>
90         ''' 
91         ''' </summary>
92         ''' <typeparam name="TIn"></typeparam>
93         ''' <typeparam name="T"></typeparam>
94         ''' <param name="source"></param>
95         ''' <param name="getTask"></param>
96         ''' <param name="numThreads">
97         ''' 可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量,如果想要单线程,请将这个参数设置为1
98         ''' </param>
99         ''' <param name="TimeInterval"></param>
100         ''' <returns></returns>
101         <Extension>
102         Public Function BatchTask(Of TIn, T)(source As IEnumerable(Of TIn),
103                                              getTask As Func(Of TIn, T),
104                                              Optional numThreads As Integer = -1,
105                                              Optional TimeInterval As Integer = 1000) As T()
106             Dim taskHelper As New __threadHelper(Of TIn, T) With {
107                 .__invoke = getTask
108             }
109             Return source _
110                 .Select(AddressOf taskHelper.__task) _
111                 .ToArray _
112                 .BatchTask(numThreads, TimeInterval)
113         End Function
114
115         Private Structure __threadHelper(Of TIn, T)
116
117             Public __invoke As Func(Of TIn, T)
118
119             Public Function __task(obj As TIn) As Func(Of T)
120                 Dim __invoke As Func(Of TIn, T) = Me.__invoke
121                 Return Function() __invoke(obj)
122             End Function
123         End Structure
124
125         ''' <summary>
126         ''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete. 
127         ''' By using this parallel function that you can avoid this problem from parallel linq, and also you can 
128         ''' controls the task thread number manually by using this parallel task function.
129         ''' (由于LINQ是分片段来执行的,当某个片段有一个线程被卡住之后整个进程都会被卡住,所以执行大型的计算任务的时候效率不太好,
130         ''' 使用这个并行化函数可以避免这个问题,同时也可以自己手动控制线程的并发数)
131         ''' </summary>
132         ''' <typeparam name="T"></typeparam>
133         ''' <param name="actions">Tasks collection</param>
134         ''' <param name="numThreads">
135         ''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto 
136         ''' config the thread number, If want single thread, not parallel, set this value to 1, and positive 
137         ''' value greater than 1 will makes the tasks parallel.
138         ''' (可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量, 1为单线程)
139         ''' </param>
140         ''' <param name="TimeInterval">The task run loop sleep time, unit is **ms**</param>
141         ''' <param name="smart">
142         ''' ZERO or negative value will turn off this smart mode, default value is ZERO, mode was turn off.
143         ''' If this parameter value is set to any positive value, that means this smart mode will be turn on.
144         ''' then, if the CPU load is higher than the value of this parameter indicated, then no additional 
145         ''' task thread would be added, if CPU load lower than this parameter value, then some additional 
146         ''' task thread will be added for utilize the CPU resources and save the computing time. 
147         ''' (假若开启smart模式的话,在CPU负载较高的时候会保持在限定的线程数量来执行批量任务,
148         ''' 假若CPU的负载较低的话,则会开启超量的线程,以保持执行效率充分利用计算资源来节省总任务的执行时间
149         ''' 任意正实数都将会开启smart模式
150         ''' 小于等于零的数将不会开启,默认值为零,不开启)
151         ''' </param>
152         <Extension>
153         Public Function BatchTask(Of T)(actions As Func(Of T)(), Optional numThreads% = -1%, Optional timeInterval% = 1000%, Optional smart# = 0#) As T()
154             Dim taskPool As New List(Of AsyncHandle(Of T))
155             Dim p As New Pointer
156             Dim resultList As New List(Of T)
157             Dim CPU#
158
159             If numThreads <= 0 Then
160                 numThreads = LQuerySchedule.CPU_NUMBER * 2
161             End If
162
163             Do While p <= (actions.Length - 1)
164                 If taskPool.Count < numThreads Then
165                     ' 向任务池里面添加新的并行任务
166                     ' 任务数量小于指定值的情况下,会直接添加计算任务直到满足数量条件
167                     taskPool += New AsyncHandle(Of T)(actions(++p)).Run
168                 Else
169                     If smart > 0# Then
170                         ' 这里是smart模式
171                         ' CPU的负载在指定值之内,则smart模式开启的情况下会添加新的额外的计算任务
172                         CPU = Win32.TaskManager.ProcessUsage
173
174                         If CPU <= smart Then
175                             taskPool += New AsyncHandle(Of T)(actions(++p)).Run
176                             Call $"CPU:{CPU}% <= {smart}, join an additional task thread...".__DEBUG_ECHO
177                         End If
178                     End If
179                 End If
180
181                 ' 在这里获得完成的任务
182                 Dim LQuery As AsyncHandle(Of T)() =
183                     LinqAPI.Exec(Of AsyncHandle(Of T)) _
184  _
185                     () <= From task As AsyncHandle(Of T)
186                           In taskPool
187                           Where task.IsCompleted
188                           Select task
189
190                 For Each completeTask As AsyncHandle(Of T) In LQuery
191                     ' 将完成的任务从任务池之中移除然后获取返回值
192                     Call taskPool.Remove(completeTask)
193                     Call resultList.Add(completeTask.GetValue)
194                 Next
195
196                 If timeInterval > 0 Then
197                     Call Thread.Sleep(timeInterval)
198                 End If
199             Loop
200
201             ' 等待剩余的计算任务完成计算过程
202             Dim waitForExit As T() =
203                 LinqAPI.Exec(Of T) <= From task As AsyncHandle(Of T)
204                                       In taskPool.AsParallel
205                                       Let cli As T = task.GetValue
206                                       Select cli
207             resultList += waitForExit
208
209             Return resultList.ToArray
210         End Function
211     End Module
212 End Namespace