1 #Region "Microsoft.VisualBasic::ea6d9ec029a35673233c58ed81288087, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Tasks\TaskQueue.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Interface ITaskHandle
35     
36     '         Function: Run
37     
38     '     Class TaskQueue
39     
40     '         Properties: RunningTask, Tasks, uid
41     
42     '         Constructor: (+1 OverloadsSub New
43     
44     '         Function: (+2 Overloads) Join
45     
46     '         Sub: __taskQueueEXEC, (+2 Overloads) Dispose, (+2 Overloads) Enqueue
47     '         Structure __task
48     
49     '             Properties: Value
50     
51     '             Sub: Run
52     
53     
54     
55     
56     ' /********************************************************************************/
57
58 #End Region
59
60 Imports System.Runtime.CompilerServices
61 Imports System.Threading
62
63 Namespace Parallel.Tasks
64
65     Public Interface ITaskHandle(Of T)
66         Function Run() As T
67     End Interface
68
69     ''' <summary>
70     ''' 这个只有一条线程来执行
71     ''' </summary>
72     Public Class TaskQueue(Of T) : Implements IDisposable
73
74         ''' <summary>
75         ''' ###### 2018-1-27
76         ''' 
77         ''' 如果直接在这里使用<see cref="App.BufferSize"/>的话,极端的情况下会导致服务器的内存直接被耗尽
78         ''' 所以在这里使用一个较小的常数值
79         ''' </summary>
80         ReadOnly __tasks As New Queue(Of __task)(4096)
81
82         ''' <summary>
83         ''' 返回当前的任务池之中的任务数量
84         ''' </summary>
85         ''' <returns></returns>
86         Public ReadOnly Property Tasks As Integer
87             <MethodImpl(MethodImplOptions.AggressiveInlining)>
88             Get
89                 SyncLock __tasks
90                     Return __tasks.Count
91                 End SyncLock
92             End Get
93         End Property
94
95         ''' <summary>
96         ''' 会单独启动一条新的线程来用来执行任务队列
97         ''' </summary>
98         Sub New()
99 #If DEBUG Then
100             Call $"Using default buffer_size={App.BufferSize}".__DEBUG_ECHO
101 #End If
102             Call RunTask(AddressOf __taskQueueEXEC)
103         End Sub
104
105         ''' <summary>
106         ''' 函数会被插入一个队列之中,之后线程会被阻塞在这里直到函数执行完毕,这个主要是用来控制服务器上面的任务并发的
107         ''' 一般情况下不会使用这个方法,这个方法主要是控制服务器资源的利用程序的,当线程处于忙碌的状态的时候,
108         ''' 当前线程会被一直阻塞,直到线程空闲
109         ''' </summary>
110         ''' <param name="handle"></param>
111         ''' <returns>假若本对象已经开始Dispose了,则为完成的任务都会返回Nothing</returns>
112         Public Function Join(handle As Func(Of T)) As T
113             Dim task As New __task With {
114                 .handle = handle,
115                 .receiveDone = New ManualResetEvent(False)
116             }
117
118             SyncLock __tasks
119                 Call __tasks.Enqueue(task)
120             End SyncLock
121
122             Call task.receiveDone.WaitOne()
123             Call task.receiveDone.Reset()
124
125             Return task.Value
126         End Function
127
128         Public Function Join(handle As ITaskHandle(Of T)) As T
129             Return Join(AddressOf handle.Run)
130         End Function
131
132         Public Sub Enqueue(handle As ITaskHandle(Of T), Optional callback As Action(Of T) = Nothing)
133             Call Enqueue(AddressOf handle.Run, callback)
134         End Sub
135
136         ''' <summary>
137         ''' 这个函数只会讲任务添加到队列之中,而不会阻塞线程
138         ''' </summary>
139         ''' <param name="handle"></param>
140         Public Sub Enqueue(handle As Func(Of T), Optional callback As Action(Of T) = Nothing)
141             Dim task As New __task With {
142                 .handle = handle,
143                 .callback = callback
144             }
145
146             SyncLock __tasks
147                 Call __tasks.Enqueue(task)
148             End SyncLock
149         End Sub
150
151         ''' <summary>
152         ''' 当这个属性为False的时候说明没有任务在执行,此时为空闲状态
153         ''' </summary>
154         ''' <returns></returns>
155         Public ReadOnly Property RunningTask As Boolean
156         Public ReadOnly Property uid As Integer
157             Get
158                 Return Me.GetHashCode
159             End Get
160         End Property
161
162         ''' <summary>
163         ''' 有一条线程单独执行这个任务队列
164         ''' </summary>
165         Private Sub __taskQueueEXEC()
166             Do While Not disposedValue
167                 If Not Tasks = 0 Then
168                     Dim task As __task
169
170                     SyncLock __tasks
171                         task = __tasks.Dequeue
172                     End SyncLock
173
174                     _RunningTask = True
175                     Call task.Run()
176                     If Not task.receiveDone Is Nothing Then
177                         Call task.receiveDone.Set()
178                     End If
179                     _RunningTask = False
180                 Else
181                     Call Thread.Sleep(1) ' 当前的线程处于空闲的状态
182                 End If
183             Loop
184         End Sub
185
186         Private Structure __task
187
188             Public callback As Action(Of T)
189             Public handle As Func(Of T)
190             Public receiveDone As ManualResetEvent
191
192             Public ReadOnly Property Value As T
193
194             Sub Run()
195                 Try
196                     _Value = handle()
197                 Catch ex As Exception
198                     Call App.LogException(ex)
199                     Call ex.PrintException
200                 End Try
201
202                 If Not callback Is Nothing Then
203                     Call callback(Value)
204                 End If
205             End Sub
206         End Structure
207
208 #Region "IDisposable Support"
209         Private disposedValue As Boolean To detect redundant calls
210
211         ' IDisposable
212         Protected Overridable Sub Dispose(disposing As Boolean)
213             If Not Me.disposedValue Then
214                 If disposing Then
215                     ' TODO: dispose managed state (managed objects).
216                     For Each x As __task In __tasks
217                         ' 释放所有的线程阻塞
218                         Call x.receiveDone.Set()
219                     Next
220                 End If
221
222                 ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below.
223                 ' TODO: set large fields to null.
224             End If
225             Me.disposedValue = True
226         End Sub
227
228         ' TODO: override Finalize() only if Dispose(disposing As Boolean) above has code to free unmanaged resources.
229         'Protected Overrides Sub Finalize()
230         '    Do not change this code.  Put cleanup code in Dispose(disposing As Boolean) above.
231         '    Dispose(False)
232         '    MyBase.Finalize()
233         'End Sub
234
235         ' This code added by Visual Basic to correctly implement the disposable pattern.
236         Public Sub Dispose() Implements IDisposable.Dispose
237             Do not change this code.  Put cleanup code in Dispose(disposing As Boolean) above.
238             Dispose(True)
239             ' TODO: uncomment the following line if Finalize() is overridden above.
240             ' GC.SuppressFinalize(Me)
241         End Sub
242 #End Region
243     End Class
244 End Namespace