| 1 | #Region "Microsoft.VisualBasic::b9ff635fc0d543cb38bb41dcf070c378, Microsoft.VisualBasic.Core\ApplicationServices\Parallel\Threads\ThreadPool.vb" |
| 2 | |
| 3 | ' Author: |
| 4 | ' |
| 5 | ' asuka (amethyst.asuka@gcmodeller.org) |
| 6 | ' xie (genetics@smrucc.org) |
| 7 | ' xieguigang (xie.guigang@live.com) |
| 8 | ' |
| 9 | ' Copyright (c) 2018 GPL3 Licensed |
| 10 | ' |
| 11 | ' |
| 12 | ' GNU GENERAL PUBLIC LICENSE (GPL3) |
| 13 | ' |
| 14 | ' |
| 15 | ' This program is free software: you can redistribute it and/or modify |
| 16 | ' it under the terms of the GNU General Public License as published by |
| 17 | ' the Free Software Foundation, either version 3 of the License, or |
| 18 | ' (at your option) any later version. |
| 19 | ' |
| 20 | ' This program is distributed in the hope that it will be useful, |
| 21 | ' but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 22 | ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 23 | ' GNU General Public License for more details. |
| 24 | ' |
| 25 | ' You should have received a copy of the GNU General Public License |
| 26 | ' along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 27 | |
| 28 | |
| 29 | |
| 30 | ' /********************************************************************************/ |
| 31 | |
| 32 | ' Summaries: |
| 33 | |
| 34 | ' Class ThreadPool |
| 35 | ' |
| 36 | ' Properties: FullCapacity, NumOfThreads, ServerLoad, WorkingThreads |
| 37 | ' |
| 38 | ' Constructor: (+2 Overloads) Sub New |
| 39 | ' |
| 40 | ' Function: GetAvaliableThread, GetStatus, ToString |
| 41 | ' |
| 42 | ' Sub: __allocate, (+2 Overloads) Dispose, OperationTimeOut, RunTask |
| 43 | ' Structure __taskInvoke |
| 44 | ' |
| 45 | ' Function: Run |
| 46 | ' |
| 47 | ' |
| 48 | ' |
| 49 | ' |
| 50 | ' /********************************************************************************/ |
| 51 | |
| 52 | #End Region |
| 53 | |
| 54 | Imports System.Runtime.CompilerServices |
| 55 | Imports System.Threading |
| 56 | Imports Microsoft.VisualBasic.Linq |
| 57 | Imports Microsoft.VisualBasic.Parallel.Linq |
| 58 | Imports Microsoft.VisualBasic.Parallel.Tasks |
| 59 | Imports Microsoft.VisualBasic.Serialization.JSON |
| 60 | Imports taskBind = Microsoft.VisualBasic.ComponentModel.Binding(Of System.Action, System.Action(Of Long)) |
| 61 | |
| 62 | Namespace Parallel.Threads |
| 63 | |
| 64 | ''' <summary> |
| 65 | ''' 使用多条线程来执行任务队列,推荐在编写Web服务器的时候使用这个模块来执行任务 |
| 66 | ''' </summary> |
| 67 | Public Class ThreadPool : Implements IDisposable |
| 68 | |
| 69 | ReadOnly __threads As TaskQueue(Of Long)() |
| 70 | ''' <summary> |
| 71 | ''' 临时的句柄缓存 |
| 72 | ''' </summary> |
| 73 | ReadOnly __pendings As New Queue(Of taskBind)(capacity:=10240) |
| 74 | |
| 75 | ''' <summary> |
| 76 | ''' 线程池之中的线程数量 |
| 77 | ''' </summary> |
| 78 | ''' <returns></returns> |
| 79 | Public ReadOnly Property NumOfThreads As Integer |
| 80 | <MethodImpl(MethodImplOptions.AggressiveInlining)> |
| 81 | Get |
| 82 | Return __threads.Length |
| 83 | End Get |
| 84 | End Property |
| 85 | |
| 86 | ''' <summary> |
| 87 | ''' 返回当前正在处于工作状态的线程数量 |
| 88 | ''' </summary> |
| 89 | ''' <returns></returns> |
| 90 | Public ReadOnly Property WorkingThreads As Integer |
| 91 | Get |
| 92 | Dim n As Integer |
| 93 | |
| 94 | For Each t In __threads |
| 95 | If t.Tasks > 0 Then |
| 96 | n += 1 |
| 97 | End If |
| 98 | Next |
| 99 | |
| 100 | Return n |
| 101 | End Get |
| 102 | End Property |
| 103 | |
| 104 | ''' <summary> |
| 105 | ''' Returns the server load. |
| 106 | ''' </summary> |
| 107 | ''' <returns></returns> |
| 108 | Public ReadOnly Property ServerLoad As Double |
| 109 | Get |
| 110 | Dim works# = WorkingThreads / NumOfThreads |
| 111 | Dim CPU_load# = Win32.TaskManager.ProcessUsage |
| 112 | Dim load# = works * CPU_load |
| 113 | |
| 114 | Return load |
| 115 | End Get |
| 116 | End Property |
| 117 | |
| 118 | ''' <summary> |
| 119 | ''' 是否所有的线程都是处于工作状态的 |
| 120 | ''' </summary> |
| 121 | ''' <returns></returns> |
| 122 | Public ReadOnly Property FullCapacity As Boolean |
| 123 | <MethodImpl(MethodImplOptions.AggressiveInlining)> |
| 124 | Get |
| 125 | Return WorkingThreads = __threads.Length |
| 126 | End Get |
| 127 | End Property |
| 128 | |
| 129 | Sub New(maxThread As Integer) |
| 130 | __threads = New TaskQueue(Of Long)(maxThread) {} |
| 131 | |
| 132 | For i As Integer = 0 To __threads.Length - 1 |
| 133 | __threads(i) = New TaskQueue(Of Long) |
| 134 | Next |
| 135 | |
| 136 | Call ParallelExtension.RunTask(AddressOf __allocate) |
| 137 | End Sub |
| 138 | |
| 139 | Sub New() |
| 140 | Me.New(LQuerySchedule.Recommended_NUM_THREADS) |
| 141 | End Sub |
| 142 | |
| 143 | ''' <summary> |
| 144 | ''' 获取当前的这个线程池对象的状态的摘要信息 |
| 145 | ''' </summary> |
| 146 | ''' <returns></returns> |
| 147 | Public Function GetStatus() As Dictionary(Of String, String) |
| 148 | Dim out As New Dictionary(Of String, String) |
| 149 | |
| 150 | Call out.Add(NameOf(Me.FullCapacity), FullCapacity) |
| 151 | Call out.Add(NameOf(Me.NumOfThreads), NumOfThreads) |
| 152 | Call out.Add(NameOf(Me.WorkingThreads), WorkingThreads) |
| 153 | Call out.Add(NameOf(Me.__pendings), __pendings.Count) |
| 154 | |
| 155 | For Each t As SeqValue(Of TaskQueue(Of Long)) In __threads.SeqIterator |
| 156 | With (+t) |
| 157 | Call out.Add("thread___" & t.i & "___" & .uid, .Tasks) |
| 158 | End With |
| 159 | Next |
| 160 | |
| 161 | Return out |
| 162 | End Function |
| 163 | |
| 164 | ''' <summary> |
| 165 | ''' 使用线程池里面的空闲线程来执行任务 |
| 166 | ''' </summary> |
| 167 | ''' <param name="task"></param> |
| 168 | ''' <param name="callback">回调函数里面的参数是任务的执行的时间长度</param> |
| 169 | Public Sub RunTask(task As Action, Optional callback As Action(Of Long) = Nothing) |
| 170 | Dim pends As New taskBind With { |
| 171 | .Bind = task, |
| 172 | .Target = callback |
| 173 | } |
| 174 | SyncLock __pendings |
| 175 | Call __pendings.Enqueue(pends) |
| 176 | End SyncLock |
| 177 | End Sub |
| 178 | |
| 179 | Public Sub OperationTimeOut(task As Action, timeout As Integer) |
| 180 | Dim done As Boolean = False |
| 181 | |
| 182 | Call RunTask(task, Sub() done = True) |
| 183 | |
| 184 | For i As Integer = 0 To timeout |
| 185 | If done Then |
| 186 | Exit For |
| 187 | Else |
| 188 | Thread.Sleep(1) |
| 189 | End If |
| 190 | Next |
| 191 | End Sub |
| 192 | |
| 193 | Private Sub __allocate() |
| 194 | Do While Not Me.disposedValue |
| 195 | SyncLock __pendings |
| 196 | If __pendings.Count > 0 Then |
| 197 | Dim task As taskBind = __pendings.Dequeue |
| 198 | Dim h As Func(Of Long) = AddressOf New __taskInvoke With {.task = task.Bind}.Run |
| 199 | Dim callback As Action(Of Long) = task.Target |
| 200 | Call GetAvaliableThread.Enqueue(h, callback) ' 当线程池里面的线程数量非常多的时候,这个事件会变长,所以讲分配的代码单独放在线程里面执行,以提神web服务器的响应效率 |
| 201 | Else |
| 202 | Call Thread.Sleep(1) |
| 203 | End If |
| 204 | End SyncLock |
| 205 | Loop |
| 206 | End Sub |
| 207 | |
| 208 | Private Structure __taskInvoke |
| 209 | Dim task As Action |
| 210 | |
| 211 | ''' <summary> |
| 212 | ''' 不清楚是不是因为lambda有问题,所以导致计时器没有正常的工作,所以在这里使用内部类来工作 |
| 213 | ''' </summary> |
| 214 | ''' <returns></returns> |
| 215 | Public Function Run() As Long |
| 216 | Dim time& = App.NanoTime |
| 217 | Call task() |
| 218 | Return App.NanoTime - time |
| 219 | End Function |
| 220 | End Structure |
| 221 | |
| 222 | ''' <summary> |
| 223 | ''' 这个函数总是会返回一个线程对象的 |
| 224 | ''' |
| 225 | ''' + 当有空闲的线程,会返回第一个空闲的线程 |
| 226 | ''' + 当没有空闲的线程,则会返回任务队列最短的线程 |
| 227 | ''' </summary> |
| 228 | ''' <returns></returns> |
| 229 | Private Function GetAvaliableThread() As TaskQueue(Of Long) |
| 230 | Dim [short] As TaskQueue(Of Long) = __threads.First |
| 231 | |
| 232 | For Each t In __threads |
| 233 | If Not t.RunningTask Then |
| 234 | Return t |
| 235 | Else |
| 236 | If [short].Tasks > t.Tasks Then |
| 237 | [short] = t |
| 238 | End If |
| 239 | End If |
| 240 | Next |
| 241 | |
| 242 | Return [short] |
| 243 | End Function |
| 244 | |
| 245 | Public Overrides Function ToString() As String |
| 246 | Return __threads.GetJson |
| 247 | End Function |
| 248 | |
| 249 | #Region "IDisposable Support" |
| 250 | Private disposedValue As Boolean ' To detect redundant calls |
| 251 | |
| 252 | ' IDisposable |
| 253 | Protected Overridable Sub Dispose(disposing As Boolean) |
| 254 | If Not disposedValue Then |
| 255 | If disposing Then |
| 256 | ' TODO: dispose managed state (managed objects). |
| 257 | End If |
| 258 | |
| 259 | ' TODO: free unmanaged resources (unmanaged objects) and override Finalize() below. |
| 260 | ' TODO: set large fields to null. |
| 261 | End If |
| 262 | disposedValue = True |
| 263 | End Sub |
| 264 | |
| 265 | ' TODO: override Finalize() only if Dispose(disposing As Boolean) above has code to free unmanaged resources. |
| 266 | 'Protected Overrides Sub Finalize() |
| 267 | ' ' Do not change this code. Put cleanup code in Dispose(disposing As Boolean) above. |
| 268 | ' Dispose(False) |
| 269 | ' MyBase.Finalize() |
| 270 | 'End Sub |
| 271 | |
| 272 | ' This code added by Visual Basic to correctly implement the disposable pattern. |
| 273 | Public Sub Dispose() Implements IDisposable.Dispose |
| 274 | ' Do not change this code. Put cleanup code in Dispose(disposing As Boolean) above. |
| 275 | Dispose(True) |
| 276 | ' TODO: uncomment the following line if Finalize() is overridden above. |
| 277 | ' GC.SuppressFinalize(Me) |
| 278 | End Sub |
| 279 | #End Region |
| 280 | End Class |
| 281 | End Namespace |