1 #Region "Microsoft.VisualBasic::c77d1da748079d1ae782dc55a1cb89d5, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\LQuerySchedule\TaskPartitions.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Module TaskPartitions
35     
36     '         Function: (+2 Overloads) Partitioning, Partitions, (+2 Overloads) PartTokens, SplitIterator
37     '         Structure __taskHelper
38     
39     '             FunctionInvoke, InvokeWhere, ToString
40     
41     
42     
43     
44     ' /********************************************************************************/
45
46 #End Region
47
48 Imports System.Runtime.CompilerServices
49 Imports Microsoft.VisualBasic.Language
50
51 Namespace Parallel.Linq
52
53     ''' <summary>
54     ''' 对大量的短时间的任务进行分区的操作是在这里完成的
55     ''' </summary>
56     Public Module TaskPartitions
57
58         ''' <summary>
59         ''' 根据任务总量计算出所需要的线程的数量
60         ''' </summary>
61         ''' <param name="source"></param>
62         ''' <param name="num_threads"></param>
63         ''' <returns></returns>
64         ''' <remarks>假设所有的任务都被平均的分配到每一个线程之上</remarks>
65         Public Function PartTokens(source As Integer, num_threads As IntegerAs Integer
66             Return (source / num_threads) - 1
67         End Function
68
69         Public Function PartTokens(source As IntegerAs Integer
70             Return PartTokens(source, num_threads:=LQuerySchedule.CPU_NUMBER)
71         End Function
72
73         ''' <summary>
74         ''' Performance the partitioning operation on the input sequence.
75         ''' (请注意,这个函数适用于数量非常多的序列。对所输入的序列进行分区操作,<paramref name="parTokens"/>函数参数是每一个分区里面的元素的数量)
76         ''' </summary>
77         ''' <typeparam name="T"></typeparam>
78         ''' <param name="source"></param>
79         ''' <param name="parTokens">每一个分区之中的元素数量</param>
80         ''' <returns></returns>
81         ''' <remarks>对于数量较少的序列,可以使用<see cref="Extensions.SplitIterator(Of T)(IEnumerable(Of T), IntegerBoolean)"/>进行分区操作,
82         ''' 该函数使用数组的<see cref="Array.ConstrainedCopy(Array, Integer, Array, IntegerInteger)"/>方法进行分区复制,效率较高
83         ''' 
84         ''' 由于本函数需要处理大量的数据,使用Array的方法会内存占用较厉害,所以在这里更改为List操作以降低内存的占用
85         ''' </remarks>
86         <Extension>
87         Public Iterator Function SplitIterator(Of T)(source As IEnumerable(Of T), parTokens%, Optional echo As Boolean = TrueAs IEnumerable(Of T())
88             Dim buf As New List(Of T)
89             Dim n As Integer = 0
90             Dim parts As Integer
91
92             For Each x As T In source
93                 If n = parTokens Then
94                     Yield buf.ToArray
95                     buf.Clear()
96                     n = 0
97                     parts += 1
98                 End If
99
100                 buf.Add(x)
101                 n += 1
102             Next
103
104             If buf.Count > 0 Then
105                 Yield buf.ToArray
106             End If
107
108             If echo Then
109                 Call $"Large data set data partitioning(partitions:={parts}) jobs done!".__DEBUG_ECHO
110             End If
111         End Function
112
113         ''' <summary>
114         ''' 进行分区之后返回一个长时间的任务组合
115         ''' </summary>
116         ''' <typeparam name="T"></typeparam>
117         ''' <param name="parts">函数参数是每一个分区里面的元素的数量</param>
118         ''' <returns></returns>
119         ''' 
120         <Extension>
121         Public Iterator Function Partitioning(Of T, out)(source As IEnumerable(Of T),
122                                                          parts As Integer,
123                                                          task As Func(Of T, out)) As IEnumerable(Of Func(Of out()))
124
125             Dim buf As IEnumerable(Of T()) = source.SplitIterator(parts)
126
127             For Each part As T() In buf
128                 Yield AddressOf New __taskHelper(Of T, out) With {
129                     .source = part,
130                     .task = task
131                 }.Invoke
132             Next
133         End Function
134
135         ''' <summary>
136         ''' 进行分区之后返回一个长时间的任务组合
137         ''' </summary>
138         ''' <typeparam name="T"></typeparam>
139         ''' <returns></returns>
140         ''' 
141         <Extension>
142         Public Iterator Function Partitioning(Of T, out)(source As IEnumerable(Of T),
143                                                          parts As Integer,
144                                                          task As Func(Of T, out),
145                                                          where As Func(Of T, Boolean)) As IEnumerable(Of Func(Of out()))
146
147             Dim buf As IEnumerable(Of T()) = source.SplitIterator(parts)
148
149             For Each part As T() In buf
150                 Yield AddressOf New __taskHelper(Of T, out) With {
151                     .source = part,
152                     .task = task,
153                     .where = where
154                 }.InvokeWhere
155             Next
156         End Function
157
158         Public Iterator Function Partitions(Of T)(source As IEnumerable(Of T),
159                                                   parts As Integer,
160                                                   [where] As Func(Of T, Boolean)) As IEnumerable(Of Func(Of T()))
161
162             Dim buf As IEnumerable(Of T()) = source.SplitIterator(parts)
163
164             For Each part As T() In buf
165                 Yield Function() LinqAPI.Exec(Of T) <= From x As T
166                                                        In part
167                                                        Where where(x) = True
168                                                        Select x
169             Next
170         End Function
171
172         ''' <summary>
173         ''' 因为在上一层调用之中使用了并行化,所以在这里不能够使用并行化拓展了
174         ''' </summary>
175         ''' <typeparam name="T"></typeparam>
176         ''' <typeparam name="out"></typeparam>
177         Private Structure __taskHelper(Of T, out)
178
179             Dim task As Func(Of T, out)
180             Dim source As T()
181             Dim where As Func(Of T, Boolean)
182
183             Public Overrides Function ToString() As String
184                 Return task.ToString
185             End Function
186
187             Public Function InvokeWhere() As out()
188                 Dim __task As Func(Of T, out) = task
189                 Dim test = where
190                 Dim LQuery As out() =
191                     LinqAPI.Exec(Of out) <= From x As T
192                                             In source
193                                             Where True = test(x)
194                                             Select __task(x)
195                 Return LQuery
196             End Function
197
198             Public Function Invoke() As out()
199                 Dim __task As Func(Of T, out) = task
200                 Dim LQuery As out() =
201                     LinqAPI.Exec(Of out) <= From x As T
202                                             In source
203                                             Select __task(x)
204                 Return LQuery
205             End Function
206         End Structure
207     End Module
208 End Namespace