Instantly share code, notes, and snippets.
Last active
June 5, 2021 16:08
-
Star
1
(1)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
-
Save rajibchy/b1d1bdcd4a83d4064db08556e7a5d120 to your computer and use it in GitHub Desktop.
SemaphoreSlim with ConcurrentQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /// <![CDATA[copyright]]> | |
| /// Copyright (c) 2018, Sow ( https://safeonline.world, https://www.facebook.com/safeonlineworld). (https://github.com/RKTUXYN) All rights reserved. | |
| /// Copyrights licensed under the New BSD License. | |
| /// See the accompanying LICENSE file for terms. | |
| /// <![CDATA[copyright]]> | |
| /// <![CDATA[Author]]> | |
| /// By Rajib Chy | |
| /// On 5/26/2021 1:19:21 PM | |
| /// <![CDATA[Author]]> | |
| using System; | |
| using System.Linq; | |
| using System.Threading; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Threading.Tasks; | |
| namespace Sow.Framework { | |
| public class SharedQueue<TArgument, TResult> : ISharedQueue<TArgument, TResult> { | |
| private bool _isRunning = false; | |
| private bool _isDisposed = false; | |
| private int _door; | |
| /** Thrad Capacity */ | |
| private int _roomCapacity; | |
| private int _maxCapacity; | |
| public int Door { | |
| get { return _door; } | |
| set { | |
| if ( _isRunning ) { | |
| throw new Exception( "Application alrady running. Please stop, then try." ); | |
| } | |
| _door = value; | |
| _maxCapacity = _roomCapacity * _door; | |
| } | |
| } | |
| public int RoomCapacity { | |
| get { return _roomCapacity; } | |
| set { | |
| if ( _isRunning ) { | |
| throw new Exception( "Application alrady running. Please stop, then try." ); | |
| } | |
| _roomCapacity = value; | |
| _maxCapacity = _roomCapacity * _door; | |
| } | |
| } | |
| private ConcurrentQueue<TArgument> _queue; | |
| private readonly SemaphoreSlim _doorAvailable; | |
| private readonly CancellationToken _cancellationToken; | |
| private List<Thread> _threads { get; set; } | |
| public Func<CancellationTokenSource, TArgument, Task<TResult>> OnDequeue { get; set; } | |
| public Action<TArgument, TResult> OnSuccess { get; set; } | |
| public Action<TArgument, OverflowException> OnOverflow { get; set; } | |
| public Action<TArgument, Exception> OnError { get; set; } | |
| private CancellationTokenSource _pendingRequestsCts; | |
| public SharedQueue( | |
| CancellationToken cancellationToken, | |
| int door = 1, int roomCapacity = 20 | |
| ) { | |
| _pendingRequestsCts = new CancellationTokenSource( ); | |
| _roomCapacity = roomCapacity; _door = door; | |
| _maxCapacity = _roomCapacity * _door; | |
| _queue = new ConcurrentQueue<TArgument>( ); | |
| _doorAvailable = new SemaphoreSlim( door ); | |
| _threads = new List<Thread>( door ); | |
| _cancellationToken = cancellationToken; | |
| } | |
| public bool TryEnqueue( TArgument args ) { | |
| if ( _isDisposed ) return false; | |
| if ( _queue.Count( ) > _maxCapacity ) { | |
| OnOverflow.Invoke( args, new OverflowException( $"Queue Overflow. Max room capacity {_maxCapacity}" ) ); | |
| return false; | |
| } | |
| _queue.Enqueue( args ); | |
| return true; | |
| } | |
| private void TryDequeue( ) { | |
| Thread worker = new Thread( async ( ) => { | |
| do { | |
| await _doorAvailable.WaitAsync( _cancellationToken ); | |
| if ( _cancellationToken.IsCancellationRequested ) break; | |
| TArgument result = default; | |
| try { | |
| if ( _queue.TryDequeue( out result ) ) { | |
| CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( _cancellationToken, _pendingRequestsCts.Token ); | |
| TResult resp = await OnDequeue.Invoke( linkedTokenSource, result ); | |
| OnSuccess.Invoke( result, resp ); | |
| } | |
| } catch ( Exception e ) { | |
| OnError.Invoke( result, e ); | |
| } finally { | |
| _doorAvailable.Release( ); | |
| } | |
| } while ( !_cancellationToken.IsCancellationRequested ); | |
| } ); | |
| _threads.Add( worker ); | |
| worker.SetApartmentState( ApartmentState.STA ); | |
| worker.Start( ); | |
| } | |
| public List<TArgument> Flush( ) { | |
| if ( _isDisposed ) return null; | |
| if ( _isRunning ) Stop( ); | |
| if ( _queue.Count == 0 ) return null; | |
| ConcurrentQueue<TArgument> pendingQueue = ThreadSafe.Exchange(ref _queue, new ConcurrentQueue<TArgument>( ) ); | |
| List<TArgument> result = new List<TArgument>( pendingQueue.Count ); | |
| foreach ( TArgument item in pendingQueue ) { | |
| result.Add( item ); | |
| } | |
| while ( pendingQueue.TryDequeue( out TArgument _ ) ) ; | |
| return result; | |
| } | |
| private void ReleaseAll( ) { | |
| while ( _queue.TryDequeue( out TArgument _ ) ) ; | |
| _ = ThreadSafe.Exchange(ref _queue, new ConcurrentQueue<TArgument>( ) ); | |
| } | |
| public void Start( ) { | |
| if ( _isDisposed ) { | |
| throw new ObjectDisposedException( "The object `SharedQueue` already disposed. You can not start again." ); | |
| } | |
| if ( OnDequeue == null || OnSuccess == null || OnError == null || OnOverflow ==null ) { | |
| throw new NullReferenceException( "Make sure you atached the following event(s) `OnDequeue`, `OnSuccess`, `OnError` and `OnOverflow`." ); | |
| } | |
| if ( _isRunning ) { | |
| Stop( ); | |
| } | |
| _isRunning = true; | |
| for ( int i = 0; i < _door; i++ ) { | |
| TryDequeue( ); | |
| } | |
| } | |
| public void Stop( bool exit = false ) { | |
| if ( !_isRunning || _threads == null ) return; | |
| var ths = _threads; | |
| List<Thread> threads = ThreadSafe.Exchange(ref ths, exit ? null : new List<Thread>( ) ); | |
| foreach ( Thread t in threads ) { | |
| ThreadSafe.ExitThread( t ); | |
| } | |
| threads.Clear( ); | |
| } | |
| public void CancelPendingRequests( ) { | |
| //var pendingRequestsCts = new CancellationTokenSource( ); | |
| // With every request we link this cancellation token source. | |
| CancellationTokenSource currentCts = ThreadSafe.Exchange(ref _pendingRequestsCts, new CancellationTokenSource( ) ); | |
| currentCts.Cancel( ); | |
| currentCts.Dispose( ); | |
| } | |
| public void Dispose( ) { | |
| if ( _isDisposed ) return; | |
| _isDisposed = true; | |
| ReleaseAll( ); | |
| // Cancel all pending requests (if any). Note that we don't call CancelPendingRequests() but cancel | |
| // the CTS directly. The reason is that CancelPendingRequests() would cancel the current CTS and create | |
| // a new CTS. We don't want a new CTS in this case. | |
| _pendingRequestsCts.Cancel( ); | |
| _pendingRequestsCts.Dispose( ); | |
| this.Stop( true ); | |
| GC.Collect( ); GC.SuppressFinalize( this ); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment