1 | #Region "Microsoft.VisualBasic::b9ff635fc0d543cb38bb41dcf070c378, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\ThreadPool.vb" |
2 | |
3 | ' Author: |
4 | ' |
5 | ' asuka (amethyst.asuka@gcmodeller.org) |
6 | ' xie (genetics@smrucc.org) |
7 | ' xieguigang (xie.guigang@live.com) |
8 | ' |
9 | ' Copyright (c) 2018 GPL3 Licensed |
10 | ' |
11 | ' |
12 | ' GNU GENERAL PUBLIC LICENSE (GPL3) |
13 | ' |
14 | ' |
15 | ' This program is free software: you can redistribute it and/or modify |
16 | ' it under the terms of the GNU General Public License as published by |
17 | ' the Free Software Foundation, either version 3 of the License, or |
18 | ' (at your option) any later version. |
19 | ' |
20 | ' This program is distributed in the hope that it will be useful, |
21 | ' but WITHOUT ANY WARRANTY; without even the implied warranty of |
22 | ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
23 | ' GNU General Public License for more details. |
24 | ' |
25 | ' You should have received a copy of the GNU General Public License |
26 | ' along with this program. If not, see <http://www.gnu.org/licenses/>. |
27 | |
28 | |
29 | |
30 | ' /********************************************************************************/ |
31 | |
32 | ' Summaries: |
33 | |
34 | ' Class ThreadPool |
35 | ' |
36 | ' Properties: FullCapacity, NumOfThreads, ServerLoad, WorkingThreads |
37 | ' |
38 | ' Constructor: (+2 Overloads) Sub New |
39 | ' |
40 | ' Function: GetAvaliableThread, GetStatus, ToString |
41 | ' |
42 | ' Sub: __allocate, (+2 Overloads) Dispose, OperationTimeOut, RunTask |
43 | ' Structure __taskInvoke |
44 | ' |
45 | ' Function: Run |
46 | ' |
47 | ' |
48 | ' |
49 | ' |
50 | ' /********************************************************************************/ |
51 | |
52 | #End Region |
53 | |
54 | Imports System.Runtime.CompilerServices |
55 | Imports System.Threading |
56 | Imports Microsoft.VisualBasic.Linq |
57 | Imports Microsoft.VisualBasic.Parallel.Linq |
58 | Imports Microsoft.VisualBasic.Parallel.Tasks |
59 | Imports Microsoft.VisualBasic.Serialization.JSON |
60 | Imports taskBind = Microsoft.VisualBasic.ComponentModel.Binding(Of System.Action, System.Action(Of Long)) |
61 | |
Namespace Parallel.Threads

    ''' <summary>
    ''' Executes a queue of tasks on several worker threads. Recommended for running
    ''' request handlers when writing a web server.
    ''' </summary>
    ''' <remarks>
    ''' Submitted tasks are first buffered in a pending queue; a background dispatcher
    ''' (<see cref="__allocate"/>) moves them from there onto one of the worker queues.
    ''' </remarks>
    Public Class ThreadPool : Implements IDisposable

        ''' <summary>
        ''' The worker queues owned by this pool, one per thread slot.
        ''' </summary>
        ReadOnly __threads As TaskQueue(Of Long)()
        ''' <summary>
        ''' Temporary buffer of submitted task handles that have not been dispatched yet.
        ''' Every access must be guarded by <c>SyncLock __pendings</c>.
        ''' </summary>
        ReadOnly __pendings As New Queue(Of taskBind)(capacity:=10240)

        ''' <summary>
        ''' The number of worker queues in this pool.
        ''' </summary>
        ''' <returns></returns>
        Public ReadOnly Property NumOfThreads As Integer
            <MethodImpl(MethodImplOptions.AggressiveInlining)>
            Get
                Return __threads.Length
            End Get
        End Property

        ''' <summary>
        ''' The number of worker queues that currently report at least one task.
        ''' </summary>
        ''' <returns></returns>
        Public ReadOnly Property WorkingThreads As Integer
            Get
                Dim busy As Integer = 0

                For Each worker In __threads
                    If worker.Tasks > 0 Then
                        busy += 1
                    End If
                Next

                Return busy
            End Get
        End Property

        ''' <summary>
        ''' Returns the server load: the fraction of working threads multiplied by the
        ''' value reported by <c>Win32.TaskManager.ProcessUsage</c>.
        ''' </summary>
        ''' <returns></returns>
        Public ReadOnly Property ServerLoad As Double
            Get
                ' "/" is floating point division in VB, so this is a ratio in [0, 1]
                Dim works As Double = WorkingThreads / NumOfThreads
                Dim CPU_load As Double = Win32.TaskManager.ProcessUsage

                Return works * CPU_load
            End Get
        End Property

        ''' <summary>
        ''' Whether every worker queue in the pool currently reports at least one task.
        ''' </summary>
        ''' <returns></returns>
        Public ReadOnly Property FullCapacity As Boolean
            <MethodImpl(MethodImplOptions.AggressiveInlining)>
            Get
                Return WorkingThreads = __threads.Length
            End Get
        End Property

        ''' <summary>
        ''' Creates a pool with <paramref name="maxThread"/> worker queues and starts the
        ''' background dispatcher.
        ''' </summary>
        ''' <param name="maxThread">
        ''' The number of worker queues to create. Values less than 1 are treated as 1 so
        ''' that the pool always has a worker to dispatch to.
        ''' </param>
        Sub New(maxThread As Integer)
            Dim size As Integer = If(maxThread > 0, maxThread, 1)

            ' A VB array creation expression takes the UPPER BOUND, not the length:
            ' New T(n) {} holds n + 1 elements, hence the "- 1" here.
            __threads = New TaskQueue(Of Long)(size - 1) {}

            For i As Integer = 0 To __threads.Length - 1
                __threads(i) = New TaskQueue(Of Long)
            Next

            Call ParallelExtension.RunTask(AddressOf __allocate)
        End Sub

        ''' <summary>
        ''' Creates a pool sized by <c>LQuerySchedule.Recommended_NUM_THREADS</c>.
        ''' </summary>
        Sub New()
            Me.New(LQuerySchedule.Recommended_NUM_THREADS)
        End Sub

        ''' <summary>
        ''' Gets a summary of the current status of this thread pool.
        ''' </summary>
        ''' <returns>
        ''' A table with the values of <see cref="FullCapacity"/>, <see cref="NumOfThreads"/>,
        ''' <see cref="WorkingThreads"/>, the number of pending tasks, and one
        ''' <c>thread___{index}___{uid}</c> entry per worker holding its task count.
        ''' </returns>
        Public Function GetStatus() As Dictionary(Of String, String)
            Dim out As New Dictionary(Of String, String)
            Dim pendings As Integer

            ' the pending queue is mutated by RunTask and by the dispatcher thread
            SyncLock __pendings
                pendings = __pendings.Count
            End SyncLock

            Call out.Add(NameOf(Me.FullCapacity), CStr(FullCapacity))
            Call out.Add(NameOf(Me.NumOfThreads), CStr(NumOfThreads))
            Call out.Add(NameOf(Me.WorkingThreads), CStr(WorkingThreads))
            Call out.Add(NameOf(Me.__pendings), CStr(pendings))

            For Each t As SeqValue(Of TaskQueue(Of Long)) In __threads.SeqIterator
                ' unary "+" unwraps the TaskQueue from the indexed wrapper
                With (+t)
                    Call out.Add("thread___" & t.i & "___" & .uid, CStr(.Tasks))
                End With
            Next

            Return out
        End Function

        ''' <summary>
        ''' Submits a task to be executed by a worker of this pool. The call only buffers
        ''' the task and returns immediately; the dispatcher thread assigns it to a worker.
        ''' </summary>
        ''' <param name="task">The work to execute; must not be Nothing.</param>
        ''' <param name="callback">
        ''' Optional. Receives the elapsed time of the task, computed as the difference of
        ''' two <c>App.NanoTime</c> readings taken around the task invocation.
        ''' </param>
        ''' <exception cref="ArgumentNullException"><paramref name="task"/> is Nothing.</exception>
        Public Sub RunTask(task As Action, Optional callback As Action(Of Long) = Nothing)
            If task Is Nothing Then
                ' fail at the call site instead of later, inside a worker thread
                Throw New ArgumentNullException(NameOf(task))
            End If

            Dim pends As New taskBind With {
                .Bind = task,
                .Target = callback
            }

            SyncLock __pendings
                Call __pendings.Enqueue(pends)
            End SyncLock
        End Sub

        ''' <summary>
        ''' Submits <paramref name="task"/> and blocks the caller until it has finished or
        ''' the timeout has elapsed, whichever comes first. The task itself is not
        ''' cancelled on timeout.
        ''' </summary>
        ''' <param name="task">The work to execute.</param>
        ''' <param name="timeout">
        ''' Maximum time to wait, in milliseconds. Non-positive values do not wait.
        ''' </param>
        Public Sub OperationTimeOut(task As Action, timeout As Integer)
            ' Deliberately not disposed: the completion callback may still call Set()
            ' after this method has already returned because of a timeout.
            Dim finished As New ManualResetEventSlim(False)

            Call RunTask(task, Sub(elapsed As Long) finished.Set())

            ' Wait(-1) would block forever, so clamp negative values to zero.
            Call finished.Wait(If(timeout > 0, timeout, 0))
        End Sub

        ''' <summary>
        ''' Dispatcher loop: moves buffered tasks from the pending queue onto a worker
        ''' queue until this pool is disposed.
        ''' </summary>
        ''' <remarks>
        ''' Choosing a worker takes longer as the number of threads grows, so the
        ''' allocation runs on its own thread to keep the submitting side (for example
        ''' a web server request loop) responsive.
        ''' </remarks>
        Private Sub __allocate()
            Do While Not Me.disposedValue
                Dim task As taskBind = Nothing
                Dim hasTask As Boolean = False

                ' Hold the lock only for the dequeue itself, never while sleeping or
                ' while dispatching, otherwise RunTask callers are blocked.
                SyncLock __pendings
                    If __pendings.Count > 0 Then
                        task = __pendings.Dequeue
                        hasTask = True
                    End If
                End SyncLock

                If hasTask Then
                    Dim h As Func(Of Long) = AddressOf New __taskInvoke With {.task = task.Bind}.Run
                    Dim callback As Action(Of Long) = task.Target

                    Call GetAvaliableThread.Enqueue(h, callback)
                Else
                    Call Thread.Sleep(1)
                End If
            Loop
        End Sub

        ''' <summary>
        ''' Wraps a task so that running it yields its elapsed time.
        ''' </summary>
        Private Structure __taskInvoke
            Dim task As Action

            ''' <summary>
            ''' Runs the wrapped task and measures it. An inner type is used instead of a
            ''' lambda because the timer did not appear to work correctly with a lambda
            ''' (the cause was never confirmed).
            ''' </summary>
            ''' <returns>The difference of <c>App.NanoTime</c> after and before the task.</returns>
            Public Function Run() As Long
                Dim start As Long = App.NanoTime
                Call task()
                Return App.NanoTime - start
            End Function
        End Structure

        ''' <summary>
        ''' Picks the worker queue that should receive the next task. This function
        ''' always returns a worker:
        '''
        ''' + if a worker is not running a task, the first such worker is returned
        ''' + otherwise the worker with the shortest task queue is returned
        ''' </summary>
        ''' <returns></returns>
        Private Function GetAvaliableThread() As TaskQueue(Of Long)
            Dim [short] As TaskQueue(Of Long) = __threads(0)

            For Each t In __threads
                If Not t.RunningTask Then
                    Return t
                ElseIf [short].Tasks > t.Tasks Then
                    [short] = t
                End If
            Next

            Return [short]
        End Function

        Public Overrides Function ToString() As String
            Return __threads.GetJson
        End Function

#Region "IDisposable Support"
        ' Set once Dispose has run; also the stop signal for the dispatcher loop.
        Private disposedValue As Boolean

        ''' <summary>
        ''' Stops the dispatcher loop and releases the worker queues.
        ''' </summary>
        ''' <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        Protected Overridable Sub Dispose(disposing As Boolean)
            If Not disposedValue Then
                ' raise the flag first so that __allocate stops handing out work
                disposedValue = True

                If disposing Then
                    ' NOTE(review): TaskQueue is declared elsewhere; it is released here
                    ' only if it turns out to implement IDisposable - confirm.
                    For Each worker As Object In __threads
                        TryCast(worker, IDisposable)?.Dispose()
                    Next
                End If
            End If
        End Sub

        ''' <summary>
        ''' Disposes this thread pool. Tasks that are still pending are not dispatched
        ''' any more.
        ''' </summary>
        Public Sub Dispose() Implements IDisposable.Dispose
            ' Put cleanup code in Dispose(disposing As Boolean) above.
            Dispose(True)
            ' No finalizer is declared, so GC.SuppressFinalize(Me) is not required.
        End Sub
#End Region
    End Class
End Namespace