Skip to content

Instantly share code, notes, and snippets.

@vanbukin
Created May 27, 2020 18:29
Show Gist options
  • Save vanbukin/a723c64890fe098d3db823df460c096e to your computer and use it in GitHub Desktop.
Save vanbukin/a723c64890fe098d3db823df460c096e to your computer and use it in GitHub Desktop.
System.Text.Json CosmosDb Serializer
using System;
using System.Buffers;
using System.IO;
using System.Text.Encodings.Web;
using System.Text.Json;
using Microsoft.Azure.Cosmos;
namespace Cosmos.Serialization
{
/// <summary>
/// Zero-alloc System.Text.Json CosmosDb serializer.
/// </summary>
public class SystemTextJsonSerializer : CosmosSerializer
{
private static readonly JsonReaderOptions _jsonReaderOptions = new JsonReaderOptions
{
AllowTrailingCommas = false,
CommentHandling = JsonCommentHandling.Skip,
MaxDepth = 64
};
private static readonly JsonWriterOptions _jsonWriterOptions = new JsonWriterOptions
{
Encoder = JavaScriptEncoder.Default,
Indented = false,
SkipValidation = false
};
public override unsafe T FromStream<T>(Stream stream)
{
if (typeof(Stream).IsAssignableFrom(typeof(T)))
{
return (T) (object) stream;
}
var currentBufferSize = 8192;
var stackBufferPtr = stackalloc byte[currentBufferSize];
var stackBuffer = new Span<byte>(stackBufferPtr, currentBufferSize);
var bytesRead = stream.Read(stackBuffer);
if (bytesRead < currentBufferSize)
{
stream.Close();
var jsonReader = new Utf8JsonReader(stackBuffer.Slice(0, bytesRead), _jsonReaderOptions);
return JsonSerializer.Deserialize<T>(ref jsonReader);
}
var result = DeserializeObjectUsingArrayPool<T>(stream, bytesRead, stackBufferPtr, currentBufferSize * 2);
return result;
}
private static unsafe T DeserializeObjectUsingArrayPool<T>(
Stream stream,
int totalRead,
byte* stackPtr,
int arrayPoolBufferSize)
{
var buffer = ArrayPool<byte>.Shared.Rent(arrayPoolBufferSize);
fixed (byte* bufferPtr = buffer)
{
Buffer.MemoryCopy(
stackPtr,
bufferPtr,
arrayPoolBufferSize,
totalRead);
}
var stackBuffer = new Span<byte>(stackPtr, totalRead);
int bytesRead;
var currentBufferSize = arrayPoolBufferSize;
while ((bytesRead = stream.Read(stackBuffer)) > 0)
{
var availableMemory = currentBufferSize - totalRead;
if (bytesRead > availableMemory)
{
var newBufferSize = currentBufferSize * 2;
buffer = CreateExtendedBuffer(buffer, currentBufferSize, newBufferSize);
currentBufferSize = newBufferSize;
availableMemory = currentBufferSize - totalRead;
}
fixed (byte* bufferPtr = buffer)
{
var destinationPtr = bufferPtr + totalRead;
Buffer.MemoryCopy(stackPtr, destinationPtr, availableMemory, bytesRead);
}
totalRead += bytesRead;
}
stream.Close();
var utf8JsonBuffer = buffer.AsSpan(0, totalRead);
var jsonReader = new Utf8JsonReader(utf8JsonBuffer, _jsonReaderOptions);
var result = JsonSerializer.Deserialize<T>(ref jsonReader);
ArrayPool<byte>.Shared.Return(buffer);
return result;
}
private static unsafe byte[] CreateExtendedBuffer(byte[] source, int sourceBytesToCopy, int destinationSize)
{
var destination = ArrayPool<byte>.Shared.Rent(destinationSize);
fixed (byte* sourcePtr = source, destinationPtr = destination)
{
Buffer.MemoryCopy(sourcePtr, destinationPtr, destinationSize, sourceBytesToCopy);
}
ArrayPool<byte>.Shared.Return(source);
return destination;
}
public override Stream ToStream<T>(T input)
{
var resultStream = new MemoryStream(8192);
using var writer = new Utf8JsonWriter(resultStream, _jsonWriterOptions);
JsonSerializer.Serialize(writer, input);
resultStream.Position = 0;
return resultStream;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment