Last active
July 30, 2025 18:00
-
-
Save rajibchy/1c40613137a4334c5a7ad7e8740d56df to your computer and use it in GitHub Desktop.
A high-performance, thread-safe message queue using `System.Threading.Channels.Channel`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright FSys Tech Limited [FSys]. All rights reserved. | |
| // | |
| // This software owned by FSys Tech Limited [FSys] and is protected by copyright law | |
| // and international copyright treaties. | |
| // | |
| // Access to and use of the software is governed by the terms of the applicable FSys Software | |
| // Services Agreement (the Agreement) and Customer end user license agreements granting | |
| // a non-assignable, non-transferable and non-exclusive license to use the software | |
| // for it's own data processing purposes under the terms defined in the Agreement. | |
| // | |
| // Except as otherwise granted within the terms of the Agreement, copying or reproduction of any part | |
| // of this source code or associated reference material to any other location for further reproduction | |
| // or redistribution, and any amendments to this copyright notice, are expressly prohibited. | |
| // | |
| // Any reproduction or redistribution for sale or hiring of the Software not in accordance with | |
| // the terms of the Agreement is a violation of copyright law. | |
| // 11:44 PM 7/29/2025 | |
| // by rajib chy | |
| using System; | |
| using System.Threading; | |
| using System.Threading.Tasks; | |
| using System.Threading.Channels; | |
| namespace FSys.Framework.Concurrent { | |
| /// <summary> | |
| /// Represents a delegate for handling queued channel messages. | |
| /// </summary> | |
| /// <typeparam name="TEventArgs">The type of the event data passed to the delegate.</typeparam> | |
| /// <param name="e">The event data.</param> | |
| public delegate void SyncQueueEventHandler<TEventArgs>( TEventArgs e ); | |
| /// <summary> | |
| /// Represents an asynchronous method that handles a queue event with a parameter of type <typeparamref name="TEventArgs"/>. | |
| /// </summary> | |
| /// <typeparam name="TEventArgs">The type of the event data.</typeparam> | |
| /// <param name="e">The event data passed to the handler.</param> | |
| /// <returns>A task that represents the asynchronous operation.</returns> | |
| public delegate Task AsyncQueueEventHandler<TEventArgs>( TEventArgs e ); | |
| public delegate Task SyncQueueStartEventHandler( ); | |
| /// <summary> | |
| /// Represents a generic interface for a channel-based message queue. | |
| /// </summary> | |
| /// <typeparam name="T">The type of messages handled by the queue.</typeparam> | |
| public interface ISyncQueue<T> : IDisposable { | |
| /// <summary> | |
| /// Gets the <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/> | |
| /// </summary> | |
| /// <returns> | |
| /// The <see cref="CancellationToken"/> associated with this <see cref="CancellationTokenSource"/> | |
| /// </returns> | |
| /// <exception cref="System.ObjectDisposedException"> | |
| /// The token source has been disposed. | |
| /// </exception> | |
| CancellationToken Token { get; } | |
| /// <summary> | |
| /// Starts the internal worker thread that processes queued messages. | |
| /// </summary> | |
| void Start( ); | |
| /// <summary> | |
| /// Stops the queue from processing further messages and initiates a graceful shutdown. | |
| /// </summary> | |
| /// <remarks> | |
| /// After calling this method, the queue should stop accepting or dequeuing new items. | |
| /// Any ongoing operations may complete, but no further processing will occur. | |
| /// This is typically used to shut down the queue safely before disposal. | |
| /// </remarks> | |
| void Stop( ); | |
| /// <summary> | |
| /// Gets a value indicating whether the queue has been disposed. | |
| /// </summary> | |
| bool IsDisposed { get; } | |
| /// <summary> | |
| /// Enqueues a <typeparamref name="T"/> message into the queue immediately without retries. | |
| /// </summary> | |
| /// <param name="message">The message to enqueue.</param> | |
| void Enqueue( T message ); | |
| /// <summary> | |
| /// Attempts to enqueue a <typeparamref name="T"/> message synchronously into the queue. | |
| /// </summary> | |
| /// <param name="message">The message to enqueue.</param> | |
| void TryWrite( T message ); | |
| /// <summary> | |
| /// Attempts to enqueue a <typeparamref name="T"/> message asynchronously into the queue. | |
| /// </summary> | |
| /// <param name="message">The message to enqueue.</param> | |
| /// <returns> | |
| /// A task that represents the asynchronous enqueue operation. The result indicates whether the message was successfully enqueued. | |
| /// </returns> | |
| Task<bool> TryWriteAsync( T message ); | |
| /// <summary> | |
| /// Attempts to enqueue a <typeparamref name="T"/> into the message queue with high throughput and fallback safety. | |
| /// </summary> | |
| /// <param name="message">The message item to enqueue.</param> | |
| /// <param name="retries"> | |
| /// The number of times to retry the asynchronous write if the queue is full. | |
| /// Defaults to 3 retries with exponential backoff on failure. | |
| /// </param> | |
| /// <returns> | |
| /// A task that resolves to <c>true</c> if the item was successfully enqueued; <c>false</c> if all retry attempts failed. | |
| /// </returns> | |
| /// <remarks> | |
| /// This method first attempts a fast, non-blocking write using <c>TryWrite</c>. If the queue is full, | |
| /// it falls back to <c>WriteAsync</c> with exponential backoff and retry logic. | |
| /// It balances performance with safety by minimizing overhead in the common case and ensuring | |
| /// message delivery under transient load. | |
| /// </remarks> | |
| Task<bool> EnqueueAsync( T message, int retries = 3 ); | |
| } | |
| /// <summary> | |
| /// A high-performance, thread-safe message queue using <see cref="System.Threading.Channels.Channel{T}"/> | |
| /// with retry and cancellation support. | |
| /// </summary> | |
| /// <typeparam name="T">The type of message being handled in the queue.</typeparam> | |
| public class SyncQueue<T> : TokenBase, ISyncQueue<T> { | |
| private long _isDisposed = 0; | |
| private Thread _worker = null; | |
| private readonly string _prefex; | |
| private readonly Channel<T> _queue; | |
| private readonly ILoggerQueue _logger; | |
| /// <summary> | |
| /// Occurs when a new message is dequeued and ready for processing. | |
| /// </summary> | |
| /// <remarks> | |
| /// This event is triggered synchronously each time an item of type <typeparamref name="T"/> | |
| /// is removed from the internal queue. Use this to handle or process messages as they arrive. | |
| /// Ensure handlers execute quickly to avoid blocking the queue. | |
| /// </remarks> | |
| public event SyncQueueEventHandler<T> OnMessage; | |
| /// <summary> | |
| /// Raised when the synchronous queue begins operation or is activated. | |
| /// This event can be used to initialize resources or log start-time actions. | |
| /// </summary> | |
| /// <remarks> | |
| /// The <see cref="SyncQueueStartEventHandler"/> delegate provides the sender object and event arguments. | |
| /// </remarks> | |
| public event SyncQueueStartEventHandler OnStart; | |
| /// <summary> | |
| /// Occurs when an item is dequeued and <c>isAsync</c> is set to <c>true</c>. | |
| /// </summary> | |
| /// <remarks> | |
| /// Handlers attached to this event are awaited, ensuring sequential asynchronous processing of each item. | |
| /// This event is mutually exclusive with <see cref="OnMessage"/>; only one will be used based on the queue mode. | |
| /// </remarks> | |
| public event AsyncQueueEventHandler<T> AsyncOnMessage; | |
| private readonly bool _isAsync = false; | |
| public bool IsDisposed => Interlocked.Read( ref _isDisposed ) > 0; | |
| /// <summary> | |
| /// Initializes a new instance of the <see cref="SyncQueue{T}"/> class in synchronous mode. | |
| /// </summary> | |
| /// <param name="prefex">A prefix used for logging and diagnostics.</param> | |
| /// <param name="logger">An instance of <see cref="ILoggerQueue"/> for logging queue activities.</param> | |
| /// <param name="token">A cancellation token used to signal shutdown.</param> | |
| /// <remarks> | |
| /// This constructor defaults to <c>isAsync = false</c>, meaning the queue will use the <see cref="OnMessage"/> event | |
| /// for message handling and process messages sequentially using synchronous invocation. | |
| /// </remarks> | |
| public SyncQueue( string prefex, ILoggerQueue logger, CancellationToken token ) | |
| : this( prefex, logger, false, token ) { | |
| } | |
| /// <summary> | |
| /// Initializes a new instance of the <see cref="SyncQueue{T}"/> class with configurable async support. | |
| /// </summary> | |
| /// <param name="prefex">A prefix used for logging and diagnostics.</param> | |
| /// <param name="logger">An instance of <see cref="ILoggerQueue"/> for logging queue activities.</param> | |
| /// <param name="isAsync"> | |
| /// If set to <c>true</c>, the queue will invoke the <see cref="AsyncOnMessage"/> event for each dequeued message | |
| /// and await its completion before processing the next item. If <c>false</c>, it will use <see cref="OnMessage"/> instead. | |
| /// </param> | |
| /// <param name="token">A cancellation token used to gracefully stop the queue and shut down its worker thread.</param> | |
| /// <remarks> | |
| /// The queue uses an unbounded channel with a single-reader configuration to ensure sequential processing. | |
| /// When <paramref name="isAsync"/> is true, message handling supports full asynchronous flow with guaranteed ordering. | |
| /// </remarks> | |
| public SyncQueue( string prefex, ILoggerQueue logger, bool isAsync, CancellationToken token ) | |
| : base( token ) { | |
| _prefex = prefex; | |
| _logger = logger; | |
| _isAsync = isAsync; | |
| _queue = Channel.CreateUnbounded<T>( new UnboundedChannelOptions { | |
| SingleReader = true, | |
| SingleWriter = false | |
| } ); | |
| } | |
| public void TryWrite( T message ) { | |
| Enqueue( message ); | |
| } | |
| /// <summary> | |
| /// Attempts to enqueue a message to the internal queue safely. | |
| /// If the fast, non-blocking path fails, it falls back to retrying the write | |
| /// operation with optional exponential backoff delays. The method ensures best-effort | |
| /// message delivery without throwing exceptions. | |
| /// </summary> | |
| /// <param name="message">The message to enqueue.</param> | |
| /// <param name="retries">The number of retry attempts after the initial failure. Default is 3.</param> | |
| /// <returns> | |
| /// Returns <c>true</c> if the message was successfully enqueued within the retry limit; | |
| /// otherwise, returns <c>false</c>. | |
| /// </returns> | |
| private bool TryEnqueueSafe( T message, int retries = 3 ) { | |
| if ( IsDisposed ) return false; | |
| // Fast path: non-blocking write | |
| if ( _queue.Writer.TryWrite( message ) ) | |
| return true; | |
| // Fallback: retry up to N times using safe write logic | |
| for ( int i = 0; i < retries; i++ ) { | |
| try { | |
| // Try again or skip if cancellation requested | |
| if ( _queue.Writer.TryWrite( message ) || IsCancellationRequested ) return true; | |
| } catch ( Exception ex ) { | |
| // Log the exception but continue retrying | |
| _logger.Error( $"{_prefex} Failed to queue message, retry \"{i + 1}\"", ex ); | |
| } | |
| // Optional delay before the next retry (exponential backoff) | |
| // Stop if wait was interrupted (cancellation requested) | |
| if ( base.Wait( 100 * ( i + 1 ) ) ) break; | |
| } | |
| return false; // All retries failed | |
| } | |
| public void Enqueue( T message ) { | |
| if ( IsDisposed ) return; | |
| try { | |
| if ( !TryEnqueueSafe( message ) ) { | |
| _logger.Warning( $"{_prefex} Failed to Enqueue" ); | |
| } | |
| } catch ( Exception e ) { | |
| _logger.Error( _prefex, e ); | |
| } | |
| } | |
| public Task<bool> TryWriteAsync( T message ) { | |
| return EnqueueAsync( message ); | |
| } | |
| public async Task<bool> EnqueueAsync( T message, int retries = 3 ) { | |
| if ( IsDisposed ) return false; | |
| // Fast path: non-blocking write | |
| if ( _queue.Writer.TryWrite( message ) ) | |
| return true; | |
| // Fallback: safe async write with retries | |
| for ( int i = 0; i < retries; i++ ) { | |
| try { | |
| await _queue.Writer.WriteAsync( message, base.Token ); | |
| return true; | |
| } catch ( Exception ex ) { | |
| // Log the exception but continue retrying | |
| _logger.Error( $"{_prefex} Failed to queue message, retry \"{i + 1}\"", ex ); | |
| } | |
| // Optional delay before the next retry (exponential backoff) | |
| // Stop if wait was interrupted (cancellation requested) | |
| if ( base.Wait( 100 * ( i + 1 ) ) ) break; // Exponential backoff | |
| } | |
| return false; // 🚨 All retries failed | |
| } | |
| public void Stop( ) { | |
| ExitThread( ); | |
| } | |
| public void Start( ) { | |
| ExitThread( ); | |
| _worker = new Thread( new ParameterizedThreadStart( DequeueAsync ) ) { IsBackground = true }; | |
| ThreadSafe.SetApartmentState( _worker, ApartmentState.STA ); | |
| _worker.Start( _worker.ManagedThreadId ); | |
| //Task.Factory.StartNew( ( ) => DequeueAsync( Thread.CurrentThread.ManagedThreadId ), | |
| // CancellationToken.None, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default ); | |
| } | |
| private void ExitThread( ) { | |
| var worker = Interlocked.Exchange( ref _worker, null ); | |
| if ( worker != null ) { | |
| ThreadSafe.ExitThread( worker ); | |
| } | |
| } | |
| /// <summary> | |
| /// Releases all resources used by the channel queue and stops its background processing thread. | |
| /// </summary> | |
| /// <remarks> | |
| /// This method performs a graceful shutdown of the queue: | |
| /// <list type="bullet"> | |
| /// <item><description>Marks the queue as disposed to prevent further operations.</description></item> | |
| /// <item><description>Cancels internal operations and signals any active wait handles.</description></item> | |
| /// <item><description>Stops the processing thread by calling <see cref="ExitThread"/>.</description></item> | |
| /// <item><description> | |
| /// Logs a warning if there are pending unprocessed messages left in the queue. | |
| /// </description></item> | |
| /// </list> | |
| /// After disposal, no additional messages should be enqueued or processed. | |
| /// Multiple calls are safe and have no additional effect. | |
| /// </remarks> | |
| public void Dispose( ) { | |
| if ( IsDisposed ) return; | |
| // Atomically mark as disposed | |
| _ = Interlocked.Exchange( ref _isDisposed, 1 ); | |
| // Cancel tokens and signal shutdown | |
| DisposeOrCreateToken( true ); | |
| // Stop background thread | |
| ExitThread( ); | |
| try { | |
| // 🔍 Check for any unprocessed messages still remaining in the queue | |
| int pending = _queue.Reader.Count; | |
| if ( pending > 0 ) | |
| // Log a warning if messages were left unprocessed at disposal time | |
| _logger?.Warning( $"Channel Queue \"{_prefex}\" pending to process: {pending}" ); | |
| } catch { | |
| // Suppress any logging errors (e.g., if the queue is already disposed) | |
| } | |
| // Final log message for disposal | |
| _logger?.Info( $"Disposed \"{_prefex}\" Channel Queue" ); | |
| // Suppress finalization since cleanup is done | |
| GC.SuppressFinalize( this ); | |
| } | |
| /// <summary> | |
| /// Continuously reads messages from the queue and dispatches them to the <see cref="OnMessage"/> handler. | |
| /// </summary> | |
| /// <param name="requester">The thread identifier for logging context.</param> | |
| private async void DequeueAsync( object requester ) { | |
| string appName = $"{_prefex} Channel Queue"; | |
| _logger.Info( $"Starting \"{appName}\" with thread #\"{requester}\"" ); | |
| if ( _isAsync && AsyncOnMessage == null ) | |
| _logger.Warning( $"{_prefex} started in async mode, but no AsyncOnMessage handler attached." ); | |
| if ( !_isAsync && OnMessage == null ) | |
| _logger.Warning( $"{_prefex} started in sync mode, but no OnMessage handler attached." ); | |
| if ( OnStart != null ) { | |
| await OnStart.Invoke( ); | |
| } | |
| try { | |
| do { | |
| try { | |
| if ( IsCancellationRequested ) break; | |
| T item = await _queue.Reader.ReadAsync( base.Token ); | |
| if ( item == null ) continue; | |
| if ( _isAsync ) { | |
| if ( AsyncOnMessage != null ) | |
| await AsyncOnMessage.Invoke( item ); | |
| } else { | |
| OnMessage?.Invoke( item ); | |
| } | |
| } catch ( TaskCanceledException ) { | |
| // We can get this exception while task cancelled occured. | |
| _logger.Warning( $"Exiting \"{appName}\" due to task canceled" ); | |
| break; | |
| } catch ( OperationCanceledException ) { | |
| break; | |
| } catch ( Exception e ) { | |
| // We can get this exception while operational error occured. | |
| _logger.Error( _prefex, e ); | |
| } | |
| } while ( true ); | |
| } catch ( ThreadAbortException ) { | |
| // We can get this exception while thread abort occured. | |
| } catch ( ThreadInterruptedException ) { } catch ( Exception ) { } | |
| _logger.Info( $"Exited {appName}." ); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment