Created
October 7, 2021 16:46
-
-
Save doctorpangloss/10f0b6fb461c99369abc15f2f4327a1f to your computer and use it in GitHub Desktop.
Improving semantics around gRPC on Unity
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Grpc.Core; | |
using UniRx; | |
using UniRx.Diagnostics; | |
using UnityEngine; | |
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using Cysharp.Threading.Tasks; | |
using Channel = Cysharp.Threading.Tasks.Channel; | |
namespace HiddenSwitch.Grpc.ComponentModel | |
{ | |
public static class AsyncCallExtensions | |
{ | |
/// <summary> | |
/// The default way to convert an <see cref="AsyncUnaryCall{TResponse}"/> from the gRPC-specified server RPCs | |
/// to something that is easy to use in Unity3D. | |
/// </summary> | |
/// <param name="call">A call typically created by calling a <c>...Async()</c> method on | |
/// <see cref="DefaultClient.legacyClient"/></param> | |
/// <typeparam name="T">The return type.</typeparam> | |
/// <returns>An observable that will correctly <see cref="Subject{T}.OnError"/> the inner exception if the call | |
/// has an exception, <b>not</b> wrapped in <see cref="AggregateException"/>.</returns> | |
public static IObservable<T> ToObservable<T>(this AsyncUnaryCall<T> call) | |
{ | |
return call.ResponseAsync | |
.ToObservable(Scheduler.MainThread) | |
#if UNITY_EDITOR | |
.DoOnError(Debug.LogError) | |
#endif | |
; | |
} | |
public static IObservable<T> AsObservable<T>(this IAsyncStreamReader<T> reader) => | |
new GrpcStreamObservable<T>(reader) | |
.ObserveOnMainThread(); | |
public static QueuedAsyncWriteable<T> AsQueuedWritable<T>(this IClientStreamWriter<T> streamWriter) | |
{ | |
return new QueuedAsyncWriteable<T>(streamWriter); | |
} | |
} | |
public class GrpcStreamObservable<T> : IObservable<T> | |
{ | |
private readonly IAsyncStreamReader<T> m_Reader; | |
private readonly CancellationToken m_Token; | |
private int m_Used; | |
protected GrpcStreamObservable() | |
{ | |
} | |
public GrpcStreamObservable(IAsyncStreamReader<T> reader, CancellationToken token = default(CancellationToken)) | |
{ | |
m_Reader = reader; | |
m_Token = token; | |
m_Used = 0; | |
} | |
public IDisposable Subscribe(IObserver<T> observer) => | |
Interlocked.Exchange(ref m_Used, 1) == 0 | |
? new GrpcStreamSubscription<T>(m_Reader, observer, m_Token) | |
: throw new InvalidOperationException("Subscribe can only be called once."); | |
} | |
public class GrpcStreamSubscription<T> : IDisposable | |
{ | |
private readonly UniTaskVoid m_Task; | |
private readonly CancellationTokenSource m_TokenSource; | |
private bool m_Completed; | |
public GrpcStreamSubscription(IAsyncStreamReader<T> reader, IObserver<T> observer, CancellationToken token) | |
{ | |
m_TokenSource = new CancellationTokenSource(); | |
token.Register(m_TokenSource.Cancel); | |
m_Task = Run(reader, observer, m_TokenSource.Token); | |
} | |
private async UniTaskVoid Run(IAsyncStreamReader<T> reader, IObserver<T> observer, CancellationToken token) | |
{ | |
while (!token.IsCancellationRequested) | |
{ | |
try | |
{ | |
if (!await reader.MoveNext(token)) | |
{ | |
break; | |
} | |
} | |
// todo: should we really omit NotFound here? | |
catch (RpcException e) when (/*e.StatusCode == StatusCode.NotFound || | |
*/e.StatusCode == StatusCode.Cancelled) | |
{ | |
// cancelled peacefully ends with OnCompleted | |
break; | |
} | |
catch (OperationCanceledException) | |
{ | |
break; | |
} | |
catch (Exception e) | |
{ | |
observer.OnError(e); | |
m_Completed = true; | |
return; | |
} | |
observer.OnNext(reader.Current); | |
} | |
m_Completed = true; | |
observer.OnCompleted(); | |
} | |
public void Dispose() | |
{ | |
if (!m_Completed && !m_TokenSource.IsCancellationRequested) | |
{ | |
m_TokenSource.Cancel(); | |
} | |
m_TokenSource.Dispose(); | |
} | |
} | |
public sealed class QueuedAsyncWriteable<T> | |
{ | |
private readonly IClientStreamWriter<T> m_StreamWriter; | |
private volatile bool m_Busy = false; | |
private IProducerConsumerCollection<(T, UniTaskCompletionSource)> m_Queue = | |
new ConcurrentQueue<(T, UniTaskCompletionSource)>(); | |
public QueuedAsyncWriteable(IClientStreamWriter<T> streamWriter) | |
{ | |
m_StreamWriter = streamWriter; | |
} | |
public async UniTask CompleteAsync() | |
{ | |
await m_StreamWriter.CompleteAsync(); | |
} | |
public async UniTask WriteAsync(T message) | |
{ | |
var awaitMyCompletion = new UniTaskCompletionSource(); | |
// technically this is single-thread anyway | |
m_Queue.TryAdd((message, awaitMyCompletion)); | |
// if we're not busy, send the messages | |
while (!m_Busy && m_Queue.TryTake(out var info)) | |
{ | |
m_Busy = true; | |
var (outgoing, existingSource) = info; | |
try | |
{ | |
await m_StreamWriter.WriteAsync(outgoing); | |
existingSource.TrySetResult(); | |
} | |
catch (OperationCanceledException) | |
{ | |
existingSource.TrySetCanceled(); | |
} | |
catch (InvalidOperationException) | |
{ | |
existingSource.TrySetCanceled(); | |
} | |
catch (Exception ex) | |
{ | |
existingSource.TrySetException(ex); | |
} | |
finally | |
{ | |
m_Busy = false; | |
} | |
} | |
await awaitMyCompletion.Task; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment