Created
May 27, 2020 18:29
-
-
Save vanbukin/a723c64890fe098d3db823df460c096e to your computer and use it in GitHub Desktop.
System.Text.Json CosmosDb Serializer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.IO; | |
using System.Text.Encodings.Web; | |
using System.Text.Json; | |
using Microsoft.Azure.Cosmos; | |
namespace Cosmos.Serialization | |
{ | |
/// <summary> | |
/// Zero-alloc System.Text.Json CosmosDb serializer. | |
/// </summary> | |
public class SystemTextJsonSerializer : CosmosSerializer | |
{ | |
private static readonly JsonReaderOptions _jsonReaderOptions = new JsonReaderOptions | |
{ | |
AllowTrailingCommas = false, | |
CommentHandling = JsonCommentHandling.Skip, | |
MaxDepth = 64 | |
}; | |
private static readonly JsonWriterOptions _jsonWriterOptions = new JsonWriterOptions | |
{ | |
Encoder = JavaScriptEncoder.Default, | |
Indented = false, | |
SkipValidation = false | |
}; | |
public override unsafe T FromStream<T>(Stream stream) | |
{ | |
if (typeof(Stream).IsAssignableFrom(typeof(T))) | |
{ | |
return (T) (object) stream; | |
} | |
var currentBufferSize = 8192; | |
var stackBufferPtr = stackalloc byte[currentBufferSize]; | |
var stackBuffer = new Span<byte>(stackBufferPtr, currentBufferSize); | |
var bytesRead = stream.Read(stackBuffer); | |
if (bytesRead < currentBufferSize) | |
{ | |
stream.Close(); | |
var jsonReader = new Utf8JsonReader(stackBuffer.Slice(0, bytesRead), _jsonReaderOptions); | |
return JsonSerializer.Deserialize<T>(ref jsonReader); | |
} | |
var result = DeserializeObjectUsingArrayPool<T>(stream, bytesRead, stackBufferPtr, currentBufferSize * 2); | |
return result; | |
} | |
private static unsafe T DeserializeObjectUsingArrayPool<T>( | |
Stream stream, | |
int totalRead, | |
byte* stackPtr, | |
int arrayPoolBufferSize) | |
{ | |
var buffer = ArrayPool<byte>.Shared.Rent(arrayPoolBufferSize); | |
fixed (byte* bufferPtr = buffer) | |
{ | |
Buffer.MemoryCopy( | |
stackPtr, | |
bufferPtr, | |
arrayPoolBufferSize, | |
totalRead); | |
} | |
var stackBuffer = new Span<byte>(stackPtr, totalRead); | |
int bytesRead; | |
var currentBufferSize = arrayPoolBufferSize; | |
while ((bytesRead = stream.Read(stackBuffer)) > 0) | |
{ | |
var availableMemory = currentBufferSize - totalRead; | |
if (bytesRead > availableMemory) | |
{ | |
var newBufferSize = currentBufferSize * 2; | |
buffer = CreateExtendedBuffer(buffer, currentBufferSize, newBufferSize); | |
currentBufferSize = newBufferSize; | |
availableMemory = currentBufferSize - totalRead; | |
} | |
fixed (byte* bufferPtr = buffer) | |
{ | |
var destinationPtr = bufferPtr + totalRead; | |
Buffer.MemoryCopy(stackPtr, destinationPtr, availableMemory, bytesRead); | |
} | |
totalRead += bytesRead; | |
} | |
stream.Close(); | |
var utf8JsonBuffer = buffer.AsSpan(0, totalRead); | |
var jsonReader = new Utf8JsonReader(utf8JsonBuffer, _jsonReaderOptions); | |
var result = JsonSerializer.Deserialize<T>(ref jsonReader); | |
ArrayPool<byte>.Shared.Return(buffer); | |
return result; | |
} | |
private static unsafe byte[] CreateExtendedBuffer(byte[] source, int sourceBytesToCopy, int destinationSize) | |
{ | |
var destination = ArrayPool<byte>.Shared.Rent(destinationSize); | |
fixed (byte* sourcePtr = source, destinationPtr = destination) | |
{ | |
Buffer.MemoryCopy(sourcePtr, destinationPtr, destinationSize, sourceBytesToCopy); | |
} | |
ArrayPool<byte>.Shared.Return(source); | |
return destination; | |
} | |
public override Stream ToStream<T>(T input) | |
{ | |
var resultStream = new MemoryStream(8192); | |
using var writer = new Utf8JsonWriter(resultStream, _jsonWriterOptions); | |
JsonSerializer.Serialize(writer, input); | |
resultStream.Position = 0; | |
return resultStream; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment