1 #Region "Microsoft.VisualBasic::428f68cffb11bf82c69327e04c881853, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\ThreadPool.vb"
2
3     ' Author:
4     
5     '       asuka (amethyst.asuka@gcmodeller.org)
6     '       xie (genetics@smrucc.org)
7     '       xieguigang (xie.guigang@live.com)
8     
9     ' Copyright (c) 2018 GPL3 Licensed
10     
11     
12     ' GNU GENERAL PUBLIC LICENSE (GPL3)
13     
14     
15     ' This program is free software: you can redistribute it and/or modify
16     ' it under the terms of the GNU General Public License as published by
17     ' the Free Software Foundation, either version 3 of the License, or
18     ' (at your option) any later version.
19     
20     ' This program is distributed in the hope that it will be useful,
21     ' but WITHOUT ANY WARRANTY; without even the implied warranty of
22     ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23     ' GNU General Public License for more details.
24     
25     ' You should have received a copy of the GNU General Public License
26     ' along with this program. If not, see <http://www.gnu.org/licenses/>.
27
28
29
30     ' /********************************************************************************/
31
32     ' Summaries:
33
34     '     Class ThreadPool
35     
36     '         Properties: FullCapacity, NumOfThreads, ServerLoad, WorkingThreads
37     
38     '         Constructor: (+2 OverloadsSub New
39     
40     '         FunctionGetAvaliableThread, GetStatus, ToString
41     
42     '         Sub: __allocate, (+2 Overloads) Dispose, OperationTimeOut, RunTask
43     '         Structure __taskInvoke
44     
45     '             Function: Run
46     
47     
48     
49     
50     ' /********************************************************************************/
51
52 #End Region
53
54 Imports System.Runtime.CompilerServices
55 Imports System.Threading
56 Imports Microsoft.VisualBasic.Linq
57 Imports Microsoft.VisualBasic.Parallel.Linq
58 Imports Microsoft.VisualBasic.Parallel.Tasks
59 Imports Microsoft.VisualBasic.Serialization.JSON
60 Imports taskBind = Microsoft.VisualBasic.ComponentModel.Binding(Of System.Action, System.Action(Of Long))
61
62 Namespace Parallel.Threads
63
64     ''' <summary>
65     ''' 使用多条线程来执行任务队列,推荐在编写Web服务器的时候使用这个模块来执行任务
66     ''' </summary>
67     Public Class ThreadPool : Implements IDisposable
68
69         ReadOnly __threads As TaskQueue(Of Long)()
70         ''' <summary>
71         ''' 临时的句柄缓存
72         ''' </summary>
73         ReadOnly __pendings As New Queue(Of taskBind)(capacity:=10240)
74
75         ''' <summary>
76         ''' 线程池之中的线程数量
77         ''' </summary>
78         ''' <returns></returns>
79         Public ReadOnly Property NumOfThreads As Integer
80             <MethodImpl(MethodImplOptions.AggressiveInlining)>
81             Get
82                 Return __threads.Length
83             End Get
84         End Property
85
86         ''' <summary>
87         ''' 返回当前正在处于工作状态的线程数量
88         ''' </summary>
89         ''' <returns></returns>
90         Public ReadOnly Property WorkingThreads As Integer
91             Get
92                 Dim n As Integer
93
94                 For Each t In __threads
95                     If t.Tasks > 0 Then
96                         n += 1
97                     End If
98                 Next
99
100                 Return n
101             End Get
102         End Property
103
104         ''' <summary>
105         ''' Returns the server load.
106         ''' </summary>
107         ''' <returns></returns>
108         Public ReadOnly Property ServerLoad As Double
109             Get
110                 Dim works# = WorkingThreads / NumOfThreads
111                 Dim CPU_load# = Win32.TaskManager.ProcessUsage
112                 Dim load# = works * CPU_load
113
114                 Return load
115             End Get
116         End Property
117
118         ''' <summary>
119         ''' 是否所有的线程都是处于工作状态的
120         ''' </summary>
121         ''' <returns></returns>
122         Public ReadOnly Property FullCapacity As Boolean
123             <MethodImpl(MethodImplOptions.AggressiveInlining)>
124             Get
125                 Return WorkingThreads = __threads.Length
126             End Get
127         End Property
128
129         Sub New(maxThread As Integer)
130             __threads = New TaskQueue(Of Long)(maxThread) {}
131
132             For i As Integer = 0 To __threads.Length - 1
133                 __threads(i) = New TaskQueue(Of Long)
134             Next
135
136             Call ParallelExtension.RunTask(AddressOf __allocate)
137         End Sub
138
139         Sub New()
140             Me.New(LQuerySchedule.Recommended_NUM_THREADS)
141         End Sub
142
143         ''' <summary>
144         ''' 获取当前的这个线程池对象的状态的摘要信息
145         ''' </summary>
146         ''' <returns></returns>
147         Public Function GetStatus() As Dictionary(Of StringString)
148             Dim out As New Dictionary(Of StringString)
149
150             Call out.Add(NameOf(Me.FullCapacity), FullCapacity)
151             Call out.Add(NameOf(Me.NumOfThreads), NumOfThreads)
152             Call out.Add(NameOf(Me.WorkingThreads), WorkingThreads)
153             Call out.Add(NameOf(Me.__pendings), __pendings.Count)
154
155             For Each t As SeqValue(Of TaskQueue(Of Long)) In __threads.SeqIterator
156                 With (+t)
157                     Call out.Add("thread___" & t.i & "___" & .uid, .Tasks)
158                 End With
159             Next
160
161             Return out
162         End Function
163
164         ''' <summary>
165         ''' 使用线程池里面的空闲线程来执行任务
166         ''' </summary>
167         ''' <param name="task"></param>
168         ''' <param name="callback">回调函数里面的参数是任务的执行的时间长度</param>
169         Public Sub RunTask(task As Action, Optional callback As Action(Of Long) = Nothing)
170             Dim pends As New taskBind With {
171                 .Bind = task,
172                 .Target = callback
173             }
174             SyncLock __pendings
175                 Call __pendings.Enqueue(pends)
176             End SyncLock
177         End Sub
178
179         Public Sub OperationTimeOut(task As Action, timeout As Integer)
180             Dim done As Boolean = False
181
182             Call RunTask(task, Sub() done = True)
183
184             For i As Integer = 0 To timeout
185                 If done Then
186                     Exit For
187                 Else
188                     Thread.Sleep(1)
189                 End If
190             Next
191         End Sub
192
193         Private Sub __allocate()
194             Do While Not Me.disposedValue
195                 SyncLock __pendings
196                     If __pendings.Count > 0 Then
197                         Dim task As taskBind = __pendings.Dequeue
198                         Dim h As Func(Of Long) = AddressOf New __taskInvoke With {.task = task.Bind}.Run
199                         Dim callback As Action(Of Long) = task.Target
200                         Call GetAvaliableThread.Enqueue(h, callback)  ' 当线程池里面的线程数量非常多的时候,这个事件会变长,所以讲分配的代码单独放在线程里面执行,以提神web服务器的响应效率
201                     Else
202                         Call Thread.Sleep(1)
203                     End If
204                 End SyncLock
205             Loop
206         End Sub
207
208         Private Structure __taskInvoke
209             Dim task As Action
210
211             ''' <summary>
212             ''' 不清楚是不是因为lambda有问题,所以导致计时器没有正常的工作,所以在这里使用内部类来工作
213             ''' </summary>
214             ''' <returns></returns>
215             Public Function Run() As Long
216                 Dim time& = App.NanoTime
217                 Call task()
218                 Return App.NanoTime - time
219             End Function
220         End Structure
221
222         ''' <summary>
223         ''' 这个函数总是会返回一个线程对象的
224         ''' 
225         ''' + 当有空闲的线程,会返回第一个空闲的线程
226         ''' + 当没有空闲的线程,则会返回任务队列最短的线程
227         ''' </summary>
228         ''' <returns></returns>
229         Private Function GetAvaliableThread() As TaskQueue(Of Long)
230             Dim [short] As TaskQueue(Of Long) = __threads.First
231
232             For Each t In __threads
233                 If Not t.RunningTask Then
234                     Return t
235                 Else
236                     If [short].Tasks > t.Tasks Then
237                         [short] = t
238                     End If
239                 End If
240             Next
241
242             Return [short]
243         End Function
244
245         Public Overrides Function ToString() As String
246             Return __threads.GetJson
247         End Function
248
249 #Region "IDisposable Support"
250         Private disposedValue As Boolean To detect redundant calls
251
252         ' IDisposable
253         Protected Overridable Sub Dispose(disposing As Boolean)
254             If Not disposedValue Then
255                 If disposing Then
256                     ' TODO: dispose managed state (managed objects).
257                 End If
258
259                 ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below.
260                 ' TODO: set large fields to null.
261             End If
262             disposedValue = True
263         End Sub
264
265         ' TODO: override Finalize() only if Dispose(disposing As Boolean) above has code to free unmanaged resources.
266         'Protected Overrides Sub Finalize()
267         '    ' Do not change this code.  Put cleanup code in Dispose(disposing As Boolean) above.
268         '    Dispose(False)
269         '    MyBase.Finalize()
270         'End Sub
271
272         ' This code added by Visual Basic to correctly implement the disposable pattern.
273         Public Sub Dispose() Implements IDisposable.Dispose
274             Do not change this code.  Put cleanup code in Dispose(disposing As Boolean) above.
275             Dispose(True)
276             ' TODO: uncomment the following line if Finalize() is overridden above.
277             ' GC.SuppressFinalize(Me)
278         End Sub
279 #End Region
280     End Class
281 End Namespace