1 |
#Region "Microsoft.VisualBasic::9dffcf04baed0d3461935517652eab7e, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\LQuerySchedule\LQuerySchedule.vb"
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
#End Region
|
44 |
|
45 |
Imports System.Runtime.CompilerServices
|
46 |
Imports Microsoft.VisualBasic.Linq.Extensions
|
47 |
Imports Microsoft.VisualBasic.Parallel.Tasks
|
48 |
|
49 |
Namespace Parallel.Linq
|
50 |
|
51 |
|
52 |
|
53 |
|
54 |
|
55 |
|
56 |
|
57 |
|
58 |
|
59 |
在设计并行任务的时候应该遵循的一些原则:
|
60 |
|
61 |
1. 假若每一个任务之间都是相互独立的话,则才可以进行并行化调用
|
62 |
2. 在当前程序域之中只能够通过线程的方式进行并行化,对于时间较短的任务而言,非并行化会比并行化更加有效率
|
63 |
3. 但是对于这些短时间的任务,仍然可以将序列进行分区合并为一个大型的长时间任务来产生并行化
|
64 |
4. 对于长时间的任务,可以直接使用并行化Linq拓展执行并行化
|
65 |
|
66 |
这个模块主要是针对大量的短时间的任务序列的并行化的,用户可以在这里配置线程的数量自由的控制并行化的程度
|
67 |
</remarks>
|
68 |
Public Module LQuerySchedule
|
69 |
|
70 |
|
71 |
Get the number of processors on the current machine.(获取当前的系统主机的CPU核心数)
|
72 |
|
73 |
<value></value>
|
74 |
<returns></returns>
|
75 |
</remarks>
|
76 |
Public ReadOnly Property CPU_NUMBER As Integer
|
77 |
<MethodImpl(MethodImplOptions.AggressiveInlining)>
|
78 |
Get
|
79 |
Return Environment.ProcessorCount
|
80 |
End Get
|
81 |
End Property
|
82 |
|
83 |
|
84 |
假如小于0,则认为是自动配置,0被认为是单线程,反之直接返回
|
85 |
|
86 |
<param name="n"></param>
|
87 |
<returns></returns>
|
88 |
Public Function AutoConfig(n As Integer) As Integer
|
89 |
If n < 0 Then
|
90 |
Return CPU_NUMBER
|
91 |
ElseIf n = 0 OrElse n = 1 Then
|
92 |
Return 1
|
93 |
Else
|
94 |
Return n
|
95 |
End If
|
96 |
End Function
|
97 |
|
98 |
|
99 |
The possible recommended threads of the linq based on you machine processors number, i
|
100 |
|
101 |
<value></value>
|
102 |
<returns></returns>
|
103 |
</remarks>
|
104 |
Public ReadOnly Property Recommended_NUM_THREADS As Integer
|
105 |
Get
|
106 |
Return Environment.ProcessorCount * 10
|
107 |
End Get
|
108 |
End Property
|
109 |
|
110 |
|
111 |
将大量的短时间的任务进行分区,合并,然后再执行并行化,请注意,<paramref name="task"/>参数不能够使lambda表达式,否则会出现EntryNotFound的错误
|
112 |
|
113 |
<typeparam name="T"></typeparam>
|
114 |
<typeparam name="TOut"></typeparam>
|
115 |
<param name="inputs"></param>
|
116 |
<param name="task"></param>
|
117 |
<param name="parTokens">函数参数是每一个分区里面的元素的数量</param>
|
118 |
<returns></returns>
|
119 |
Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
|
120 |
task As Func(Of T, TOut),
|
121 |
Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
|
122 |
|
123 |
Call $"Start schedule task pool for {GetType(T).FullName} --> {GetType(TOut).FullName}".__DEBUG_ECHO
|
124 |
|
125 |
Dim buf = TaskPartitions.Partitioning(inputs, parTokens, task)
|
126 |
Dim LQueryInvoke = From part As Func(Of TOut())
|
127 |
In buf.AsParallel
|
128 |
Select New AsyncHandle(Of TOut())(part).Run
|
129 |
|
130 |
For Each part As AsyncHandle(Of TOut()) In LQueryInvoke
|
131 |
If part Is Nothing Then
|
132 |
Call VBDebugger.Warning("Parts of the data operation timeout!")
|
133 |
Continue For
|
134 |
End If
|
135 |
|
136 |
For Each x As TOut In part.GetValue
|
137 |
Yield x
|
138 |
Next
|
139 |
Next
|
140 |
|
141 |
Call $"Task job done!".__DEBUG_ECHO
|
142 |
End Function
|
143 |
|
144 |
|
145 |
将大量的短时间的任务进行分区,合并,然后再执行并行化
|
146 |
|
147 |
<typeparam name="T"></typeparam>
|
148 |
<typeparam name="TOut"></typeparam>
|
149 |
<param name="inputs"></param>
|
150 |
<param name="task"></param>
|
151 |
<param name="where">Processing where test on the inputs</param>
|
152 |
<returns></returns>
|
153 |
Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
|
154 |
task As Func(Of T, TOut),
|
155 |
Optional where As Func(Of T, Boolean) = Nothing,
|
156 |
Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
|
157 |
|
158 |
Call $"Start schedule task pool for {GetType(T).FullName} --> {GetType(TOut).FullName}".__DEBUG_ECHO
|
159 |
|
160 |
Dim buf As IEnumerable(Of Func(Of TOut())) =
|
161 |
If(where Is Nothing,
|
162 |
TaskPartitions.Partitioning(inputs, parTokens, task),
|
163 |
TaskPartitions.Partitioning(inputs, parTokens, task, where))
|
164 |
Dim LQueryInvoke = From part As Func(Of TOut())
|
165 |
In buf.AsParallel
|
166 |
Select part()
|
167 |
|
168 |
For Each part As TOut() In LQueryInvoke
|
169 |
For Each x As TOut In part
|
170 |
Yield x
|
171 |
Next
|
172 |
Next
|
173 |
|
174 |
Call $"Task job done!".__DEBUG_ECHO
|
175 |
End Function
|
176 |
|
177 |
|
178 |
将大量的短时间的任务进行分区,合并,然后再执行并行化
|
179 |
|
180 |
<typeparam name="T"></typeparam>
|
181 |
<typeparam name="TOut"></typeparam>
|
182 |
<param name="inputs"></param>
|
183 |
<param name="task"></param>
|
184 |
<param name="outWhere">Processing where test on the output</param>
|
185 |
<returns></returns>
|
186 |
Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
|
187 |
task As Func(Of T, TOut),
|
188 |
outWhere As Func(Of TOut, Boolean),
|
189 |
Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
|
190 |
|
191 |
Call $"Start schedule task pool for {GetType(T).FullName} --> {GetType(TOut).FullName}".__DEBUG_ECHO
|
192 |
|
193 |
Dim buf As IEnumerable(Of Func(Of TOut())) = TaskPartitions.Partitioning(inputs, parTokens, task)
|
194 |
Dim LQueryInvoke = From part As Func(Of TOut())
|
195 |
In buf.AsParallel
|
196 |
Select part()
|
197 |
|
198 |
For Each part As TOut() In LQueryInvoke
|
199 |
For Each x As TOut In From o As TOut
|
200 |
In part
|
201 |
Where True = outWhere(o)
|
202 |
Select o
|
203 |
Yield x
|
204 |
Next
|
205 |
Next
|
206 |
|
207 |
Call $"Task job done!".__DEBUG_ECHO
|
208 |
End Function
|
209 |
|
210 |
Public Iterator Function [Where](Of T)(source As IEnumerable(Of T),
|
211 |
test As Func(Of T, Boolean),
|
212 |
Optional parTokens As Integer = 20000) As IEnumerable(Of T())
|
213 |
Call $"Start schedule task pool for {GetType(T).FullName}".__DEBUG_ECHO
|
214 |
|
215 |
Dim buf As IEnumerable(Of Func(Of T())) = TaskPartitions.Partitions(source, parTokens, test)
|
216 |
Dim LQueryInvoke = From part As Func(Of T())
|
217 |
In buf.AsParallel
|
218 |
Select part()
|
219 |
|
220 |
For Each part As T() In LQueryInvoke
|
221 |
Yield part
|
222 |
Next
|
223 |
|
224 |
Call $"Task job done!".__DEBUG_ECHO
|
225 |
End Function
|
226 |
End Module
|
227 |
End Namespace
|