Last active
March 23, 2018 21:12
-
-
Save itn3000/56dcebd4b128c46ca7b882dc951c9345 to your computer and use it in GitHub Desktop.
System.IO.Pipelines tests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
struct TestEnumerator<T> : IEnumerator<T> | |
{ | |
ArraySegment<T> m_Segment; | |
public TestEnumerator(ref ArraySegment<T> seg) | |
{ | |
m_Segment = seg; | |
} | |
public T Current => throw new NotImplementedException(); | |
object IEnumerator.Current => throw new NotImplementedException(); | |
public void Dispose() | |
{ | |
throw new NotImplementedException(); | |
} | |
public bool MoveNext() | |
{ | |
throw new NotImplementedException(); | |
} | |
public void Reset() | |
{ | |
throw new NotImplementedException(); | |
} | |
} | |
namespace pipelinetest | |
{ | |
using System.IO.Pipelines; | |
using System.Buffers; | |
using System.Threading.Tasks; | |
using System.Linq; | |
using System.Threading; | |
static class ArraySegmentExtension | |
{ | |
public struct ArraySegmentEnumerator<T> | |
{ | |
public ArraySegmentEnumerator(ArraySegment<T> seg) | |
{ | |
m_Segment = seg; | |
m_CurrentIndex = 0; | |
m_IsEnd = false; | |
} | |
ArraySegment<T> m_Segment; | |
public T Current => m_Segment.Array[m_Segment.Offset + m_CurrentIndex]; | |
int m_CurrentIndex; | |
bool m_IsEnd; | |
public bool MoveNext() | |
{ | |
if (!m_IsEnd) | |
{ | |
m_CurrentIndex++; | |
if (m_CurrentIndex < m_Segment.Count) | |
{ | |
return true; | |
} | |
m_IsEnd = true; | |
} | |
return false; | |
} | |
} | |
public static ArraySegmentEnumerator<T> GetEnumerator<T>(this ArraySegment<T> seg) | |
{ | |
return new ArraySegmentEnumerator<T>(seg); | |
} | |
} | |
class Program | |
{ | |
static async Task PipelineTest() | |
{ | |
const int DataSize = 1000; | |
const int MaxLoop = 1000; | |
var opt = new PipeOptions(minimumSegmentSize: DataSize); | |
var pipe = new Pipe(opt); | |
await Task.WhenAll(Task.Run(() => | |
{ | |
for (int i = 0; i < MaxLoop; i++) | |
{ | |
var s = pipe.Writer.GetSpan(DataSize); | |
s[0] = (byte)(i & 0xff); | |
pipe.Writer.Advance(10); | |
pipe.Writer.Commit(); | |
// await pipe.Writer.FlushAsync(); | |
} | |
pipe.Writer.Complete(); | |
Console.WriteLine($"write done"); | |
}), | |
Task.Run(async () => | |
{ | |
while(true) | |
{ | |
var readResult = await pipe.Reader.ReadAsync(); | |
Console.WriteLine($"read buffer length is {readResult.Buffer.Length}"); | |
foreach(var b in readResult.Buffer) | |
{ | |
Console.WriteLine($"{b.Span[0]}"); | |
} | |
pipe.Reader.AdvanceTo(readResult.Buffer.End); | |
if(readResult.IsCompleted) | |
{ | |
break; | |
} | |
} | |
pipe.Reader.Complete(); | |
Console.WriteLine($"read done"); | |
})).ConfigureAwait(false); | |
// PipeFactory is threadsafe?,and IPipe is not? | |
// using (var factory = new PipeFactory(BufferPool.Default)) | |
// { | |
// var opt = new PipeOptions(); | |
// var data = new byte[DataSize]; | |
// var rbuf = new byte[DataSize]; | |
// new Span<byte>(data).Fill(2); | |
// var pipe = factory.Create(); | |
// Console.WriteLine($"{pipe.Reader.GetType()}, {pipe.Writer.GetType()}"); | |
// long total = 0; | |
// for (int i = 0; i < MaxLoop; i++) | |
// { | |
// var wbuf = pipe.Writer.Alloc(DataSize); | |
// wbuf.Write(data); | |
// await wbuf.FlushAsync(); | |
// // await pipe.Reader.ReadAsync(new ArraySegment<byte>(rbuf)); | |
// var r = await pipe.Reader.ReadAsync(); | |
// foreach (var buf in r.Buffer) | |
// { | |
// ArraySegment<byte> seg; | |
// if (buf.TryGetArray(out seg)) | |
// { | |
// for (int j = seg.Offset; j < seg.Offset + seg.Count; j++) | |
// { | |
// total += seg[j]; | |
// } | |
// } | |
// } | |
// pipe.Reader.Advance(r.Buffer.End); | |
// } | |
// Console.WriteLine($"{total}"); | |
// } | |
} | |
static async Task MultiThreadPipeline() | |
{ | |
const int LoopNum = 1000; | |
const int DataSize = 5000; | |
const int TaskNum = 5; | |
var opt = new PipeOptions(minimumSegmentSize: DataSize); | |
var pipe = new Pipe(opt); | |
try | |
{ | |
} | |
finally | |
{ | |
pipe.Reader.Complete(); | |
} | |
// using (var pfac = new PipeFactory(BufferPool.Default)) | |
// { | |
// var data = new byte[DataSize]; | |
// new Span<byte>(data).Fill(2); | |
// var pipe = pfac.Create(opt); | |
// long total = 0; | |
// // IPipe is threadsafe? | |
// await Task.WhenAll( | |
// Task.WhenAll(Enumerable.Range(0, TaskNum) | |
// .Select(async (idx) => | |
// { | |
// try | |
// { | |
// for (int i = 0; i < LoopNum / TaskNum; i++) | |
// { | |
// await Task.Yield(); | |
// var wbuf = pipe.Writer.Alloc(DataSize); | |
// // wbuf.Ensure(DataSize); | |
// wbuf.Buffer.Span.Slice(0, DataSize).Fill((byte)(i & 0xff)); | |
// wbuf.Advance(DataSize); | |
// // wbuf.Write(data); | |
// await wbuf.FlushAsync(); | |
// } | |
// } | |
// catch (Exception e) | |
// { | |
// Console.WriteLine($"{e}"); | |
// throw; | |
// } | |
// })).ContinueWith(t => | |
// { | |
// Console.WriteLine($"write completed"); | |
// pipe.Writer.Complete(); | |
// }), | |
// Task.Run(async () => | |
// { | |
// bool isFirstRead = true; | |
// // async pattern | |
// while (true) | |
// { | |
// var res = await pipe.Reader.ReadAsync(); | |
// if (res.IsCancelled) | |
// { | |
// break; | |
// } | |
// foreach (var buf in res.Buffer) | |
// { | |
// for (int i = 0; i < buf.Length; i++) | |
// { | |
// total += buf.Span[i]; | |
// } | |
// } | |
// Console.WriteLine($"buffer length is {res.Buffer.Length}"); | |
// if (!res.Buffer.IsEmpty) | |
// { | |
// pipe.Reader.Advance(res.Buffer.End); | |
// } | |
// if (!isFirstRead && res.IsCompleted && res.Buffer.IsEmpty) | |
// { | |
// Console.WriteLine($"read done:{res.IsCompleted}, {res.Buffer.Length}"); | |
// break; | |
// } | |
// isFirstRead = false; | |
// } | |
// // sync pattern | |
// // ReadResult res; | |
// // while (pipe.Reader.TryRead(out res)) | |
// // { | |
// // foreach (var buf in res.Buffer) | |
// // { | |
// // for (int i = 0; i < buf.Length; i++) | |
// // { | |
// // total += buf.Span[i]; | |
// // } | |
// // } | |
// // pipe.Reader.Advance(res.Buffer.End); | |
// // if (res.IsCompleted || res.IsCancelled) | |
// // { | |
// // Console.WriteLine($"read completed"); | |
// // break; | |
// // } | |
// // } | |
// }) | |
// ); | |
// Console.WriteLine($"{total}"); | |
// } | |
} | |
static void Main(string[] args) | |
{ | |
// MultiThreadPipeline().Wait(); | |
PipelineTest().Wait(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk"> | |
<PropertyGroup> | |
<OutputType>Exe</OutputType> | |
<TargetFramework>netcoreapp2.0</TargetFramework> | |
</PropertyGroup> | |
<ItemGroup> | |
<PackageReference Include="System.IO.Pipelines" Version="4.5.0-preview2-26216-01" /> | |
</ItemGroup> | |
</Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<configuration> | |
<packageSources> | |
<add key="corefxlab" value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json"/> | |
</packageSources> | |
</configuration> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment