Created
October 15, 2021 13:09
-
-
Save schmitch/798525b13ca14e9970288d1392412575 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.IO; | |
namespace ConsoleApp1 | |
{ | |
/// <summary>
/// Read-only, non-seekable view over a contiguous byte range of another stream (the "super stream").
/// It lets a slice of a large stream be handed to a consumer without first copying the slice into
/// a separate buffer.
/// Inspired by the sub-stream helper in System.IO.Compression of .NET Core; reference implementation:
/// https://github.com/dotnet/corefx/blob/d59f6e5a1bdabdd05317fd727efb59345e328b80/src/System.IO.Compression/src/System/IO/Compression/ZipCustomStreams.cs#L147
/// </summary>
/// <remarks>
/// Disposing this stream does NOT dispose the super stream; the sub stream is only a window onto it.
/// Each read repositions the super stream (via <see cref="Stream.Seek"/>) when its position differs
/// from the position tracked by this instance.
/// </remarks>
internal class ReadOnlySubStream : Stream
{
    private readonly long _startInSuperStream;
    private readonly long _endInSuperStream;
    private readonly Stream _superStream;
    private long _positionInSuperStream;
    private bool _canRead;
    private bool _isDisposed;

    /// <summary>
    /// Creates a view that starts at <paramref name="startPosition"/> in <paramref name="superStream"/>
    /// and exposes at most <paramref name="maxLength"/> bytes.
    /// </summary>
    /// <param name="superStream">The stream the slice is taken from.</param>
    /// <param name="startPosition">Absolute offset in the super stream of the first byte of the slice.</param>
    /// <param name="maxLength">Maximum number of bytes this view will return.</param>
    /// <exception cref="ArgumentNullException"><paramref name="superStream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="startPosition"/> or <paramref name="maxLength"/> is negative.
    /// </exception>
    public ReadOnlySubStream(Stream superStream, long startPosition, long maxLength)
    {
        if (superStream == null)
        {
            throw new ArgumentNullException(nameof(superStream));
        }

        if (startPosition < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(startPosition), "start position must not be negative");
        }

        if (maxLength < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxLength), "max length must not be negative");
        }

        _superStream = superStream;
        _startInSuperStream = startPosition;
        _positionInSuperStream = startPosition;

        // Saturate instead of overflowing so a huge maxLength cannot wrap around to a negative end offset.
        _endInSuperStream = maxLength > long.MaxValue - startPosition
            ? long.MaxValue
            : startPosition + maxLength;

        _canRead = true;
        _isDisposed = false;
    }

    /// <summary>Size of the window (end offset minus start offset), in bytes.</summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override long Length
    {
        get
        {
            ThrowIfDisposed();
            return _endInSuperStream - _startInSuperStream;
        }
    }

    /// <summary>Number of bytes consumed from the window so far. Setting it is not supported.</summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The setter is used.</exception>
    public override long Position
    {
        get
        {
            ThrowIfDisposed();
            return _positionInSuperStream - _startInSuperStream;
        }
        set
        {
            ThrowIfDisposed();
            throw new NotSupportedException("seek not support");
        }
    }

    /// <summary>True while this view is not disposed and the super stream itself is readable.</summary>
    public override bool CanRead => _superStream.CanRead && _canRead;

    /// <summary>Always false: the view is forward-only.</summary>
    public override bool CanSeek => false;

    /// <summary>Always false: the view is read-only.</summary>
    public override bool CanWrite => false;

    private void ThrowIfDisposed()
    {
        if (_isDisposed)
        {
            // Single-argument overload: the argument is the object name, and the runtime supplies
            // a proper "cannot access a disposed object" message.
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    private void ThrowIfCantRead()
    {
        if (!CanRead)
        {
            throw new NotSupportedException("read not support");
        }
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes from the window into <paramref name="buffer"/>,
    /// never reading past the end of the window.
    /// </summary>
    /// <returns>The number of bytes read; 0 once the window (or the super stream) is exhausted.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="NotSupportedException">The stream is not readable.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        ThrowIfCantRead();

        int requested = count;

        // Someone else may have moved the super stream since the last read; put it back where
        // this view expects it to be.
        if (_superStream.Position != _positionInSuperStream)
        {
            _superStream.Seek(_positionInSuperStream, SeekOrigin.Begin);
        }

        // Clamp to what is left in the window. Comparing against the remainder (rather than adding
        // count to the position) cannot overflow. The cast is safe because remaining < count here.
        long remaining = _endInSuperStream - _positionInSuperStream;
        if (count > remaining)
        {
            count = (int)remaining;
        }

        Debug.Assert(count <= requested);

        // buffer/offset/count validation is delegated to the super stream's Read.
        int bytesRead = _superStream.Read(buffer, offset, count);
        _positionInSuperStream += bytesRead;
        return bytesRead;
    }

    /// <summary>Not supported; the view is forward-only.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("seek not support");
    }

    /// <summary>Not supported; the view is read-only.</summary>
    public override void SetLength(long value)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("seek and write not support");
    }

    /// <summary>Not supported; the view is read-only.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("write not support");
    }

    /// <summary>
    /// Does nothing: there is never buffered write data. Flushing a read-only stream is valid,
    /// so this is a no-op rather than an error, which keeps the type usable by generic stream consumers.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override void Flush()
    {
        ThrowIfDisposed();
    }

    /// <summary>
    /// Closes this view for reading. The super stream is intentionally left open, since the
    /// sub stream is just a chunk of it.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_isDisposed)
        {
            _canRead = false;
            _isDisposed = true;
        }

        base.Dispose(disposing);
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Net.Http; | |
using System.Text; | |
using System.Text.Json; | |
using System.Threading.Tasks; | |
using System.Xml.Serialization; | |
namespace ConsoleApp1 | |
{ | |
/// <summary>
/// Plain data holder for the XML body returned when a multipart upload is initiated.
/// It is populated through <c>XmlSerializer</c>, so it stays a public class with public
/// read/write properties whose names match the XML element names.
/// </summary>
public class InitiateMultipartUploadResult
{
    /// <summary>Value of the <c>Bucket</c> element.</summary>
    public string Bucket
    {
        get;
        set;
    }

    /// <summary>Value of the <c>Key</c> element.</summary>
    public string Key
    {
        get;
        set;
    }

    /// <summary>Value of the <c>UploadId</c> element.</summary>
    public string UploadId
    {
        get;
        set;
    }
}
/// <summary>
/// Console sample that uploads a local file to an S3-compatible endpoint on localhost:8333
/// using a multipart upload: initiate, upload the file slice by slice, then complete.
/// </summary>
public class ProgramSeaweedFs
{
    /// <summary>Maximum size of one uploaded slice, in bytes (5 MiB).</summary>
    private const int DefaultMaxSliceSize = 5 * 1024 * 1024;

    /// <summary>
    /// Entry point: initiates the multipart upload, sends every slice of the file as one part
    /// (part numbers start at 1), and finally completes the upload.
    /// </summary>
    public static async Task Main(string[] args)
    {
        using var client = new HttpClient();

        // To create the bucket first, call: await CreateBucket(client);
        var upload = await InitiateMultipartUploadResult(client);

        await using var uploadStream = File.OpenRead("/Users/schmitch/Downloads/weedfs/myfile.txt");

        var partNumber = 0;
        foreach (var (rangeBegin, rangeLength, _) in GetUploadSliceRequests(uploadStream))
        {
            var stopwatch = Stopwatch.StartNew();

            uploadStream.Seek(rangeBegin, SeekOrigin.Begin);

            // The sub stream exposes exactly [rangeBegin, rangeBegin + rangeLength) of the file,
            // so the slice is streamed to the server without being copied into a buffer first.
            await using var requestBodyStream = new ReadOnlySubStream(
                uploadStream,
                rangeBegin,
                rangeLength);

            partNumber++;

            // Inclusive end offset, for display only (an empty slice is shown as begin-begin).
            var rangeEnd = Math.Max(rangeBegin, rangeBegin + rangeLength - 1);
            Console.WriteLine($"Uploading Part: {partNumber}, {rangeBegin}-{rangeEnd}");

            await UploadPart(client, upload.Key, upload.UploadId, partNumber, requestBodyStream);

            stopwatch.Stop();
            Console.WriteLine($"Uploading Part Finished: {partNumber}, {stopwatch.ElapsedMilliseconds}ms");
        }

        await CompleteMultipartUpload(client, upload.Key, upload.UploadId);
    }

    /// <summary>PUTs one part of the multipart upload; throws if the server does not answer with success.</summary>
    /// <param name="client">HTTP client used for the request.</param>
    /// <param name="key">Object key the upload was initiated for.</param>
    /// <param name="uploadId">Upload id returned when the upload was initiated.</param>
    /// <param name="partNumber">1-based number of this part.</param>
    /// <param name="stream">Request body; the content of the part.</param>
    /// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
    private static async Task UploadPart(
        HttpClient client,
        string key,
        string uploadId,
        int partNumber,
        Stream stream)
    {
        // The upload id is opaque server data placed in a query string, so it is escaped.
        var requestUri =
            $"http://localhost:8333/hase/{key}?partNumber={partNumber}&uploadId={Uri.EscapeDataString(uploadId)}";

        using var request = new HttpRequestMessage(HttpMethod.Put, requestUri)
        {
            Content = new StreamContent(stream),
        };
        using var response = await client.SendAsync(request);
        response.EnsureSuccessStatusCode();
    }

    /// <summary>
    /// POSTs the completion request for the multipart upload, prints the response body and
    /// throws if the server does not answer with success.
    /// </summary>
    /// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
    private static async Task CompleteMultipartUpload(HttpClient client, string key, string uploadId)
    {
        // NOTE(review): the request body lists no <Part> elements; this relies on the target
        // server accepting an empty part list - confirm against the server in use.
        using var request =
            new HttpRequestMessage(
                HttpMethod.Post,
                $"http://localhost:8333/hase/{key}?uploadId={Uri.EscapeDataString(uploadId)}")
            {
                Content = new StringContent(
                    @"<?xml version=""1.0"" encoding=""UTF-8""?><CompleteMultipartUpload xmlns=""http://s3.amazonaws.com/doc/2006-03-01/""></CompleteMultipartUpload>"),
            };
        using var response = await client.SendAsync(request);

        // Print the body first so a failure response is still visible before the status check throws.
        Console.WriteLine($"Text: {await response.Content.ReadAsStringAsync()}");
        response.EnsureSuccessStatusCode();
    }

    /// <summary>
    /// Initiates a multipart upload for the fixed object <c>hase/hase.txt</c> and deserializes
    /// the XML response.
    /// </summary>
    /// <returns>The bucket, key and upload id reported by the server.</returns>
    /// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
    /// <exception cref="InvalidOperationException">The response body could not be deserialized.</exception>
    private static async Task<InitiateMultipartUploadResult> InitiateMultipartUploadResult(HttpClient client)
    {
        using var request = new HttpRequestMessage(HttpMethod.Post, $"http://localhost:8333/hase/hase.txt?uploads")
        {
            Content = new MultipartFormDataContent(),
        };
        using var response = await client.SendAsync(request);

        // Fail here with the HTTP status instead of later with an obscure XML error.
        response.EnsureSuccessStatusCode();

        var responseStream = await response.Content.ReadAsStreamAsync();
        var serializer = new XmlSerializer(
            typeof(InitiateMultipartUploadResult),
            "http://s3.amazonaws.com/doc/2006-03-01/");

        return (InitiateMultipartUploadResult)serializer.Deserialize(responseStream)
               ?? throw new InvalidOperationException("initiate multipart upload returned an empty result");
    }

    /// <summary>Sends a create-bucket request to the endpoint root and prints the response body.</summary>
    private static async Task CreateBucket(HttpClient client)
    {
        using var request = new HttpRequestMessage(HttpMethod.Put, $"http://localhost:8333/")
        {
            Content = new StringContent(
                @"<CreateBucketConfiguration xmlns=""http://s3.amazonaws.com/doc/2006-03-01/"">
<LocationConstraint>string</LocationConstraint>
</CreateBucketConfiguration>", Encoding.UTF8, "text/xml"),
        };
        using var response = await client.SendAsync(request);
        var responseText = await response.Content.ReadAsStringAsync();
        Console.WriteLine($"ResponseText: {responseText}");
    }

    /// <summary>
    /// Splits <paramref name="stream"/> into consecutive slices of at most
    /// <see cref="DefaultMaxSliceSize"/> bytes.
    /// </summary>
    /// <returns>
    /// One tuple per slice: the offset of its first byte, its length in bytes, and the total
    /// stream length. The slices are contiguous, do not overlap and together cover the stream
    /// exactly. An empty stream yields a single zero-length slice, so the caller still uploads one part.
    /// </returns>
    private static IEnumerable<(long RangeBegin, long RangeLength, long RangeTotal)> GetUploadSliceRequests(
        Stream stream)
    {
        var totalLength = stream.Length;

        if (totalLength == 0)
        {
            yield return (0L, 0L, 0L);
            yield break;
        }

        // NextSliceSize expects an INCLUSIVE end offset, hence Length - 1.
        var lastByteIndex = totalLength - 1;
        var currentRangeBegins = 0L;
        while (currentRangeBegins <= lastByteIndex)
        {
            var nextSliceSize = NextSliceSize(currentRangeBegins, lastByteIndex);
            yield return (currentRangeBegins, nextSliceSize, totalLength);
            currentRangeBegins += nextSliceSize;
        }
    }

    /// <summary>
    /// Size of the next slice for the inclusive byte range
    /// [<paramref name="rangeBegin"/>, <paramref name="rangeEnd"/>], capped at
    /// <see cref="DefaultMaxSliceSize"/>.
    /// </summary>
    private static long NextSliceSize(long rangeBegin, long rangeEnd)
    {
        var sizeBasedOnRange = rangeEnd - rangeBegin + 1;
        return Math.Min(sizeBasedOnRange, DefaultMaxSliceSize);
    }

    /// <summary>
    /// Parses "begin-end" range specifiers into inclusive (begin, end) pairs. Only the fixed
    /// specifier "0-" is parsed here; a blank end means "until the end of the stream".
    /// </summary>
    /// <remarks>
    /// Range format as in nextExpectedRanges: https://dev.onedrive.com/items/upload_large_files.htm
    /// Sample: ["12345-55232","77829-99375"]
    /// </remarks>
    private List<Tuple<long, long>> GetRangesRemaining(Stream uploadStream)
    {
        var newRangesRemaining = new List<Tuple<long, long>>();
        foreach (var range in new[] { "0-" })
        {
            var rangeSpecifiers = range.Split('-');
            var begin = long.Parse(rangeSpecifiers[0]);
            var end = string.IsNullOrEmpty(rangeSpecifiers[1])
                ? uploadStream.Length - 1
                : long.Parse(rangeSpecifiers[1]);
            newRangesRemaining.Add(new Tuple<long, long>(begin, end));
        }

        return newRangesRemaining;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment