1 #Region "Microsoft.VisualBasic::d8f9d96b5fc8a1142785fa150877cdb8, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\ParallelLoading.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Module ParallelLoading
35     
36     '         Function: __loadEntry, __loadTask, __parallelLoading, __subMain, DynamicsVBCTask
37     '                   (+2 Overloads) Load, SendMessageAPI
38     '         Class LoadEntry
39     
40     '             Properties: LoadType, MethodEntryPoint
41     
42     '             FunctionToString
43     
44     '         Class LoadTaskInvoker
45     
46     '             Constructor: (+1 OverloadsSub New
47     
48     '             Function: DataProcessor, Load
49     
50     '             Sub: FillData, GetSendData, StartProcess, WaitForTaskComplete
51     
52     '         Delegate Function
53     
54     
55     
56     
57     
58     ' /********************************************************************************/
59
60 #End Region
61
62 Imports Microsoft.VisualBasic.ApplicationServices
63 Imports Microsoft.VisualBasic.Emit.CodeDOM_VBC
64 Imports Microsoft.VisualBasic.Language
65 Imports Microsoft.VisualBasic.Linq
66 Imports Microsoft.VisualBasic.Net.Http
67 Imports Microsoft.VisualBasic.Net.Protocols
68 Imports Microsoft.VisualBasic.Parallel.Tasks
69 Imports Microsoft.VisualBasic.Serialization.BinaryDumping
70
71 Namespace Parallel
72
73     ''' <summary>
74     '''
75     ''' </summary>
76     Public Module ParallelLoading
77
78         <AttributeUsage(AttributeTargets.Method, AllowMultiple:=False, Inherited:=True)>
79         Public Class LoadEntry : Inherits Attribute
80
81             ''' <summary>
82             ''' 必须满足接口类型: Function(path As StringAs T
83             ''' </summary>
84             ''' <returns></returns>
85             Public Property MethodEntryPoint As System.Reflection.MethodInfo
86
87             Public ReadOnly Property LoadType As Type
88                 Get
89                     Return MethodEntryPoint.DeclaringType
90                 End Get
91             End Property
92
93             Public Overrides Function ToString() As String
94                 Return MethodEntryPoint.ToString
95             End Function
96         End Class
97
98         ''' <summary>
99         ''' 当目标数据集非常的大的时候,在单个应用程序里面进行加载已经回非常缓慢了,
100         ''' 则这个时候可以使用这个函数将数据的加载任务分配到多个子进程之中以提高加载的时候的CPU的利用效率
101         ''' </summary>
102         ''' <typeparam name="T"></typeparam>
103         ''' <param name="sourceURL"></param>
104         ''' <returns></returns>
105         ''' <remarks>函数会自动从<typeparamref name="T"/></remarks>泛型类型之中解析出加载的函数
106         Public Function Load(Of T)(sourceURL As Generic.IEnumerable(Of String), Optional TrimNull As Boolean = FalseAs KeyValuePair(Of String, T())()
107             Dim TypeEntry As Type = GetType(T)
108             Dim EntryPoint = Parallel.ParallelLoading.__loadEntry(TypeEntry)
109
110             If EntryPoint Is Nothing Then
111                 Throw New Exception($"Could not found any entry point for type:={TypeEntry.ToString}!")
112             End If
113
114             Dim Process As String = DynamicsVBCTask(EntryPoint)  '开始进行任务进程的动态编译
115             Dim LQuery = (From source As String                  '进行并行化任务调度
116                           In sourceURL'这里不再使用并行化,因为启动Socket任务的需要为了避免端口占用的情况出现,任务不可以同时启动
117                           Select source, Task = Load(Of T)(url:=source, Process:=Process)).ToArray
118             Dim WaitTasks = (From mmfTask In LQuery.AsParallel   '等待任务的结束然后返回数据集
119                              Let value As T() = mmfTask.Task.GetValue
120                              Select New KeyValuePair(Of String, T())(mmfTask.source, value)).ToArray
121
122             If TrimNull Then
123                 WaitTasks = (From obj In WaitTasks.AsParallel Where Not obj.Value.IsNullOrEmpty Select obj).ToArray
124             End If
125             Return WaitTasks
126         End Function
127
128         ''' <summary>
129         ''' 通过与并行进程进行内存共享来传输加载完毕的数据
130         ''' </summary>
131         ''' <typeparam name="T"></typeparam>
132         ''' <param name="url"></param>
133         ''' <param name="Process"></param>
134         ''' <returns></returns>
135         Private Function Load(Of T)(url As String, Process As StringAs Task(Of String, T())
136             Dim Task As New Task(Of String, T())(url, ParallelLoading.__loadTask(Of T)(Process))
137             Return Task
138         End Function
139
140         Private Function __loadTask(Of T)(process As StringAs Func(Of String, T())
141             Call Threading.Thread.Sleep(1000)
142             Return AddressOf New LoadTaskInvoker(Of T)(process).Load
143         End Function
144
145         Private Class LoadTaskInvoker(Of T)
146
147             ReadOnly Process As String
148
149             Sub New(Process As String)
150                 Me.Process = Process
151             End Sub
152
153             Public Function Load(url As StringAs T()
154                 Dim Socket As New Microsoft.VisualBasic.Net.TcpSynchronizationServicesSocket(AddressOf DataProcessor, Net.GetFirstAvailablePort)
155                 Call New Threading.Thread(AddressOf Socket.Run).Start()
156                 Call StartProcess($"{url.CLIPath} { Socket.LocalPort}")
157                 Call WaitForTaskComplete()
158                 Return resultBuffer
159             End Function
160
161             Private Sub StartProcess(argvs As String)
162                 Dim ProcStart = New ProcessStartInfo(Process, arguments:=argvs)
163                 Dim ProcInvoke As New Process With {.StartInfo = ProcStart}
164
165                 ProcStart.CreateNoWindow = True
166                 ProcInvoke.Start()
167             End Sub
168
169             Private Function DataProcessor(uid As Long, request As RequestStream, remote As System.Net.IPEndPoint) As RequestStream
170                 Dim requestData As String = request.GetUTF8String
171
172                 If String.IsNullOrEmpty(requestData) Then
173                     Return NetResponse.RFC_NO_CONTENT
174                 End If
175
176                 If requestData.StartsWith(MMFProtocol.MMFSocket.MMF_PROTOCOL) Then
177                     '进程开始向父进程返回数据了
178                     Dim host As String = Mid(requestData, Len(MMFProtocol.MMFSocket.MMF_PROTOCOL) + 1)
179                     Call GetSendData(host)
180                 End If
181
182                 Return NetResponse.RFC_OK
183             End Function
184
185             Dim TaskComplete As Boolean = False
186
187             Private Sub WaitForTaskComplete()
188                 Do While Not TaskComplete
189                     Call Threading.Thread.Sleep(100)
190                 Loop
191             End Sub
192
193             Dim resultBuffer As T()
194             Dim _client As MMFProtocol.MMFSocket
195
196             Private Sub GetSendData(host As String)
197                 _client = New MMFProtocol.MMFSocket(host, AddressOf FillData)
198             End Sub
199
200             Private Sub FillData(byteBuffer As Byte())
201                 '  resultBuffer = byteBuffer.DeSerialize(Of T())
202                 TaskComplete = True
203             End Sub
204         End Class
205
206         ''' <summary>
207         ''' 动态编译的加载进程的调用API来向主进程返回消息
208         ''' </summary>
209         ''' <param name="Port"></param>
210         ''' <returns></returns>
211         Public Function SendMessageAPI(Port As IntegerAs String
212             Dim host As String = "Parallel-" & Process.GetCurrentProcess.Id
213             Dim Client As New Microsoft.VisualBasic.Net.AsynInvoke("127.0.0.1", Port)
214             Call Client.SendMessage($"{MMFProtocol.MMFSocket.MMF_PROTOCOL}{host}")
215             Return host
216         End Function
217
218
219
220         ''' <summary>
221         ''' 动态编译
222         ''' </summary>
223         ''' <param name="LoadEntry"></param>
224         ''' <returns></returns>
225         Public Function DynamicsVBCTask(LoadEntry As LoadEntry) As String
226             Dim refList As String() = GetReferences(LoadEntry.LoadType)
227             Dim ns As New CodeDom.CodeNamespace(NameOf(Parallel.ParallelLoading))
228
229             Call ns.Types.Add(__subMain(LoadEntry))
230             Call ns.GenerateCode.__DEBUG_ECHO
231
232             Dim assembly As System.Reflection.Assembly = ns.Compile(refList, RunTimeDirectory, CodeDOMExtension.ExecutableProfile)
233             Dim Dir As String = FileIO.FileSystem.GetParentPath(assembly.Location)
234             For Each File As String In refList
235                 Dim buffer = IO.File.ReadAllBytes(File)
236                 Dim Saved As String = $"{Dir}/{FileIO.FileSystem.GetFileInfo(File).Name}"
237                 Try
238                     Call IO.File.WriteAllBytes(Saved, buffer)
239                 Catch ex As Exception
240                     Call ex.PrintException
241                 End Try
242             Next
243             Return assembly.Location
244         End Function
245
246         Private Function __subMain(loadEntry As LoadEntry) As CodeDom.CodeTypeDeclaration
247             Dim ProgramEntry As New CodeDom.CodeTypeDeclaration("Program")
248             Dim SubMain As New CodeDom.CodeMemberMethod()
249
250             Call ProgramEntry.Members.Add(SubMain)
251             SubMain.Name = "Main"
252             SubMain.ReturnType = New CodeDom.CodeTypeReference(GetType(System.Void))
253             SubMain.Parameters.Add(New CodeDom.CodeParameterDeclarationExpression(GetType(String()), SubMainArgv))
254             SubMain.Attributes = CodeDom.MemberAttributes.Public Or CodeDom.MemberAttributes.Static
255             SubMain = __parallelLoading(invoke:=SubMain, loadEntry:=loadEntry)
256
257             Return ProgramEntry
258         End Function
259
260         Const SubMainArgv As String = "Argv"
261         Const LoadFile As String = "File"
262         Const LoadResult As String = "LoadResult"
263         Const Port As String = "Port"
264         Const Host As String = "host"
265         Const Socket As String = "Socket"
266         Const Buffer As String = "buffer"
267
268         Private Function __parallelLoading(invoke As CodeDom.CodeMemberMethod, loadEntry As LoadEntry) As CodeDom.CodeMemberMethod
269
270             Dim File As String = argv(Scan0)
271             Call invoke.Statements.Add(LocalsInit(LoadFile, GetType(String), CodeDOMExpressions.GetValue(New CodeDom.CodeArgumentReferenceExpression(SubMainArgv), Scan0)))
272
273             Dim PortValue As CodeDom.CodeExpression = CodeDOMExpressions.GetValue(New CodeDom.CodeArgumentReferenceExpression(SubMainArgv), 1)
274             PortValue = [Call](GetType(Conversion), NameOf(Conversion.Val), {PortValue})
275             PortValue = [CType](PortValue, GetType(Integer))
276
277             Dim Port As Integer = CInt(Val(argv(1)))
278             Call invoke.Statements.Add(LocalsInit(Port, GetType(Integer), PortValue))
279             Call invoke.Statements.Add([Call](GetType(Extensions), NameOf(__DEBUG_ECHO),
280                                               {
281                                                 [Call](GetType(String), NameOf(String.Format), {CodeDOMExpressions.Value("Load stream from url:={0}, port:={1}..."), LocalVariable(LoadFile), LocalVariable(Port)})
282                                               }))
283             Call invoke.Statements.Add([Call](GetType(Extensions), NameOf(__DEBUG_ECHO), {"Start to loading data..."}))
284             Dim LoadResult = ParallelLoadingTest.Load(File)  '数据加载
285             Call invoke.Statements.Add(LocalsInit(LoadResult, loadEntry.LoadType.MakeArrayType, initExpression:=[Call](loadEntry.MethodEntryPoint, {LocalVariable(LoadFile)})))
286             Call invoke.Statements.Add([Call](GetType(Extensions), NameOf(__DEBUG_ECHO), {"Data loading Job Done!"}))
287
288             '得到结果之后进行序列化通过内存映射共享返回给主程序
289             Dim host As String = Microsoft.VisualBasic.Parallel.ParallelLoading.SendMessageAPI(Port)  '返回消息
290             Call invoke.Statements.Add(LocalsInit(Host, GetType(String), [Call](GetType(ParallelLoading), NameOf(SendMessageAPI), {LocalVariable(Port)})))
291             Dim Socket As New Microsoft.VisualBasic.MMFProtocol.MMFSocket(hostName:=host) '打开映射的端口
292             Call invoke.Statements.Add(LocalsInit(Socket, GetType(MMFProtocol.MMFSocket), [New](GetType(MMFProtocol.MMFSocket), {LocalVariable(Host)})))
293             Call invoke.Statements.Add([Call](GetType(Extensions), NameOf(__DEBUG_ECHO), {"Init transfer device job done, start to transferred data!"}))
294             Call Socket.SendMessage(LoadResult.GetSerializeBuffer) '返回内存数据
295             Call invoke.Statements.Add(LocalsInit(Buffer, GetType(Byte()), [Call](GetType(StructSerializer), NameOf(StructureToByte), {LocalVariable(LoadResult)})))
296             Call invoke.Statements.Add([Call](LocalVariable(Socket), NameOf(MMFProtocol.MMFSocket.SendMessage), {LocalVariable(Buffer)}))
297             Call invoke.Statements.Add([Call](GetType(Extensions), NameOf(__DEBUG_ECHO), {"Data transportation Job Done!"}))
298
299             Return invoke
300         End Function
301
302         Private Function __loadEntry(Type As Type) As LoadEntry
303             Dim Entries = Type.GetMethods(System.Reflection.BindingFlags.Public Or System.Reflection.BindingFlags.Static)
304             If Entries.IsNullOrEmpty Then
305                 Return Nothing
306             End If
307
308             Dim setValue = New SetValue(Of ParallelLoading.LoadEntry)() _
309                 .GetSet(NameOf(ParallelLoading.LoadEntry.MethodEntryPoint))
310             Dim LQuery As LoadEntry =
311                 LinqAPI.DefaultFirst(Of LoadEntry) <= From EntryPoint As System.Reflection.MethodInfo
312                                                       In Entries.AsParallel
313                                                       Let attrs As Object() = EntryPoint.GetCustomAttributes(attributeType:=GetType(LoadEntry), inherit:=True)
314                                                       Where Not attrs.IsNullOrEmpty
315                                                       Let LoadEntry = DirectCast(attrs.First, LoadEntry)
316                                                       Select setValue(LoadEntry, EntryPoint)
317             Return LQuery
318         End Function
319
320         Delegate Function ParallelLoad(Of T)(sourceUrl As StringAs T()
321
322     End Module
323 End Namespace