1 #Region "Microsoft.VisualBasic::9dffcf04baed0d3461935517652eab7e, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\LQuerySchedule\LQuerySchedule.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Module LQuerySchedule
35     
36     '         Properties: CPU_NUMBER, Recommended_NUM_THREADS
37     
38     '         Function: [Where], AutoConfig, (+3 Overloads) LQuery
39     
40     
41     ' /********************************************************************************/
42
43 #End Region
44
45 Imports System.Runtime.CompilerServices
46 Imports Microsoft.VisualBasic.Linq.Extensions
47 Imports Microsoft.VisualBasic.Parallel.Tasks
48
49 Namespace Parallel.Linq
50
51     ''' <summary>
52     ''' Parallel Linq query library for VisualBasic.
53     ''' (用于高效率执行批量查询操作和用于检测操作超时的工具对象,请注意,为了提高查询的工作效率,请尽量避免在查询操作之中生成新的临时对象
54     ''' 并行版本的LINQ查询和原始的线程操作相比具有一些性能上面的局限性)
55     ''' </summary>
56     ''' <remarks>
57     ''' 在使用``Parallel LINQ``的时候,请务必要注意不能够使用Let语句操作共享变量,因为排除死锁的开销比较大
58     ''' 
59     ''' 在设计并行任务的时候应该遵循的一些原则:
60     ''' 
61     ''' 1. 假若每一个任务之间都是相互独立的话,则才可以进行并行化调用
62     ''' 2. 在当前程序域之中只能够通过线程的方式进行并行化,对于时间较短的任务而言,非并行化会比并行化更加有效率
63     ''' 3. 但是对于这些短时间的任务,仍然可以将序列进行分区合并为一个大型的长时间任务来产生并行化
64     ''' 4. 对于长时间的任务,可以直接使用并行化Linq拓展执行并行化
65     ''' 
66     ''' 这个模块主要是针对大量的短时间的任务序列的并行化的,用户可以在这里配置线程的数量自由的控制并行化的程度
67     ''' </remarks>
68     Public Module LQuerySchedule
69
70         ''' <summary>
71         ''' Get the number of processors on the current machine.(获取当前的系统主机的CPU核心数)
72         ''' </summary>
73         ''' <value></value>
74         ''' <returns></returns>
75         ''' <remarks></remarks>
76         Public ReadOnly Property CPU_NUMBER As Integer
77             <MethodImpl(MethodImplOptions.AggressiveInlining)>
78             Get
79                 Return Environment.ProcessorCount
80             End Get
81         End Property
82
83         ''' <summary>
84         ''' 假如小于0,则认为是自动配置,0被认为是单线程,反之直接返回
85         ''' </summary>
86         ''' <param name="n"></param>
87         ''' <returns></returns>
88         Public Function AutoConfig(n As IntegerAs Integer
89             If n < 0 Then
90                 Return CPU_NUMBER
91             ElseIf n = 0 OrElse n = 1 Then
92                 Return 1
93             Else
94                 Return n
95             End If
96         End Function
97
98         ''' <summary>
99         ''' The possible recommended threads of the linq based on you machine processors number, i'm not sure...
100         ''' </summary>
101         ''' <value></value>
102         ''' <returns></returns>
103         ''' <remarks></remarks>
104         Public ReadOnly Property Recommended_NUM_THREADS As Integer
105             Get
106                 Return Environment.ProcessorCount * 10
107             End Get
108         End Property
109
110         ''' <summary>
111         ''' 将大量的短时间的任务进行分区,合并,然后再执行并行化,请注意,<paramref name="task"/>参数不能够使lambda表达式,否则会出现EntryNotFound的错误
112         ''' </summary>
113         ''' <typeparam name="T"></typeparam>
114         ''' <typeparam name="TOut"></typeparam>
115         ''' <param name="inputs"></param>
116         ''' <param name="task"></param>
117         ''' <param name="parTokens">函数参数是每一个分区里面的元素的数量</param>
118         ''' <returns></returns>
119         Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
120                                                     task As Func(Of T, TOut),
121                                                     Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
122
123             Call $"Start schedule task pool for {GetType(T).FullName}  -->  {GetType(TOut).FullName}".__DEBUG_ECHO
124
125             Dim buf = TaskPartitions.Partitioning(inputs, parTokens, task)
126             Dim LQueryInvoke = From part As Func(Of TOut())
127                                In buf.AsParallel
128                                Select New AsyncHandle(Of TOut())(part).Run
129
130             For Each part As AsyncHandle(Of TOut()) In LQueryInvoke
131                 If part Is Nothing Then
132                     Call VBDebugger.Warning("Parts of the data operation timeout!")
133                     Continue For
134                 End If
135
136                 For Each x As TOut In part.GetValue
137                     Yield x
138                 Next
139             Next
140
141             Call $"Task job done!".__DEBUG_ECHO
142         End Function
143
144         ''' <summary>
145         ''' 将大量的短时间的任务进行分区,合并,然后再执行并行化
146         ''' </summary>
147         ''' <typeparam name="T"></typeparam>
148         ''' <typeparam name="TOut"></typeparam>
149         ''' <param name="inputs"></param>
150         ''' <param name="task"></param>
151         ''' <param name="where">Processing where test on the inputs</param>
152         ''' <returns></returns>
153         Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
154                                                     task As Func(Of T, TOut),
155                                                     Optional where As Func(Of T, Boolean) = Nothing,
156                                                     Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
157
158             Call $"Start schedule task pool for {GetType(T).FullName}  -->  {GetType(TOut).FullName}".__DEBUG_ECHO
159
160             Dim buf As IEnumerable(Of Func(Of TOut())) =
161                 If(where Is Nothing,
162                 TaskPartitions.Partitioning(inputs, parTokens, task),
163                 TaskPartitions.Partitioning(inputs, parTokens, task, where))
164             Dim LQueryInvoke = From part As Func(Of TOut())
165                                In buf.AsParallel
166                                Select part()
167
168             For Each part As TOut() In LQueryInvoke
169                 For Each x As TOut In part
170                     Yield x
171                 Next
172             Next
173
174             Call $"Task job done!".__DEBUG_ECHO
175         End Function
176
177         ''' <summary>
178         ''' 将大量的短时间的任务进行分区,合并,然后再执行并行化
179         ''' </summary>
180         ''' <typeparam name="T"></typeparam>
181         ''' <typeparam name="TOut"></typeparam>
182         ''' <param name="inputs"></param>
183         ''' <param name="task"></param>
184         ''' <param name="outWhere">Processing where test on the output</param>
185         ''' <returns></returns>
186         Public Iterator Function LQuery(Of T, TOut)(inputs As IEnumerable(Of T),
187                                                     task As Func(Of T, TOut),
188                                                     outWhere As Func(Of TOut, Boolean),
189                                                     Optional parTokens As Integer = 20000) As IEnumerable(Of TOut)
190
191             Call $"Start schedule task pool for {GetType(T).FullName}  -->  {GetType(TOut).FullName}".__DEBUG_ECHO
192
193             Dim buf As IEnumerable(Of Func(Of TOut())) = TaskPartitions.Partitioning(inputs, parTokens, task)
194             Dim LQueryInvoke = From part As Func(Of TOut())
195                                In buf.AsParallel
196                                Select part()
197
198             For Each part As TOut() In LQueryInvoke
199                 For Each x As TOut In From o As TOut
200                                       In part
201                                       Where True = outWhere(o)
202                                       Select o
203                     Yield x
204                 Next
205             Next
206
207             Call $"Task job done!".__DEBUG_ECHO
208         End Function
209
210         Public Iterator Function [Where](Of T)(source As IEnumerable(Of T),
211                                                test As Func(Of T, Boolean),
212                                                Optional parTokens As Integer = 20000) As IEnumerable(Of T())
213             Call $"Start schedule task pool for {GetType(T).FullName}".__DEBUG_ECHO
214
215             Dim buf As IEnumerable(Of Func(Of T())) = TaskPartitions.Partitions(source, parTokens, test)
216             Dim LQueryInvoke = From part As Func(Of T())
217                                In buf.AsParallel
218                                Select part()
219
220             For Each part As T() In LQueryInvoke
221                 Yield part
222             Next
223
224             Call $"Task job done!".__DEBUG_ECHO
225         End Function
226     End Module
227 End Namespace