Last active
October 12, 2022 08:26
-
-
Save walterlv/d2aecd02dfad74279713112d44bcd358 to your computer and use it in GitHub Desktop.
An awaitable for waiting a part of a loop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Runtime.CompilerServices; | |
using System.Threading.Tasks; | |
namespace Walterlv | |
{ | |
/// <summary> | |
/// 为一个持续操作中的一部分提供可异步等待的操作。 | |
/// </summary> | |
public class ContinuousPartOperation | |
{ | |
private readonly TaskCompletionSource<object> _source; | |
private readonly Awaiter _awaiter; | |
private Action _continuation; | |
private Exception _exception; | |
internal ContinuousPartOperation() | |
{ | |
_source = new TaskCompletionSource<object>(); | |
_awaiter = new Awaiter(this); | |
} | |
/// <summary> | |
/// 获取一个值,该值指示此异步操作是否已经结束。 | |
/// </summary> | |
internal bool IsCompleted { get; private set; } | |
/// <summary> | |
/// 完成此异步操作。 | |
/// </summary> | |
internal void Complete() | |
{ | |
IsCompleted = true; | |
if (_exception == null) | |
{ | |
_source.SetResult(null); | |
} | |
else | |
{ | |
_source.SetException(_exception); | |
} | |
var continuation = _continuation; | |
_continuation = null; | |
continuation?.Invoke(); | |
} | |
/// <summary> | |
/// 使用一个新的 <paramref name="exception"/> 来设置此异步操作完成对象。 | |
/// </summary> | |
/// <param name="exception">一个异常,当设置后,同步或异步等待此对象时将抛出异常。</param> | |
internal void UpdateException(Exception exception) => _exception = exception; | |
/// <summary> | |
/// 获取一个用于等待此异步操作的可等待对象。 | |
/// </summary> | |
public Awaiter GetAwaiter() => _awaiter; | |
/// <summary> | |
/// 同步等待此异步操作完成。 | |
/// </summary> | |
public void Wait() => _source.Task.GetAwaiter().GetResult(); | |
/// <summary> | |
/// 表示用于等待 <see cref="ContinuousPartOperation"/> 的异步可等待对像。 | |
/// </summary> | |
public sealed class Awaiter : INotifyCompletion | |
{ | |
private readonly ContinuousPartOperation _owner; | |
/// <summary> | |
/// 创建一个用于等待 <see cref="ContinuousPartOperation"/> 的异步可等待对象。 | |
/// </summary> | |
internal Awaiter(ContinuousPartOperation owner) | |
{ | |
_owner = owner; | |
} | |
/// <summary>Schedules the continuation action that's invoked when the instance completes.</summary> | |
/// <param name="continuation">The action to invoke when the operation completes.</param> | |
/// <exception cref="T:System.ArgumentNullException">The <paramref name="continuation">continuation</paramref> argument is null (Nothing in Visual Basic).</exception> | |
public void OnCompleted(Action continuation) | |
{ | |
if (IsCompleted) | |
{ | |
continuation?.Invoke(); | |
} | |
else | |
{ | |
_owner._continuation += continuation; | |
} | |
} | |
/// <summary> | |
/// 获取一个值,该值指示异步操作是否完成。 | |
/// </summary> | |
public bool IsCompleted => _owner.IsCompleted; | |
/// <summary> | |
/// 获取此异步操作的结果。 | |
/// </summary> | |
[DebuggerStepThrough] | |
public void GetResult() => _owner._source.Task.GetAwaiter().GetResult(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Walterlv | |
{ | |
/// <summary> | |
/// 为次数限制的异步等待操作提供操作。 | |
/// </summary> | |
public class CountLimitOperationToken | |
{ | |
private readonly long _countLimit; | |
private long _passed; | |
/// <summary> | |
/// 创建一个具有指定执行次数限制的 <see cref="CountLimitOperationToken"/>。 | |
/// </summary> | |
/// <param name="countLimit">次数限制,可能是不精确的。</param> | |
public CountLimitOperationToken(long countLimit) | |
{ | |
Operation = new ContinuousPartOperation(); | |
_countLimit = countLimit < 0 ? long.MaxValue : countLimit; | |
} | |
/// <summary> | |
/// 获取一个可 await 等待的等待对象。 | |
/// </summary> | |
public ContinuousPartOperation Operation { get; } | |
/// <summary> | |
/// 完成此异步操作。 | |
/// </summary> | |
/// <param name="removeIntermediateExceptions"> | |
/// 默认情况下,如果此前发生过异常,则认为那是重试过程中的中间异常,现在成功完成了任务,所以中间异常需要移除。 | |
/// 不过,你也可以选择不移除,意味着此任务的完成属于强制终止,而不是成功完成。 | |
/// </param> | |
public void Complete(bool removeIntermediateExceptions = true) | |
{ | |
if (removeIntermediateExceptions) | |
{ | |
Operation.UpdateException(null); | |
} | |
if (!Operation.IsCompleted) | |
{ | |
Operation.Complete(); | |
} | |
} | |
/// <summary> | |
/// 通知此 <see cref="ContinuousPartOperation"/> 自上次调用 <see cref="Pass"/> 方法以来增加的次数。 | |
/// </summary> | |
/// <param name="countPassed">自上次调用 <see cref="Pass"/> 方法以来增加的次数。</param> | |
public void Pass(long countPassed) | |
{ | |
_passed += countPassed; | |
if (_passed >= _countLimit && !Operation.IsCompleted) | |
{ | |
Operation.Complete(); | |
} | |
} | |
/// <summary> | |
/// 使用一个新的 <paramref name="exception"/> 来设置此异步操作完成对象。 | |
/// </summary> | |
/// <param name="exception">一个异常,当设置后,同步或异步等待此对象时将抛出异常。</param> | |
public void UpdateException(Exception exception) | |
{ | |
if (!Operation.IsCompleted) | |
{ | |
Operation.UpdateException(exception); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Walterlv | |
{ | |
/// <summary> | |
/// 为一个操作包装结果信息,包括成功与否、异常和取消信息。 | |
/// </summary> | |
public readonly struct OperationResult | |
{ | |
/// <summary> | |
/// 使用指定的异常创建 <see cref="OperationResult"/> 的新实例。 | |
/// 这个操作结果是失败的。 | |
/// </summary> | |
/// <param name="exception">操作过程中收集到的异常。</param> | |
public OperationResult(Exception exception) | |
{ | |
Exception = exception ?? throw new ArgumentNullException(nameof(exception)); | |
IsCanceled = false; | |
} | |
/// <summary> | |
/// 创建一个成功的或者取消的 <see cref="OperationResult"/>。 | |
/// </summary> | |
/// <param name="isSuccessOrCanceled"> | |
/// 如果为 true,则创建一个成功的操作结果;如果为 false,创建一个取消的操作结果。 | |
/// </param> | |
public OperationResult(bool isSuccessOrCanceled) | |
{ | |
Exception = null; | |
IsCanceled = !isSuccessOrCanceled; | |
} | |
/// <summary> | |
/// 获取一个值,该值指示操作已经成功完成。 | |
/// </summary> | |
public bool Success => Exception == null && IsCanceled is false; | |
/// <summary> | |
/// 获取操作过程中发生或收集的异常。 | |
/// </summary> | |
public Exception Exception { get; } | |
/// <summary> | |
/// 获取此操作是否已被取消。 | |
/// </summary> | |
public bool IsCanceled { get; } | |
/// <summary> | |
/// 将操作结果视为成功与否的 bool 值。 | |
/// </summary> | |
public static implicit operator bool(OperationResult result) => result.Success; | |
/// <summary> | |
/// 将操作结果视为异常。 | |
/// </summary> | |
public static implicit operator Exception(OperationResult result) => result.Exception; | |
/// <summary> | |
/// 将异常作为操作结果使用。 | |
/// </summary> | |
public static implicit operator OperationResult(Exception exception) | |
=> new OperationResult(exception ?? throw new ArgumentNullException(nameof(exception), | |
$"只有非空的异常才可以看转换为 {nameof(OperationResult)}。")); | |
/// <summary> | |
/// 将成功或取消信息作为操作结果使用。 | |
/// </summary> | |
public static implicit operator OperationResult(bool isSuccessOrCanceled) | |
=> new OperationResult(isSuccessOrCanceled); | |
/// <summary> | |
/// 判断操作是否是成功的。 | |
/// </summary> | |
public static bool operator true(OperationResult result) => result.Success; | |
/// <summary> | |
/// 判断操作是否是失败的。 | |
/// </summary> | |
public static bool operator false(OperationResult result) => !result.Success; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
namespace Walterlv | |
{ | |
/// <summary> | |
/// 对于异步的,出错后会重试的操作,使用此类型可以辅助等待循环重试的一部分。 | |
/// </summary> | |
public class PartialAwaitableRetry | |
{ | |
private readonly object _locker = new object(); | |
private readonly Func<PartialRetryContext, Task<OperationResult>> _loopItem; | |
private readonly List<CountLimitOperationToken> _tokens = new List<CountLimitOperationToken>(); | |
private volatile bool _isLooping; | |
/// <summary> | |
/// 使用一个循环任务初始化 <see cref="PartialAwaitableRetry"/> 的一个新实例。 | |
/// </summary> | |
/// <param name="loopItem">一个循环任务。</param> | |
public PartialAwaitableRetry(Func<PartialRetryContext, Task<OperationResult>> loopItem) | |
{ | |
_loopItem = loopItem ?? throw new ArgumentNullException(nameof(loopItem)); | |
} | |
/// <summary> | |
/// 以指定的次数限制加入循环,并返回等待此循环结果的可等待对象。 | |
/// 此方法是线程安全的。 | |
/// </summary> | |
/// <param name="countLimit">次数限制,当设置为 -1 时表示无限次循环。</param> | |
/// <returns>等待循环结果的可等待对象。</returns> | |
public ContinuousPartOperation JoinAsync(int countLimit) | |
{ | |
var token = new CountLimitOperationToken(countLimit); | |
lock (_locker) | |
{ | |
_tokens.Add(token); | |
if (!_isLooping) | |
{ | |
Loop(); | |
} | |
} | |
return token.Operation; | |
} | |
/// <summary> | |
/// 执行实际的循环,并在每一次执行的时候会给所有的等待对象报告结果。 | |
/// </summary> | |
private async void Loop() | |
{ | |
_isLooping = true; | |
var context = new PartialRetryContext(); | |
var shouldContinue = true; | |
try | |
{ | |
while (shouldContinue) | |
{ | |
Exception exception; | |
bool isCompleted; | |
// 加锁获取此时此刻的 Token 集合副本。 | |
// 执行一次循环的时候,只能操作此集合副本,真实集合新增的元素由于没有参与循环操作的执行; | |
// 这意味着期望执行一次方法的时候却没有执行,所以不能提供结果。 | |
List<CountLimitOperationToken> snapshot; | |
lock (_locker) | |
{ | |
snapshot = _tokens.ToList(); | |
} | |
try | |
{ | |
var result = await _loopItem.Invoke(context).ConfigureAwait(false); | |
exception = result.Exception; | |
isCompleted = result.Success; | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
isCompleted = false; | |
} | |
if (exception != null) | |
{ | |
foreach (var token in snapshot) | |
{ | |
token.UpdateException(exception); | |
} | |
} | |
if (isCompleted) | |
{ | |
foreach (var token in snapshot) | |
{ | |
token.Complete(); | |
} | |
lock (_locker) | |
{ | |
_tokens.RemoveAll(token => snapshot.Contains(token)); | |
shouldContinue = _tokens.Count > 0; | |
} | |
} | |
else | |
{ | |
foreach (var token in snapshot) | |
{ | |
token.Pass(context.StepCount); | |
} | |
} | |
} | |
} | |
finally | |
{ | |
_isLooping = false; | |
} | |
} | |
} | |
/// <summary> | |
/// 为 <see cref="PartialAwaitableRetry"/> 提供循环执行的上下文设置信息。 | |
/// </summary> | |
public sealed class PartialRetryContext | |
{ | |
private int _stepCount = 1; | |
/// <summary> | |
/// 获取或设置此方法一次执行时经过了多少次循环。 | |
/// 当某个方法执行时需要进行不打断的多次循环才能完成时,可以修改此值。 | |
/// </summary> | |
public int StepCount | |
{ | |
get => _stepCount; | |
set | |
{ | |
if (_stepCount <= 0) | |
{ | |
throw new ArgumentException("次数必须大于或等于 1。", nameof(value)); | |
} | |
_stepCount = value; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment