Skip to content

Instantly share code, notes, and snippets.

@rajibchy
Last active July 30, 2025 18:00
Show Gist options
  • Select an option

  • Save rajibchy/1c40613137a4334c5a7ad7e8740d56df to your computer and use it in GitHub Desktop.

Select an option

Save rajibchy/1c40613137a4334c5a7ad7e8740d56df to your computer and use it in GitHub Desktop.
A high-performance, thread-safe message queue using `System.Threading.Channels.Channel`
// Copyright FSys Tech Limited [FSys]. All rights reserved.
//
// This software owned by FSys Tech Limited [FSys] and is protected by copyright law
// and international copyright treaties.
//
// Access to and use of the software is governed by the terms of the applicable FSys Software
// Services Agreement (the Agreement) and Customer end user license agreements granting
// a non-assignable, non-transferable and non-exclusive license to use the software
// for it's own data processing purposes under the terms defined in the Agreement.
//
// Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part
// of this source code or associated reference material to any other location for further reproduction
// or redistribution, and any amendments to this copyright notice, are expressly prohibited.
//
// Any reproduction or redistribution for sale or hiring of the Software not in accordance with
// the terms of the Agreement is a violation of copyright law.
// 11:44 PM 7/29/2025
// by rajib chy
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
namespace FSys.Framework.Concurrent {
/// <summary>
/// Represents a delegate for handling queued channel messages.
/// </summary>
/// <typeparam name="TEventArgs">The type of the event data passed to the delegate.</typeparam>
/// <param name="e">The event data.</param>
public delegate void SyncQueueEventHandler<TEventArgs>( TEventArgs e );
/// <summary>
/// Represents an asynchronous method that handles a queue event with a parameter of type <typeparamref name="TEventArgs"/>.
/// </summary>
/// <typeparam name="TEventArgs">The type of the event data.</typeparam>
/// <param name="e">The event data passed to the handler.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public delegate Task AsyncQueueEventHandler<TEventArgs>( TEventArgs e );
public delegate Task SyncQueueStartEventHandler( );
/// <summary>
/// Represents a generic interface for a channel-based message queue.
/// </summary>
/// <typeparam name="T">The type of messages handled by the queue.</typeparam>
public interface ISyncQueue<T> : IDisposable {
/// <summary>
/// Gets the <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/>
/// </summary>
/// <returns>
/// The <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/>
/// </returns>
/// <exception cref="System.ObjectDisposedException">
/// The token source has been disposed.
/// </exception>
CancellationToken Token { get; }
/// <summary>
/// Starts the internal worker thread that processes queued messages.
/// </summary>
void Start( );
/// <summary>
/// Stops the queue from processing further messages and initiates a graceful shutdown.
/// </summary>
/// <remarks>
/// After calling this method, the queue should stop accepting or dequeuing new items.
/// Any ongoing operations may complete, but no further processing will occur.
/// This is typically used to shut down the queue safely before disposal.
/// </remarks>
void Stop( );
/// <summary>
/// Gets a value indicating whether the queue has been disposed.
/// </summary>
bool IsDisposed { get; }
/// <summary>
/// Enqueues a <typeparamref name="T"/> message into the queue immediately without retries.
/// </summary>
/// <param name="message">The message to enqueue.</param>
void Enqueue( T message );
/// <summary>
/// Attempts to enqueue a <typeparamref name="T"/> message synchronously into the queue.
/// </summary>
/// <param name="message">The message to enqueue.</param>
void TryWrite( T message );
/// <summary>
/// Attempts to enqueue a <typeparamref name="T"/> message asynchronously into the queue.
/// </summary>
/// <param name="message">The message to enqueue.</param>
/// <returns>
/// A task that represents the asynchronous enqueue operation. The result indicates whether the message was successfully enqueued.
/// </returns>
Task<bool> TryWriteAsync( T message );
/// <summary>
/// Attempts to enqueue a <typeparamref name="T"/> into the message queue with high throughput and fallback safety.
/// </summary>
/// <param name="message">The message item to enqueue.</param>
/// <param name="retries">
/// The number of times to retry the asynchronous write if the queue is full.
/// Defaults to 3 retries with exponential backoff on failure.
/// </param>
/// <returns>
/// A task that resolves to <c>true</c> if the item was successfully enqueued; <c>false</c> if all retry attempts failed.
/// </returns>
/// <remarks>
/// This method first attempts a fast, non-blocking write using <c>TryWrite</c>. If the queue is full,
/// it falls back to <c>WriteAsync</c> with exponential backoff and retry logic.
/// It balances performance with safety by minimizing overhead in the common case and ensuring
/// message delivery under transient load.
/// </remarks>
Task<bool> EnqueueAsync( T message, int retries = 3 );
}
/// <summary>
/// A high-performance, thread-safe message queue using <see cref="System.Threading.Channels.Channel{T}"/>
/// with retry and cancellation support.
/// </summary>
/// <typeparam name="T">The type of message being handled in the queue.</typeparam>
public class SyncQueue<T> : TokenBase, ISyncQueue<T> {
private long _isDisposed = 0;
private Thread _worker = null;
private readonly string _prefex;
private readonly Channel<T> _queue;
private readonly ILoggerQueue _logger;
/// <summary>
/// Occurs when a new message is dequeued and ready for processing.
/// </summary>
/// <remarks>
/// This event is triggered synchronously each time an item of type <typeparamref name="T"/>
/// is removed from the internal queue. Use this to handle or process messages as they arrive.
/// Ensure handlers execute quickly to avoid blocking the queue.
/// </remarks>
public event SyncQueueEventHandler<T> OnMessage;
/// <summary>
/// Raised when the synchronous queue begins operation or is activated.
/// This event can be used to initialize resources or log start-time actions.
/// </summary>
/// <remarks>
/// The <see cref="SyncQueueStartEventHandler"/> delegate provides the sender object and event arguments.
/// </remarks>
public event SyncQueueStartEventHandler OnStart;
/// <summary>
/// Occurs when an item is dequeued and <c>isAsync</c> is set to <c>true</c>.
/// </summary>
/// <remarks>
/// Handlers attached to this event are awaited, ensuring sequential asynchronous processing of each item.
/// This event is mutually exclusive with <see cref="OnMessage"/>; only one will be used based on the queue mode.
/// </remarks>
public event AsyncQueueEventHandler<T> AsyncOnMessage;
private readonly bool _isAsync = false;
public bool IsDisposed => Interlocked.Read( ref _isDisposed ) > 0;
/// <summary>
/// Initializes a new instance of the <see cref="SyncQueue{T}"/> class in synchronous mode.
/// </summary>
/// <param name="prefex">A prefix used for logging and diagnostics.</param>
/// <param name="logger">An instance of <see cref="ILoggerQueue"/> for logging queue activities.</param>
/// <param name="token">A cancellation token used to signal shutdown.</param>
/// <remarks>
/// This constructor defaults to <c>isAsync = false</c>, meaning the queue will use the <see cref="OnMessage"/> event
/// for message handling and process messages sequentially using synchronous invocation.
/// </remarks>
public SyncQueue( string prefex, ILoggerQueue logger, CancellationToken token )
: this( prefex, logger, false, token ) {
}
/// <summary>
/// Initializes a new instance of the <see cref="SyncQueue{T}"/> class with configurable async support.
/// </summary>
/// <param name="prefex">A prefix used for logging and diagnostics.</param>
/// <param name="logger">An instance of <see cref="ILoggerQueue"/> for logging queue activities.</param>
/// <param name="isAsync">
/// If set to <c>true</c>, the queue will invoke the <see cref="AsyncOnMessage"/> event for each dequeued message
/// and await its completion before processing the next item. If <c>false</c>, it will use <see cref="OnMessage"/> instead.
/// </param>
/// <param name="token">A cancellation token used to gracefully stop the queue and shut down its worker thread.</param>
/// <remarks>
/// The queue uses an unbounded channel with a single-reader configuration to ensure sequential processing.
/// When <paramref name="isAsync"/> is true, message handling supports full asynchronous flow with guaranteed ordering.
/// </remarks>
public SyncQueue( string prefex, ILoggerQueue logger, bool isAsync, CancellationToken token )
: base( token ) {
_prefex = prefex;
_logger = logger;
_isAsync = isAsync;
_queue = Channel.CreateUnbounded<T>( new UnboundedChannelOptions {
SingleReader = true,
SingleWriter = false
} );
}
public void TryWrite( T message ) {
Enqueue( message );
}
/// <summary>
/// Attempts to enqueue a message to the internal queue safely.
/// If the fast, non-blocking path fails, it falls back to retrying the write
/// operation with optional exponential backoff delays. The method ensures best-effort
/// message delivery without throwing exceptions.
/// </summary>
/// <param name="message">The message to enqueue.</param>
/// <param name="retries">The number of retry attempts after the initial failure. Default is 3.</param>
/// <returns>
/// Returns <c>true</c> if the message was successfully enqueued within the retry limit;
/// otherwise, returns <c>false</c>.
/// </returns>
private bool TryEnqueueSafe( T message, int retries = 3 ) {
if ( IsDisposed ) return false;
// Fast path: non-blocking write
if ( _queue.Writer.TryWrite( message ) )
return true;
// Fallback: retry up to N times using safe write logic
for ( int i = 0; i < retries; i++ ) {
try {
// Try again or skip if cancellation requested
if ( _queue.Writer.TryWrite( message ) || IsCancellationRequested ) return true;
} catch ( Exception ex ) {
// Log the exception but continue retrying
_logger.Error( $"{_prefex} Failed to queue message, retry \"{i + 1}\"", ex );
}
// Optional delay before the next retry (exponential backoff)
// Stop if wait was interrupted (cancellation requested)
if ( base.Wait( 100 * ( i + 1 ) ) ) break;
}
return false; // All retries failed
}
public void Enqueue( T message ) {
if ( IsDisposed ) return;
try {
if ( !TryEnqueueSafe( message ) ) {
_logger.Warning( $"{_prefex} Failed to Enqueue" );
}
} catch ( Exception e ) {
_logger.Error( _prefex, e );
}
}
public Task<bool> TryWriteAsync( T message ) {
return EnqueueAsync( message );
}
public async Task<bool> EnqueueAsync( T message, int retries = 3 ) {
if ( IsDisposed ) return false;
// Fast path: non-blocking write
if ( _queue.Writer.TryWrite( message ) )
return true;
// Fallback: safe async write with retries
for ( int i = 0; i < retries; i++ ) {
try {
await _queue.Writer.WriteAsync( message, base.Token );
return true;
} catch ( Exception ex ) {
// Log the exception but continue retrying
_logger.Error( $"{_prefex} Failed to queue message, retry \"{i + 1}\"", ex );
}
// Optional delay before the next retry (exponential backoff)
// Stop if wait was interrupted (cancellation requested)
if ( base.Wait( 100 * ( i + 1 ) ) ) break; // Exponential backoff
}
return false; // 🚨 All retries failed
}
public void Stop( ) {
ExitThread( );
}
public void Start( ) {
ExitThread( );
_worker = new Thread( new ParameterizedThreadStart( DequeueAsync ) ) { IsBackground = true };
ThreadSafe.SetApartmentState( _worker, ApartmentState.STA );
_worker.Start( _worker.ManagedThreadId );
//Task.Factory.StartNew( ( ) => DequeueAsync( Thread.CurrentThread.ManagedThreadId ),
// CancellationToken.None, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default );
}
private void ExitThread( ) {
var worker = Interlocked.Exchange( ref _worker, null );
if ( worker != null ) {
ThreadSafe.ExitThread( worker );
}
}
/// <summary>
/// Releases all resources used by the channel queue and stops its background processing thread.
/// </summary>
/// <remarks>
/// This method performs a graceful shutdown of the queue:
/// <list type="bullet">
/// <item><description>Marks the queue as disposed to prevent further operations.</description></item>
/// <item><description>Cancels internal operations and signals any active wait handles.</description></item>
/// <item><description>Stops the processing thread by calling <see cref="ExitThread"/>.</description></item>
/// <item><description>
/// Logs a warning if there are pending unprocessed messages left in the queue.
/// </description></item>
/// </list>
/// After disposal, no additional messages should be enqueued or processed.
/// Multiple calls are safe and have no additional effect.
/// </remarks>
public void Dispose( ) {
if ( IsDisposed ) return;
// Atomically mark as disposed
_ = Interlocked.Exchange( ref _isDisposed, 1 );
// Cancel tokens and signal shutdown
DisposeOrCreateToken( true );
// Stop background thread
ExitThread( );
try {
// 🔍 Check for any unprocessed messages still remaining in the queue
int pending = _queue.Reader.Count;
if ( pending > 0 )
// Log a warning if messages were left unprocessed at disposal time
_logger?.Warning( $"Channel Queue \"{_prefex}\" pending to process: {pending}" );
} catch {
// Suppress any logging errors (e.g., if the queue is already disposed)
}
// Final log message for disposal
_logger?.Info( $"Disposed \"{_prefex}\" Channel Queue" );
// Suppress finalization since cleanup is done
GC.SuppressFinalize( this );
}
/// <summary>
/// Continuously reads messages from the queue and dispatches them to the <see cref="OnMessage"/> handler.
/// </summary>
/// <param name="requester">The thread identifier for logging context.</param>
private async void DequeueAsync( object requester ) {
string appName = $"{_prefex} Channel Queue";
_logger.Info( $"Starting \"{appName}\" with thread #\"{requester}\"" );
if ( _isAsync && AsyncOnMessage == null )
_logger.Warning( $"{_prefex} started in async mode, but no AsyncOnMessage handler attached." );
if ( !_isAsync && OnMessage == null )
_logger.Warning( $"{_prefex} started in sync mode, but no OnMessage handler attached." );
if ( OnStart != null ) {
await OnStart.Invoke( );
}
try {
do {
try {
if ( IsCancellationRequested ) break;
T item = await _queue.Reader.ReadAsync( base.Token );
if ( item == null ) continue;
if ( _isAsync ) {
if ( AsyncOnMessage != null )
await AsyncOnMessage.Invoke( item );
} else {
OnMessage?.Invoke( item );
}
} catch ( TaskCanceledException ) {
// We can get this exception while task cancelled occured.
_logger.Warning( $"Exiting \"{appName}\" due to task canceled" );
break;
} catch ( OperationCanceledException ) {
break;
} catch ( Exception e ) {
// We can get this exception while operational error occured.
_logger.Error( _prefex, e );
}
} while ( true );
} catch ( ThreadAbortException ) {
// We can get this exception while thread abort occured.
} catch ( ThreadInterruptedException ) { } catch ( Exception ) { }
_logger.Info( $"Exited {appName}." );
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment