Skip to content

Instantly share code, notes, and snippets.

@rajibchy
Last active June 5, 2021 16:08
Show Gist options
  • Select an option

  • Save rajibchy/b1d1bdcd4a83d4064db08556e7a5d120 to your computer and use it in GitHub Desktop.

Select an option

Save rajibchy/b1d1bdcd4a83d4064db08556e7a5d120 to your computer and use it in GitHub Desktop.
SemaphoreSlim with ConcurrentQueue
/// <![CDATA[copyright]]>
/// Copyright (c) 2018, Sow ( https://safeonline.world, https://www.facebook.com/safeonlineworld). (https://github.com/RKTUXYN) All rights reserved.
/// Copyrights licensed under the New BSD License.
/// See the accompanying LICENSE file for terms.
/// <![CDATA[copyright]]>
/// <![CDATA[Author]]>
/// By Rajib Chy
/// On 5/26/2021 1:19:21 PM
/// <![CDATA[Author]]>
using System;
using System.Linq;
using System.Threading;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Sow.Framework {
public class SharedQueue<TArgument, TResult> : ISharedQueue<TArgument, TResult> {
private bool _isRunning = false;
private bool _isDisposed = false;
private int _door;
/** Thrad Capacity */
private int _roomCapacity;
private int _maxCapacity;
public int Door {
get { return _door; }
set {
if ( _isRunning ) {
throw new Exception( "Application alrady running. Please stop, then try." );
}
_door = value;
_maxCapacity = _roomCapacity * _door;
}
}
public int RoomCapacity {
get { return _roomCapacity; }
set {
if ( _isRunning ) {
throw new Exception( "Application alrady running. Please stop, then try." );
}
_roomCapacity = value;
_maxCapacity = _roomCapacity * _door;
}
}
private ConcurrentQueue<TArgument> _queue;
private readonly SemaphoreSlim _doorAvailable;
private readonly CancellationToken _cancellationToken;
private List<Thread> _threads { get; set; }
public Func<CancellationTokenSource, TArgument, Task<TResult>> OnDequeue { get; set; }
public Action<TArgument, TResult> OnSuccess { get; set; }
public Action<TArgument, OverflowException> OnOverflow { get; set; }
public Action<TArgument, Exception> OnError { get; set; }
private CancellationTokenSource _pendingRequestsCts;
public SharedQueue(
CancellationToken cancellationToken,
int door = 1, int roomCapacity = 20
) {
_pendingRequestsCts = new CancellationTokenSource( );
_roomCapacity = roomCapacity; _door = door;
_maxCapacity = _roomCapacity * _door;
_queue = new ConcurrentQueue<TArgument>( );
_doorAvailable = new SemaphoreSlim( door );
_threads = new List<Thread>( door );
_cancellationToken = cancellationToken;
}
public bool TryEnqueue( TArgument args ) {
if ( _isDisposed ) return false;
if ( _queue.Count( ) > _maxCapacity ) {
OnOverflow.Invoke( args, new OverflowException( $"Queue Overflow. Max room capacity {_maxCapacity}" ) );
return false;
}
_queue.Enqueue( args );
return true;
}
private void TryDequeue( ) {
Thread worker = new Thread( async ( ) => {
do {
await _doorAvailable.WaitAsync( _cancellationToken );
if ( _cancellationToken.IsCancellationRequested ) break;
TArgument result = default;
try {
if ( _queue.TryDequeue( out result ) ) {
CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource( _cancellationToken, _pendingRequestsCts.Token );
TResult resp = await OnDequeue.Invoke( linkedTokenSource, result );
OnSuccess.Invoke( result, resp );
}
} catch ( Exception e ) {
OnError.Invoke( result, e );
} finally {
_doorAvailable.Release( );
}
} while ( !_cancellationToken.IsCancellationRequested );
} );
_threads.Add( worker );
worker.SetApartmentState( ApartmentState.STA );
worker.Start( );
}
public List<TArgument> Flush( ) {
if ( _isDisposed ) return null;
if ( _isRunning ) Stop( );
if ( _queue.Count == 0 ) return null;
ConcurrentQueue<TArgument> pendingQueue = ThreadSafe.Exchange(ref _queue, new ConcurrentQueue<TArgument>( ) );
List<TArgument> result = new List<TArgument>( pendingQueue.Count );
foreach ( TArgument item in pendingQueue ) {
result.Add( item );
}
while ( pendingQueue.TryDequeue( out TArgument _ ) ) ;
return result;
}
private void ReleaseAll( ) {
while ( _queue.TryDequeue( out TArgument _ ) ) ;
_ = ThreadSafe.Exchange(ref _queue, new ConcurrentQueue<TArgument>( ) );
}
public void Start( ) {
if ( _isDisposed ) {
throw new ObjectDisposedException( "The object `SharedQueue` already disposed. You can not start again." );
}
if ( OnDequeue == null || OnSuccess == null || OnError == null || OnOverflow ==null ) {
throw new NullReferenceException( "Make sure you atached the following event(s) `OnDequeue`, `OnSuccess`, `OnError` and `OnOverflow`." );
}
if ( _isRunning ) {
Stop( );
}
_isRunning = true;
for ( int i = 0; i < _door; i++ ) {
TryDequeue( );
}
}
public void Stop( bool exit = false ) {
if ( !_isRunning || _threads == null ) return;
var ths = _threads;
List<Thread> threads = ThreadSafe.Exchange(ref ths, exit ? null : new List<Thread>( ) );
foreach ( Thread t in threads ) {
ThreadSafe.ExitThread( t );
}
threads.Clear( );
}
public void CancelPendingRequests( ) {
//var pendingRequestsCts = new CancellationTokenSource( );
// With every request we link this cancellation token source.
CancellationTokenSource currentCts = ThreadSafe.Exchange(ref _pendingRequestsCts, new CancellationTokenSource( ) );
currentCts.Cancel( );
currentCts.Dispose( );
}
public void Dispose( ) {
if ( _isDisposed ) return;
_isDisposed = true;
ReleaseAll( );
// Cancel all pending requests (if any). Note that we don't call CancelPendingRequests() but cancel
// the CTS directly. The reason is that CancelPendingRequests() would cancel the current CTS and create
// a new CTS. We don't want a new CTS in this case.
_pendingRequestsCts.Cancel( );
_pendingRequestsCts.Dispose( );
this.Stop( true );
GC.Collect( ); GC.SuppressFinalize( this );
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment