Skip to content

Instantly share code, notes, and snippets.

@compustar
Last active August 15, 2016 03:10
Show Gist options
  • Select an option

  • Save compustar/65d644654628057808e88b4198b05b4e to your computer and use it in GitHub Desktop.

Select an option

Save compustar/65d644654628057808e88b4198b05b4e to your computer and use it in GitHub Desktop.
Streaming Classes
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2012 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
namespace Nuclex.Support.IO {
/// <summary>Chains a series of independent streams into a single stream</summary>
/// <remarks>
///   <para>
///     This class can be used to chain multiple independent streams into a single
///     stream that acts as if its chained streams were only one combined stream.
///     It is useful to avoid creating huge memory streams or temporary files when
///     you just need to prepend or append some data to a stream or if you need to
///     read a file that was split into several parts as if it was a single file.
///   </para>
///   <para>
///     It is not recommended to change the size of any chained stream after it
///     has become part of a stream chainer, though the stream chainer will do its
///     best to cope with the changes as they occur. Increasing the length of a
///     chained stream is generally not an issue for streams that support seeking,
///     but reducing the length might invalidate the stream chainer's file pointer.
///   </para>
///   <para>
///     The stream chainer does not override Dispose(); the chained streams remain
///     owned by whoever created them.
///   </para>
/// </remarks>
public class ChainStream : Stream {

  /// <summary>Initializes a new stream chainer</summary>
  /// <param name="streams">
  ///   Streams that will be chained together, in order. Any <see cref="ChainStream" />
  ///   among them is flattened into the streams it chains itself.
  /// </param>
  /// <exception cref="ArgumentNullException">The stream array is null</exception>
  /// <exception cref="ArgumentException">One of the streams is null</exception>
  public ChainStream(params Stream[] streams) {
    if (streams == null) {
      throw new ArgumentNullException("streams");
    }

    List<Stream> flattened = new List<Stream>();
    AddStreams(flattened, streams);
    this.streams = flattened.ToArray();

    determineCapabilities();
  }

  /// <summary>
  ///   Appends streams to a list, recursively replacing nested stream chainers by
  ///   the streams they are chaining
  /// </summary>
  /// <param name="result">List that receives the flattened streams</param>
  /// <param name="from">Streams to append, possibly containing stream chainers</param>
  private static void AddStreams(List<Stream> result, IEnumerable<Stream> from) {
    foreach (Stream stream in from) {
      if (stream == null) {
        throw new ArgumentException("Chained streams must not be null", "streams");
      }

      ChainStream nested = stream as ChainStream;
      if (nested != null) {
        AddStreams(result, nested.streams);
      } else {
        result.Add(stream);
      }
    }
  }

  /// <summary>Whether data can be read from the stream</summary>
  public override bool CanRead {
    get { return this.allStreamsCanRead; }
  }

  /// <summary>Whether the stream supports seeking</summary>
  public override bool CanSeek {
    get { return this.allStreamsCanSeek; }
  }

  /// <summary>Whether data can be written into the stream</summary>
  public override bool CanWrite {
    get { return this.allStreamsCanWrite; }
  }

  /// <summary>
  ///   Clears all buffers for this stream and causes any buffered data to be written
  ///   to the underlying device.
  /// </summary>
  public override void Flush() {
    for (int index = 0; index < this.streams.Length; ++index) {
      this.streams[index].Flush();
    }
  }

  /// <summary>Length of the stream in bytes</summary>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  public override long Length {
    get {
      if (!this.allStreamsCanSeek) {
        throw makeSeekNotSupportedException("determine length");
      }

      // Sum up the length of all chained streams
      long length = 0;
      for (int index = 0; index < this.streams.Length; ++index) {
        length += this.streams[index].Length;
      }

      return length;
    }
  }

  /// <summary>Absolute position of the file pointer within the stream</summary>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  public override long Position {
    get {
      if (!this.allStreamsCanSeek) {
        throw makeSeekNotSupportedException("seek");
      }

      return this.position;
    }
    set { moveFilePointer(value); }
  }

  /// <summary>
  ///   Reads a sequence of bytes from the stream and advances the position of
  ///   the file pointer by the number of bytes read.
  /// </summary>
  /// <param name="buffer">Buffer that will receive the data read from the stream</param>
  /// <param name="offset">
  ///   Offset in the buffer at which the stream will place the data read
  /// </param>
  /// <param name="count">Maximum number of bytes that will be read</param>
  /// <returns>
  ///   The number of bytes that were actually read from the stream and written into
  ///   the provided buffer; zero if the file pointer is at or beyond the end of
  ///   the chained streams
  /// </returns>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support reading
  /// </exception>
  public override int Read(byte[] buffer, int offset, int count) {
    if (!this.allStreamsCanRead) {
      throw new NotSupportedException(
        "Can't read: at least one of the chained streams doesn't support reading"
      );
    }

    // A chain without any streams is simply an empty stream
    if (this.streams.Length == 0) {
      return 0;
    }

    int totalBytesRead = 0;
    int lastStreamIndex = this.streams.Length - 1;

    if (this.CanSeek) {

      // Find out from which stream and at which position we need to begin reading
      int streamIndex;
      long streamOffset;
      findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

      // Try to read from the stream our current file pointer falls into. If more
      // data was requested than the stream contains, read each stream to its end
      // until we either have enough data or run out of streams.
      while (count > 0) {
        Stream currentStream = this.streams[streamIndex];

        // The local offset can lie at or beyond the end of the last stream when
        // the file pointer is at or past the end of the chain. Clamp to zero so
        // we report end-of-stream instead of requesting a negative byte count.
        long available = Math.Max(0L, currentStream.Length - streamOffset);
        int maximumBytes = (int)Math.Min((long)count, available);

        int bytesRead = 0;
        if (maximumBytes > 0) {
          currentStream.Position = streamOffset;
          bytesRead = currentStream.Read(buffer, offset, maximumBytes);
        }

        // Accumulate the total number of bytes we read for the return value
        totalBytesRead += bytesRead;

        // If the stream returned partial data, stop here. Also, if this was the
        // last stream we queried, this is as far as we can go.
        if ((bytesRead < maximumBytes) || (streamIndex == lastStreamIndex)) {
          break;
        }

        // Move on to the next stream in the chain
        ++streamIndex;
        streamOffset = 0;
        count -= bytesRead;
        offset += bytesRead;
      }

      this.position += totalBytesRead;

    } else {

      // Try to read from the active read stream. If the end of the active read
      // stream is reached, switch to the next stream in the chain until we have
      // no more streams left to read from
      while (this.activeReadStreamIndex <= lastStreamIndex) {

        // Try to read from the stream. The stream can either return any amount
        // of data > 0 if there's still data left to be read or 0 if the end of
        // the stream was reached
        Stream activeStream = this.streams[this.activeReadStreamIndex];
        if (activeStream.CanSeek) {
          activeStream.Position = this.activeReadStreamPosition;
        }
        totalBytesRead = activeStream.Read(buffer, offset, count);

        // If we got any data, we're done, exit the loop
        if (totalBytesRead != 0) {
          break;
        } else { // Otherwise, go to the next stream in the chain
          this.activeReadStreamPosition = 0;
          ++this.activeReadStreamIndex;
        }
      }

      this.activeReadStreamPosition += totalBytesRead;

    }

    return totalBytesRead;
  }

  /// <summary>Changes the position of the file pointer</summary>
  /// <param name="offset">
  ///   Offset to move the file pointer by, relative to the position indicated by
  ///   the <paramref name="origin" /> parameter.
  /// </param>
  /// <param name="origin">
  ///   Reference point relative to which the file pointer is placed
  /// </param>
  /// <returns>The new absolute position within the stream</returns>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  /// <exception cref="ArgumentException">The seek origin is not a known value</exception>
  public override long Seek(long offset, SeekOrigin origin) {
    switch (origin) {
      case SeekOrigin.Begin: {
        return Position = offset;
      }
      case SeekOrigin.Current: {
        return Position += offset;
      }
      case SeekOrigin.End: {
        return Position = (Length + offset);
      }
      default: {
        throw new ArgumentException("Invalid seek origin", "origin");
      }
    }
  }

  /// <summary>Changes the length of the stream</summary>
  /// <param name="value">New length the stream shall have</param>
  /// <exception cref="NotSupportedException">
  ///   Always, the stream chainer does not support the SetLength() operation
  /// </exception>
  public override void SetLength(long value) {
    throw new NotSupportedException("Resizing chained streams is not supported");
  }

  /// <summary>
  ///   Writes a sequence of bytes to the stream and advances the position of
  ///   the file pointer by the number of bytes written.
  /// </summary>
  /// <param name="buffer">
  ///   Buffer containing the data that will be written to the stream
  /// </param>
  /// <param name="offset">
  ///   Offset in the buffer at which the data to be written starts
  /// </param>
  /// <param name="count">Number of bytes that will be written into the stream</param>
  /// <remarks>
  ///   The behavior of this method is as follows: If one or more chained streams
  ///   do not support seeking, all data is appended to the final stream in the
  ///   chain. Otherwise, writing will begin with the stream the current file pointer
  ///   offset falls into. If the end of that stream is reached, writing continues
  ///   in the next stream. On the last stream, writing more data into the stream
  ///   than its current size allows will enlarge the stream.
  /// </remarks>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support writing
  /// </exception>
  /// <exception cref="IOException">
  ///   Data was to be written but the chain does not contain any streams
  /// </exception>
  public override void Write(byte[] buffer, int offset, int count) {
    if (!this.allStreamsCanWrite) {
      throw new NotSupportedException(
        "Can't write: at least one of the chained streams doesn't support writing"
      );
    }

    // Without any chained stream there is nowhere the data could go
    if ((count > 0) && (this.streams.Length == 0)) {
      throw new IOException("Can't write: the stream chain does not contain any streams");
    }

    int remaining = count;

    // If seeking is supported, we can write into the mid of the stream,
    // if the user so desires
    if (this.allStreamsCanSeek) {

      // Find out in which stream and at which position we need to begin writing
      int streamIndex;
      long streamOffset;
      findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

      // Write data into the streams, switching over to the next stream if data is
      // too large to fit into the current stream, until all data is spent.
      int lastStreamIndex = this.streams.Length - 1;
      while (remaining > 0) {
        Stream currentStream = this.streams[streamIndex];

        // If this is the last stream, just write. If the data is larger than the last
        // stream's remaining bytes, it will append to that stream, enlarging it.
        if (streamIndex == lastStreamIndex) {

          // Write all remaining data into the last stream
          currentStream.Position = streamOffset;
          currentStream.Write(buffer, offset, remaining);
          remaining = 0;

        } else { // We're writing into a stream that's followed by another stream

          // Find out how much data we can put into the current stream without
          // enlarging it (if seeking is supported, so is the Length property)
          long currentStreamRemaining = currentStream.Length - streamOffset;
          int bytesToWrite = (int)Math.Min((long)remaining, currentStreamRemaining);

          // Write all data that can fit into the current stream
          currentStream.Position = streamOffset;
          currentStream.Write(buffer, offset, bytesToWrite);

          // Adjust the offsets and count for the next stream
          offset += bytesToWrite;
          remaining -= bytesToWrite;
          streamOffset = 0;
          ++streamIndex;

        }
      }

    } else { // Seeking not supported, append everything to the last stream

      Stream lastStream = this.streams[this.streams.Length - 1];
      if (lastStream.CanSeek) {
        lastStream.Seek(0, SeekOrigin.End);
      }
      lastStream.Write(buffer, offset, remaining);

    }

    this.position += count;
  }

  /// <summary>Streams being combined by the stream chainer</summary>
  public Stream[] ChainedStreams {
    get { return this.streams; }
  }

  /// <summary>Moves the file pointer</summary>
  /// <param name="position">New position the file pointer will be moved to</param>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  private void moveFilePointer(long position) {
    if (!this.allStreamsCanSeek) {
      throw makeSeekNotSupportedException("seek");
    }

    // Seemingly, it is okay to move the file pointer beyond the end of
    // the stream until you try to Read() or Write()
    this.position = position;
  }

  /// <summary>
  ///   Finds the stream index and local offset for an absolute position within
  ///   the combined streams.
  /// </summary>
  /// <param name="overallPosition">Absolute position within the combined streams</param>
  /// <param name="streamIndex">
  ///   Index of the stream the overall position falls into. Positions at or beyond
  ///   the end of the chain are attributed to the last stream (-1 if the chain
  ///   is empty).
  /// </param>
  /// <param name="streamPosition">
  ///   Local position within the stream indicated by <paramref name="streamIndex" />.
  ///   For the last stream this can be equal to or larger than that stream's length.
  /// </param>
  private void findStreamIndexAndOffset(
    long overallPosition, out int streamIndex, out long streamPosition
  ) {
    Debug.Assert(
      this.allStreamsCanSeek, "Call to findStreamIndexAndOffset() but no seek support"
    );

    // Anything that doesn't fall into one of the preceding streams belongs
    // to the last stream, even if it lies beyond that stream's end
    int lastStreamIndex = this.streams.Length - 1;
    streamIndex = lastStreamIndex;

    // Only the streams before the last one are skipped over. The last stream's
    // length must not be subtracted, otherwise a position at the end of the chain
    // would wrap around to local offset 0 of the last stream.
    for (int index = 0; index < lastStreamIndex; ++index) {
      long streamLength = this.streams[index].Length;

      if (overallPosition < streamLength) {
        streamIndex = index;
        break;
      }

      overallPosition -= streamLength;
    }

    // The overall position has been decreased by each skipped stream's length,
    // so it now contains the local position within the selected stream.
    streamPosition = overallPosition;
  }

  /// <summary>Determines the capabilities of the chained streams</summary>
  /// <remarks>
  ///   <para>
  ///     Theoretically, it would be possible to create a stream chainer that supported
  ///     writing only when the file pointer was on a chained stream with write support,
  ///     that could seek within the beginning of the stream until the first chained
  ///     stream with no seek capability was encountered and so on.
  ///   </para>
  ///   <para>
  ///     However, the interface of the Stream class requires us to make a definitive
  ///     statement as to whether the Stream supports seeking, reading and writing.
  ///     We can't return "maybe" or "mostly" in CanSeek, so the only sane choice that
  ///     doesn't violate the Stream interface is to implement these capabilities as
  ///     all or nothing - either all streams support a feature, or the stream chainer
  ///     will report the feature as unsupported.
  ///   </para>
  /// </remarks>
  private void determineCapabilities() {
    this.allStreamsCanSeek = true;
    this.allStreamsCanRead = true;
    this.allStreamsCanWrite = true;

    for (int index = 0; index < this.streams.Length; ++index) {
      this.allStreamsCanSeek &= this.streams[index].CanSeek;
      this.allStreamsCanRead &= this.streams[index].CanRead;
      this.allStreamsCanWrite &= this.streams[index].CanWrite;
    }
  }

  /// <summary>
  ///   Constructs a NotSupportedException for an error caused by one of the chained
  ///   streams having no seek support
  /// </summary>
  /// <param name="action">Action that was attempted</param>
  /// <returns>The newly constructed NotSupportedException</returns>
  private static NotSupportedException makeSeekNotSupportedException(string action) {
    return new NotSupportedException(
      string.Format(
        "Can't {0}: at least one of the chained streams does not support seeking",
        action
      )
    );
  }

  /// <summary>Streams that have been chained together</summary>
  private readonly Stream[] streams;

  /// <summary>Current position of the overall file pointer</summary>
  private long position;

  /// <summary>Stream we're currently reading from if seeking is not supported</summary>
  /// <remarks>
  ///   If seeking is not supported, the stream chainer reads the chained streams
  ///   sequentially, each one until its end is reached.
  /// </remarks>
  private int activeReadStreamIndex;

  /// <summary>Position in the current read stream if seeking is not supported</summary>
  /// <remarks>
  ///   If there is a mix of streams supporting seeking and not supporting seeking, we
  ///   need to keep track of the read index for those streams that do. If, for example,
  ///   the last stream is written to and read from in succession, the file pointer
  ///   of that stream would have been moved to the end by the write attempt, skipping
  ///   data that should have been read in the following read attempt.
  /// </remarks>
  private long activeReadStreamPosition;

  /// <summary>Whether all of the chained streams support seeking</summary>
  private bool allStreamsCanSeek;

  /// <summary>Whether all of the chained streams support reading</summary>
  private bool allStreamsCanRead;

  /// <summary>Whether all of the chained streams support writing</summary>
  private bool allStreamsCanWrite;

}
} // namespace Nuclex.Support.IO
// --------------------------------------------------------------------------------------------------------------------
// <copyright file="BinaryStreamStack.cs" company="Jake Woods">
// Copyright (c) 2013 Jake Woods
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software
// and associated documentation files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute,
// sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies
// or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
// INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
// ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// </copyright>
// <author>Jake Woods</author>
// <summary>
// Provides character based and byte based stream-like read operations over multiple
// streams and provides methods to add data to the front of the buffer.
// </summary>
// --------------------------------------------------------------------------------------------------------------------
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace HttpMultipartParser {
/// <summary>
///     Provides character based and byte based stream-like read operations over multiple
///     streams and provides methods to add data to the front of the buffer.
/// </summary>
internal class BinaryStreamStack {
    #region Fields

    /// <summary>
    ///     Holds the streams to read from, the stream on the top of the
    ///     stack will be read first.
    /// </summary>
    private readonly Stack<BinaryReader> streams = new Stack<BinaryReader>();

    #endregion

    #region Constructors and Destructors

    /// <summary>
    ///     Initializes a new instance of the <see cref="BinaryStreamStack" /> class with the default
    ///     encoding of UTF8.
    /// </summary>
    public BinaryStreamStack()
        : this(Encoding.UTF8) {
    }

    /// <summary>
    ///     Initializes a new instance of the <see cref="BinaryStreamStack" /> class.
    /// </summary>
    /// <param name="encoding">
    ///     The encoding to use for character based operations.
    /// </param>
    public BinaryStreamStack(Encoding encoding) {
        CurrentEncoding = encoding;
    }

    #endregion

    #region Public Properties

    /// <summary>
    ///     Gets or sets the current encoding. It is used for readers created by
    ///     subsequent <see cref="Push" /> calls and for newline detection.
    /// </summary>
    public Encoding CurrentEncoding { get; set; }

    #endregion

    #region Public Methods and Operators

    /// <summary>
    ///     Returns true if at least one reader is on the stack. Note that a reader
    ///     that has already been read to its end still counts until a read
    ///     operation removes it.
    /// </summary>
    /// <returns>
    ///     True or false.
    /// </returns>
    public bool HasData() {
        return streams.Any();
    }

    /// <summary>
    ///     Returns the reader on the top of the stack but does not remove it.
    /// </summary>
    /// <returns>
    ///     The <see cref="BinaryReader" />.
    /// </returns>
    public BinaryReader Peek() {
        return streams.Peek();
    }

    /// <summary>
    ///     Returns the reader on the top of the stack and removes it
    /// </summary>
    /// <returns>
    ///     The <see cref="BinaryReader" />.
    /// </returns>
    public BinaryReader Pop() {
        return streams.Pop();
    }

    /// <summary>
    ///     Pushes data to the front of the stack. The most recently pushed data will
    ///     be read first.
    /// </summary>
    /// <param name="data">
    ///     The data to add to the stack.
    /// </param>
    public void Push(byte[] data) {
        streams.Push(new BinaryReader(new MemoryStream(data), CurrentEncoding));
    }

    /// <summary>
    ///     Reads a single value from the stack using <see cref="BinaryReader.Read()" />,
    ///     which decodes the next character with the reader's encoding. Exhausted readers
    ///     are disposed and removed along the way.
    /// </summary>
    /// <returns>
    ///     The value that was read, or -1 if no data is left to read (including when
    ///     the stack is empty).
    /// </returns>
    public int Read() {
        // Guarding on HasData() makes an empty stack yield -1 instead of letting
        // Stack.Peek() throw an InvalidOperationException.
        while (HasData()) {
            int value = streams.Peek().Read();
            if (value != -1) {
                return value;
            }

            // Top reader is exhausted: dispose it and carry on with the next one
            NextStream();
        }

        return -1;
    }

    /// <summary>
    ///     Reads the specified number of bytes from the stack, starting from a specified point in the byte array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of bytes to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of bytes read into buffer. This might be less than the number of bytes requested if that many bytes are not available,
    ///     or it might be zero if the end of the stream is reached.
    /// </returns>
    public int Read(byte[] buffer, int index, int count) {
        int amountRead = 0;
        BinaryReader top = HasData() ? streams.Peek() : null;

        // Read through the readers until they are exhausted or count is satisfied
        while (top != null && amountRead < count) {
            int read = top.Read(buffer, index + amountRead, count - amountRead);
            if (read == 0) {
                top = NextStream();
            } else {
                amountRead += read;
            }
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads the specified number of characters from the stack, starting from a specified point in the character array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of characters to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of characters read into buffer. This might be less than the number of characters requested if that many
    ///     characters are not available, or it might be zero if the end of the stream is reached.
    /// </returns>
    public int Read(char[] buffer, int index, int count) {
        int amountRead = 0;
        BinaryReader top = HasData() ? streams.Peek() : null;

        // Read through the readers until they are exhausted or count is satisfied
        while (top != null && amountRead < count) {
            int read = top.Read(buffer, index + amountRead, count - amountRead);
            if (read == 0) {
                top = NextStream();
            } else {
                amountRead += read;
            }
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads bytes from the stack up to the next newline, discarding the
    ///     information whether the end of the data was hit instead.
    /// </summary>
    /// <returns>
    ///     A byte array containing all the data up to but not including the next newline in the stack,
    ///     or null if the stack holds no readers.
    /// </returns>
    public byte[] ReadByteLine() {
        bool dummy;
        return ReadByteLine(out dummy);
    }

    /// <summary>
    ///     Reads bytes from the stack up to the next line feed ('\n' as encoded by
    ///     <see cref="CurrentEncoding" />). The line feed is consumed but not returned.
    /// </summary>
    /// <remarks>
    ///     Every byte that is part of the encoded carriage return ('\r') is dropped,
    ///     wherever it occurs in the line, not only directly in front of the line feed.
    /// </remarks>
    /// <param name="hitStreamEnd">
    ///     This will be set to true if we did not end on a newline but instead found the end of
    ///     our data.
    /// </param>
    /// <returns>
    ///     The bytes of the line, or null if the stack holds no readers.
    /// </returns>
    public byte[] ReadByteLine(out bool hitStreamEnd) {
        hitStreamEnd = false;
        if (!HasData()) {
            // No streams, no data!
            return null;
        }

        // This is horribly inefficient, consider profiling here if
        // it becomes an issue.
        BinaryReader top = streams.Peek();
        byte[] ignore = CurrentEncoding.GetBytes(new[] { '\r' });
        byte[] search = CurrentEncoding.GetBytes(new[] { '\n' });
        int searchPos = 0;
        var builder = new MemoryStream();

        // Scratch buffer, reused for every chunk
        var bytes = new byte[search.Length];

        while (true) {
            // First we need to read some bytes from one of the streams
            int amountRead = top.Read(bytes, 0, bytes.Length);
            while (amountRead == 0) {
                // NextStream() disposes the exhausted reader, including the last one
                top = NextStream();
                if (top == null) {
                    hitStreamEnd = true;
                    return builder.ToArray();
                }

                amountRead = top.Read(bytes, 0, bytes.Length);
            }

            // Check what we got against the search array. Only the first amountRead
            // bytes are valid; the rest of the scratch buffer holds stale data from
            // an earlier chunk.
            for (int i = 0; i < amountRead; ++i) {
                byte b = bytes[i];
                if (ignore.Contains(b)) {
                    continue;
                }

                if (b == search[searchPos]) {
                    searchPos += 1;
                } else {
                    // A partial newline match turned out not to be a newline,
                    // so the bytes matched so far are ordinary data
                    if (searchPos != 0) {
                        builder.Write(search, 0, searchPos);
                    }

                    builder.WriteByte(b);
                    searchPos = 0;
                }

                // Finally if we've found our newline
                if (searchPos == search.Length) {
                    return builder.ToArray();
                }
            }
        }
    }

    /// <summary>
    ///     Reads a line from the stack, decoded with <see cref="CurrentEncoding" />. The newline
    ///     characters will not be included in the result.
    /// </summary>
    /// <returns>
    ///     The <see cref="string" /> containing the line, or null if the stack holds no readers.
    /// </returns>
    public string ReadLine() {
        bool dummy;
        return ReadLine(out dummy);
    }

    /// <summary>
    ///     Reads a line from the stack, decoded with <see cref="CurrentEncoding" />. The newline
    ///     characters will not be included in the result.
    /// </summary>
    /// <param name="hitStreamEnd">
    ///     This will be set to true if we did not end on a newline but instead found the end of
    ///     our data.
    /// </param>
    /// <returns>
    ///     The <see cref="string" /> containing the line, or null if the stack holds no readers.
    /// </returns>
    public string ReadLine(out bool hitStreamEnd) {
        byte[] result = ReadByteLine(out hitStreamEnd);
        if (result == null) {
            return null;
        }

        return CurrentEncoding.GetString(result);
    }

    #endregion

    #region Methods

    /// <summary>
    ///     Removes the current reader from the stack, disposes it
    ///     and then returns the next available reader. If no reader
    ///     is available this method returns null.
    /// </summary>
    /// <returns>
    ///     The next <see cref="BinaryReader">reader</see>.
    /// </returns>
    private BinaryReader NextStream() {
        BinaryReader top = streams.Pop();
        top.Dispose();

        return streams.Any() ? streams.Peek() : null;
    }

    #endregion
}
}
// --------------------------------------------------------------------------------------------------------------------
// <copyright file="RebufferableBinaryReader.cs" company="Jake Woods">
// Copyright (c) 2013 Jake Woods
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this software
// and associated documentation files (the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute,
// sublicense, and/or sell copies of the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies
// or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
// INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
// ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// </copyright>
// <author>Jake Woods</author>
// <summary>
// Provides methods to interpret and read a stream as either character or binary
// data similar to a BinaryReader and provides the ability to push
// data onto the front of the stream.
// </summary>
// --------------------------------------------------------------------------------------------------------------------
using System;
using System.IO;
using System.Text;
namespace HttpMultipartParser {
/// <summary>
/// Provides methods to interpret and read a stream as either character or binary
/// data similar to a <see cref="BinaryReader" /> and provides the ability to push
/// data onto the front of the stream.
/// </summary>
public class RebufferableBinaryReader {
#region Fields
/// <summary>
/// The size of the buffer to use when reading new data.
/// </summary>
private readonly int bufferSize;
/// <summary>
/// The encoding to use for character based operations
/// </summary>
private readonly Encoding encoding;
/// <summary>
/// The stream to read raw data from.
/// </summary>
private readonly Stream stream;
/// <summary>
/// The stream stack to store buffered data.
/// </summary>
private readonly BinaryStreamStack streamStack;
/// <summary>
/// Settable from outside the class; the getter is private.
/// NOTE(review): presumably the total number of bytes expected from the input stream,
/// if known - no code visible in this part of the file reads it; confirm against StreamData().
/// </summary>
public long? Length { private get; set; }
/// <summary>
/// Settable from outside the class; the getter is private.
/// NOTE(review): presumably a running count of bytes consumed from the input stream -
/// no code visible in this part of the file reads or updates it; confirm against StreamData().
/// </summary>
public long BytesRead { private get; set; }
#endregion
#region Constructors and Destructors
/// <summary>
///     Creates a <see cref="RebufferableBinaryReader" /> over the given stream that uses
///     UTF8 (constructed via <c>new UTF8Encoding(false)</c>) for character based
///     operations and the default buffer size.
/// </summary>
/// <param name="input">
///     The input stream to read from.
/// </param>
public RebufferableBinaryReader(Stream input)
    : this(input, new UTF8Encoding(encoderShouldEmitUTF8Identifier: false)) {
}
/// <summary>
///     Creates a <see cref="RebufferableBinaryReader" /> over the given stream with the
///     given encoding and a buffer size of 4096.
/// </summary>
/// <param name="input">
///     The input stream to read from.
/// </param>
/// <param name="encoding">
///     The encoding to use for character based operations.
/// </param>
public RebufferableBinaryReader(Stream input, Encoding encoding)
    : this(input, encoding, bufferSize: 4096) {
}
/// <summary>
///     Creates a <see cref="RebufferableBinaryReader" /> over the given stream. The
///     arguments are stored as given; an empty stream stack using the same encoding
///     is created to hold buffered data.
/// </summary>
/// <param name="input">
///     The input stream to read from.
/// </param>
/// <param name="encoding">
///     The encoding to use for character based operations.
/// </param>
/// <param name="bufferSize">
///     The buffer size to use for new buffers.
/// </param>
public RebufferableBinaryReader(Stream input, Encoding encoding, int bufferSize) {
    this.bufferSize = bufferSize;
    this.encoding = encoding;
    this.stream = input;
    this.streamStack = new BinaryStreamStack(encoding);
}
#endregion
#region Public Methods and Operators
/// <summary>
///     Puts raw bytes in front of everything else that is waiting to be read.
///     Data buffered last is returned first by subsequent reads.
/// </summary>
/// <param name="data">
///     The bytes to buffer.
/// </param>
public void Buffer(byte[] data) {
    this.streamStack.Push(data);
}
/// <summary>
///     Encodes the string with this reader's encoding and puts the resulting bytes
///     in front of everything else that is waiting to be read. Data buffered last
///     is returned first by subsequent reads.
/// </summary>
/// <param name="data">
///     The text to buffer.
/// </param>
public void Buffer(string data) {
    byte[] encoded = this.encoding.GetBytes(data);
    this.streamStack.Push(encoded);
}
/// <summary>
///     Reads a single value from the buffered data, asking StreamData() for more
///     whenever the stream stack has nothing left.
/// </summary>
/// <returns>
///     The value returned by the stream stack, or -1 once the stack is empty and
///     StreamData() reports that it obtained nothing (returns 0).
/// </returns>
public int Read() {
    for (;;) {
        // Refill only when the stack is empty; a refill that yields nothing means end of input
        if (!streamStack.HasData() && StreamData() == 0) {
            return -1;
        }

        // -1 from the stack means its readers ran dry; go around and try again
        int candidate = streamStack.Read();
        if (candidate != -1) {
            return candidate;
        }
    }
}
/// <summary>
///     Fills part of a byte array from the buffered data, asking StreamData() for
///     more whenever the stream stack has nothing left.
/// </summary>
/// <param name="buffer">
///     The buffer to read data into.
/// </param>
/// <param name="index">
///     The index of buffer to start reading into.
/// </param>
/// <param name="count">
///     The number of bytes to read into the buffer.
/// </param>
/// <returns>
///     The number of bytes placed into buffer. This is less than count (possibly zero)
///     if StreamData() reports that it obtained nothing before count was reached.
/// </returns>
public int Read(byte[] buffer, int index, int count) {
    int total = 0;
    while (total < count) {
        // Refill only when the stack is empty; a refill that yields nothing ends the read
        bool exhausted = !streamStack.HasData() && StreamData() == 0;
        if (exhausted) {
            break;
        }

        total += streamStack.Read(buffer, index + total, count - total);
    }

    return total;
}
/// <summary>
/// Reads up to <paramref name="count" /> characters from the stream into a character
/// array, starting at a given position in that array.
/// </summary>
/// <param name="buffer">
/// The array that receives the characters.
/// </param>
/// <param name="index">
/// The position in <paramref name="buffer" /> at which the first character is stored.
/// </param>
/// <param name="count">
/// The number of characters requested.
/// </param>
/// <returns>
/// The number of characters stored in <paramref name="buffer" />. This is less than
/// <paramref name="count" /> only when the data runs out first, and zero when the
/// end of the stream has already been reached.
/// </returns>
public int Read(char[] buffer, int index, int count) {
  int copied = 0;
  while (copied < count) {
    // Only touch the underlying stream when nothing is buffered any more.
    bool exhausted = !streamStack.HasData() && StreamData() == 0;
    if (exhausted) {
      break;
    }

    copied += streamStack.Read(buffer, index + copied, count - copied);
  }
  return copied;
}
/// <summary>
/// Reads the bytes of the next line. Line splitting is delegated to the stream
/// stack; the pieces it returns are concatenated until it reports that a piece
/// did not end at the end of its buffered stream.
/// </summary>
/// <returns>
/// The bytes of the line, or whatever was collected if the data ran out first.
/// Returns null when the data ran out before anything was collected.
/// </returns>
public byte[] ReadByteLine() {
  var collected = new MemoryStream();
  bool reachedStreamEnd;
  do {
    // Out of buffered data and nothing more to fetch: hand back what we have.
    if (!streamStack.HasData() && StreamData() == 0) {
      if (collected.Length > 0) {
        return collected.ToArray();
      }
      return null;
    }

    byte[] fragment = streamStack.ReadByteLine(out reachedStreamEnd);
    collected.Write(fragment, 0, fragment.Length);

    // When the fragment stopped because its buffered stream ended, the line may
    // continue in the next chunk, so keep going.
  } while (reachedStreamEnd);

  return collected.ToArray();
}
/// <summary>
/// Reads the next line via <see cref="ReadByteLine" /> and decodes it with this
/// reader's encoding.
/// </summary>
/// <returns>
/// The <see cref="string" /> containing the line, or null if
/// <see cref="ReadByteLine" /> returned null (end of stream).
/// </returns>
public string ReadLine() {
  byte[] lineBytes = ReadByteLine();
  if (lineBytes == null) {
    return null;
  }
  return encoding.GetString(lineBytes);
}
#endregion
#region Methods
/// <summary>
/// Determines the byte order marking offset (if any) from the
/// given buffer.
/// </summary>
/// <param name="buffer">
/// The buffer to examine. Only its leading bytes are compared against the
/// preamble of this reader's encoding.
/// </param>
/// <returns>
/// The length of the encoding's preamble if the buffer starts with it, otherwise 0.
/// Also 0 when the encoding has no preamble or the buffer is null or shorter than
/// the preamble.
/// </returns>
private int GetBomOffset(byte[] buffer) {
  byte[] bom = encoding.GetPreamble();

  // A buffer that cannot hold the whole preamble cannot start with it. Without this
  // guard a small buffer size made the comparison below index past the buffer's end.
  if (buffer == null || buffer.Length < bom.Length) {
    return 0;
  }

  for (int i = 0; i < bom.Length; ++i) {
    if (bom[i] != buffer[i]) {
      return 0; // first mismatch settles it, no need to look further
    }
  }
  return bom.Length;
}
/// <summary>
/// Reads more data from the stream into the stream stack.
/// </summary>
/// <remarks>
/// When <c>Length</c> has a value, at most the bytes still missing to reach that
/// length are requested from the underlying stream, so the reader never consumes
/// (or blocks waiting for) data beyond the declared length.
/// </remarks>
/// <returns>
/// The number of bytes taken from the underlying stream (a skipped byte order mark
/// is included in this count), or 0 when the stream is exhausted or the declared
/// length has already been consumed.
/// </returns>
private int StreamData() {
  // Nothing to fetch once the declared length (if any) has been consumed.
  if (Length.HasValue && BytesRead >= Length) {
    return 0;
  }

  var buffer = new byte[bufferSize];

  // Cap the request at what is still missing to reach the declared length;
  // previously a full buffer was always requested, overshooting the limit.
  int wanted = buffer.Length;
  if (Length.HasValue) {
    long remaining = Length.Value - BytesRead;
    if (remaining < wanted) {
      wanted = (int)remaining;
    }
  }

  int amountRead = stream.Read(buffer, 0, wanted);
  BytesRead += amountRead;
  if (amountRead == 0) {
    return 0; // end of the underlying stream, nothing to inspect or push
  }

  // We need to check if our stream is using our encoding's
  // BOM, if it is we need to jump it.
  // NOTE(review): this check runs for every chunk, not only the first one, so
  // payload that happens to start a chunk with the BOM bytes is skipped too - confirm
  // whether that is intended.
  int bomOffset = GetBomOffset(buffer);
  if (bomOffset > amountRead) {
    // The "match" relied on stale zero bytes behind the data actually read
    // (possible for preambles ending in zeros); treat it as no BOM instead of
    // discarding the bytes.
    bomOffset = 0;
  }

  // Sometimes we'll get a buffer that's smaller than we expect, chop it down
  // for the reader:
  int payload = amountRead - bomOffset;
  if (payload > 0) {
    if (amountRead != buffer.Length || bomOffset > 0) {
      var smallBuffer = new byte[payload];
      System.Buffer.BlockCopy(buffer, bomOffset, smallBuffer, 0, payload);
      streamStack.Push(smallBuffer);
    } else {
      streamStack.Push(buffer);
    }
  }
  return amountRead;
}
#endregion
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using HttpMultipartParser;
namespace HttpServer {
/// <summary>
/// Exposes a <see cref="RebufferableBinaryReader" /> as a read-only, forward-only
/// <see cref="Stream" /> that ends after a fixed number of bytes.
/// </summary>
public class RebufferableBinaryReaderStreamAdapter : Stream {

  /// <summary>Reader the bytes are pulled from</summary>
  readonly RebufferableBinaryReader inner;
  /// <summary>Total number of bytes this stream exposes</summary>
  readonly long length;
  /// <summary>Number of bytes handed out so far</summary>
  long read;

  /// <summary>Initializes a new adapter</summary>
  /// <param name="reader">Reader the bytes are pulled from</param>
  /// <param name="length">Number of bytes after which the stream reports its end</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="reader" /> is null
  /// </exception>
  public RebufferableBinaryReaderStreamAdapter(RebufferableBinaryReader reader, long length) {
    if (reader == null) {
      throw new ArgumentNullException("reader");
    }
    this.inner = reader;
    this.length = length;
    this.read = 0;
  }

  /// <summary>Always true, the adapter only supports reading</summary>
  public override bool CanRead {
    get { return true; }
  }

  /// <summary>Always false, the adapter is forward-only</summary>
  public override bool CanSeek {
    get { return false; }
  }

  /// <summary>Always false, the adapter is read-only</summary>
  public override bool CanWrite {
    get { return false; }
  }

  /// <summary>Does nothing, there is no write buffer to flush</summary>
  public override void Flush() {
  }

  /// <summary>The length that was passed to the constructor</summary>
  public override long Length {
    get { return length; }
  }

  /// <summary>Not supported, the adapter cannot seek</summary>
  /// <exception cref="NotSupportedException">Always</exception>
  public override long Position {
    get {
      throw new NotSupportedException();
    }
    set {
      throw new NotSupportedException();
    }
  }

  /// <summary>
  /// Reads bytes from the wrapped reader without ever going past the configured length.
  /// </summary>
  /// <param name="buffer">Buffer that receives the bytes</param>
  /// <param name="offset">Position in the buffer at which the first byte is stored</param>
  /// <param name="count">Maximum number of bytes to read</param>
  /// <returns>
  /// The number of bytes read, or 0 once <see cref="Length" /> bytes have been handed out
  /// </returns>
  public override int Read(byte[] buffer, int offset, int count) {
    long remaining = length - read;
    if (remaining <= 0) {
      return 0;
    }

    // Clamp the request: passing 'count' through unchanged let a caller with a large
    // buffer pull bytes that lie beyond the end of this stream out of the reader.
    int toRead = (int)Math.Min(count, remaining);
    int n = inner.Read(buffer, offset, toRead);
    read += n;
    return n;
  }

  /// <summary>Not supported, the adapter cannot seek</summary>
  /// <exception cref="NotSupportedException">Always</exception>
  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotSupportedException();
  }

  /// <summary>Not supported, the length is fixed at construction</summary>
  /// <exception cref="NotSupportedException">Always</exception>
  public override void SetLength(long value) {
    throw new NotSupportedException();
  }

  /// <summary>Not supported, the adapter is read-only</summary>
  /// <exception cref="NotSupportedException">Always</exception>
  public override void Write(byte[] buffer, int offset, int count) {
    throw new NotSupportedException();
  }

}
}
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2012 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
namespace Nuclex.Support.IO {
/// <summary>Wraps a stream and exposes only a limited region of its data</summary>
/// <remarks>
/// The partial stream keeps its own file pointer. For seekable streams the wrapped
/// stream is repositioned before every read and write, for unseekable streams the
/// partial stream just counts the bytes that passed through it.
/// </remarks>
public class PartialStream : Stream {

  /// <summary>Initializes a new partial stream</summary>
  /// <param name="stream">
  /// Stream the wrapper will make a limited region accessible of
  /// </param>
  /// <param name="start">
  /// Start index in the stream which becomes the beginning for the wrapper
  /// </param>
  /// <param name="length">
  /// Length the wrapped stream should report and allow access to
  /// </param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="stream" /> is null
  /// </exception>
  /// <exception cref="ArgumentException">
  /// The start or length is negative, the start is not 0 for an unseekable stream,
  /// or the region reaches beyond the end of a seekable stream
  /// </exception>
  public PartialStream(Stream stream, long start, long length) {
    if(stream == null) {
      throw new ArgumentNullException("stream");
    }
    if(start < 0) {
      throw new ArgumentException("Start index must not be less than 0", "start");
    }
    if(length < 0) {
      throw new ArgumentException("Length must not be less than 0", "length");
    }

    if(!stream.CanSeek) {
      if(start != 0) {
        throw new ArgumentException(
          "The only valid start for unseekable streams is 0", "start"
        );
      }
    } else {
      // Written as a subtraction so that huge start/length values cannot overflow
      if(length > stream.Length - start) {
        throw new ArgumentException(
          "Partial stream exceeds end of full stream", "length"
        );
      }
    }

    this.stream = stream;
    this.start = start;
    this.length = length;
  }

  /// <summary>Whether data can be read from the stream</summary>
  public override bool CanRead {
    get { return this.stream.CanRead; }
  }

  /// <summary>Whether the stream supports seeking</summary>
  public override bool CanSeek {
    get { return this.stream.CanSeek; }
  }

  /// <summary>Whether data can be written into the stream</summary>
  public override bool CanWrite {
    get { return this.stream.CanWrite; }
  }

  /// <summary>
  /// Clears all buffers for this stream and causes any buffered data to be written
  /// to the underlying device.
  /// </summary>
  public override void Flush() {
    this.stream.Flush();
  }

  /// <summary>Length of the partial stream in bytes, as given to the constructor</summary>
  public override long Length {
    get { return this.length; }
  }

  /// <summary>Position of the file pointer relative to the start of the region</summary>
  /// <exception cref="NotSupportedException">
  /// The wrapped stream does not support seeking
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// The position being assigned is negative
  /// </exception>
  public override long Position {
    get {
      if(!this.stream.CanSeek) {
        throw makeSeekNotSupportedException("seek");
      }

      return this.position;
    }
    set { moveFilePointer(value); }
  }

  /// <summary>
  /// Reads a sequence of bytes from the stream and advances the position of
  /// the file pointer by the number of bytes read.
  /// </summary>
  /// <param name="buffer">Buffer that will receive the data read from the stream</param>
  /// <param name="offset">
  /// Offset in the buffer at which the stream will place the data read
  /// </param>
  /// <param name="count">Maximum number of bytes that will be read</param>
  /// <returns>
  /// The number of bytes that were actually read from the stream and written into
  /// the provided buffer. Zero if the file pointer is at or beyond the end of
  /// the region.
  /// </returns>
  /// <exception cref="NotSupportedException">
  /// The wrapped stream does not support reading
  /// </exception>
  public override int Read(byte[] buffer, int offset, int count) {
    if(!this.stream.CanRead) {
      throw new NotSupportedException(
        "Can't read: the wrapped stream doesn't support reading"
      );
    }

    // The file pointer may legally sit beyond the end of the region. Reading there
    // has to report "end of stream" instead of passing a negative count down to
    // the wrapped stream.
    long remaining = this.length - this.position;
    if(remaining <= 0) {
      return 0;
    }

    int bytesToRead = (int)Math.Min(count, remaining);

    if(this.stream.CanSeek) {
      this.stream.Position = this.position + this.start;
    }
    int bytesRead = this.stream.Read(buffer, offset, bytesToRead);
    this.position += bytesRead;

    return bytesRead;
  }

  /// <summary>Changes the position of the file pointer</summary>
  /// <param name="offset">
  /// Offset to move the file pointer by, relative to the position indicated by
  /// the <paramref name="origin" /> parameter.
  /// </param>
  /// <param name="origin">
  /// Reference point relative to which the file pointer is placed
  /// </param>
  /// <returns>The new position relative to the start of the region</returns>
  /// <exception cref="ArgumentException">The seek origin is not a known value</exception>
  /// <exception cref="NotSupportedException">
  /// The wrapped stream does not support seeking
  /// </exception>
  /// <exception cref="IOException">
  /// The resulting position would lie before the beginning of the region
  /// </exception>
  public override long Seek(long offset, SeekOrigin origin) {
    long target;
    switch(origin) {
      case SeekOrigin.Begin: {
        target = offset;
        break;
      }
      case SeekOrigin.Current: {
        target = this.position + offset;
        break;
      }
      case SeekOrigin.End: {
        target = Length + offset;
        break;
      }
      default: {
        throw new ArgumentException("Invalid seek origin", "origin");
      }
    }

    if(!this.stream.CanSeek) {
      throw makeSeekNotSupportedException("seek");
    }
    if(target < 0) {
      // A negative position would map to data in front of the exposed region
      throw new IOException(
        "Can't seek: the position would lie before the beginning of the partial stream"
      );
    }

    this.position = target;
    return target;
  }

  /// <summary>Changes the length of the stream</summary>
  /// <param name="value">New length the stream shall have</param>
  /// <exception cref="NotSupportedException">
  /// Always, the partial stream does not support the SetLength() operation
  /// </exception>
  public override void SetLength(long value) {
    throw new NotSupportedException("Resizing partial streams is not supported");
  }

  /// <summary>
  /// Writes a sequence of bytes to the stream and advances the position of
  /// the file pointer by the number of bytes written.
  /// </summary>
  /// <param name="buffer">
  /// Buffer containing the data that will be written to the stream
  /// </param>
  /// <param name="offset">
  /// Offset in the buffer at which the data to be written starts
  /// </param>
  /// <param name="count">Number of bytes that will be written into the stream</param>
  /// <remarks>
  /// Writing is confined to the region covered by the partial stream. A write
  /// that would go past the end of the region is rejected as a whole; nothing
  /// is written in that case.
  /// </remarks>
  /// <exception cref="NotSupportedException">
  /// The wrapped stream does not support writing or the write would extend
  /// beyond the end of the partial stream
  /// </exception>
  public override void Write(byte[] buffer, int offset, int count) {
    if(!this.stream.CanWrite) {
      throw new NotSupportedException(
        "Can't write: the wrapped stream doesn't support writing"
      );
    }

    long remaining = this.length - this.position;
    if(count > remaining) {
      throw new NotSupportedException(
        "Cannot extend the length of the partial stream"
      );
    }

    if(this.stream.CanSeek) {
      this.stream.Position = this.position + this.start;
    }
    this.stream.Write(buffer, offset, count);

    this.position += count;
  }

  /// <summary>Stream being wrapped by the partial stream wrapper</summary>
  public Stream CompleteStream {
    get { return this.stream; }
  }

  /// <summary>Moves the file pointer</summary>
  /// <param name="position">New position the file pointer will be moved to</param>
  private void moveFilePointer(long position) {
    if(!this.stream.CanSeek) {
      throw makeSeekNotSupportedException("seek");
    }
    if(position < 0) {
      // A negative position would map to data in front of the exposed region
      throw new ArgumentOutOfRangeException(
        "position", "The position must not be less than 0"
      );
    }

    // Seemingly, it is okay to move the file pointer beyond the end of
    // the stream until you try to Read() or Write()
    this.position = position;
  }

  /// <summary>
  /// Constructs a NotSupportedException for an error caused by the wrapped
  /// stream having no seek support
  /// </summary>
  /// <param name="action">Action that was tried to perform</param>
  /// <returns>The newly constructed NotSupportedException</returns>
  private static NotSupportedException makeSeekNotSupportedException(string action) {
    return new NotSupportedException(
      string.Format(
        "Can't {0}: the wrapped stream does not support seeking",
        action
      )
    );
  }

  /// <summary>Stream of which a region is exposed by the partial stream</summary>
  private readonly Stream stream;
  /// <summary>Start index of the partial stream in the wrapped stream</summary>
  private readonly long start;
  /// <summary>Zero-based position of the partial stream's file pointer</summary>
  /// <remarks>
  /// If the stream does not support seeking, the position will simply be counted
  /// up until it reaches <see cref="PartialStream.length" />.
  /// </remarks>
  private long position;
  /// <summary>Length of the partial stream</summary>
  private readonly long length;

}
} // namespace Nuclex.Support.IO
using System;
using System.Data;
using System.Data.Entity;
using System.Data.SqlClient;
using System.IO;
namespace FilePublicator2.Infrastructure
{
/// <summary>
/// Stream over a single varbinary column value of one row in a SQL Server table.
/// Writes replace the value with the first chunk and append every following chunk;
/// reads (when enabled) pull the value sequentially through a data reader.
/// </summary>
/// <remarks>
/// The stream owns its connection and closes it when disposed. Database errors are
/// no longer swallowed: they surface to the caller so that a failed write cannot
/// silently corrupt the stored value.
/// </remarks>
public class VarbinaryStream : Stream
{
    private SqlConnection _Connection;

    private string _TableName;
    private string _BinaryColumn;
    private string _KeyColumn;
    private int _KeyValue;

    // number of bytes written so far; 0 means the next write replaces the column value
    private long _Offset;

    private SqlDataReader _SQLReader;
    private long _SQLReadPosition;

    private bool _AllowedToRead = false;

    // true when the query returned a row whose binary column is not NULL
    private bool _HasData;
    private bool _Disposed;

    /// <summary>Creates a stream over one binary column value.</summary>
    /// <param name="ConnectionString">Connection string; the stream opens its own connection</param>
    /// <param name="TableName">Table (in schema dbo) that holds the row</param>
    /// <param name="BinaryColumn">Name of the varbinary column</param>
    /// <param name="KeyColumn">Name of the integer key column identifying the row</param>
    /// <param name="KeyValue">Key of the row</param>
    /// <param name="AllowRead">
    /// When true the row is queried right away so the stream can be read. An open
    /// reader occupies the connection; presumably reading and writing are not meant
    /// to be mixed on one instance - TODO confirm.
    /// </param>
    public VarbinaryStream(
        string ConnectionString,
        string TableName,
        string BinaryColumn,
        string KeyColumn,
        int KeyValue,
        bool AllowRead = false)
    {
        // create own connection with the connection string.
        _Connection = new SqlConnection(ConnectionString);

        _TableName = TableName;
        _BinaryColumn = BinaryColumn;
        _KeyColumn = KeyColumn;
        _KeyValue = KeyValue;

        // only query the database for a result if we are going to be reading, otherwise skip.
        _AllowedToRead = AllowRead;
        if (_AllowedToRead)
        {
            try
            {
                if (_Connection.State != ConnectionState.Open)
                    _Connection.Open();

                string sql = string.Format(
                    "SELECT TOP 1 {0} FROM [dbo].{1} WHERE {2} = @id",
                    QuoteName(_BinaryColumn), QuoteName(_TableName), QuoteName(_KeyColumn));

                using (SqlCommand cmd = new SqlCommand(sql, _Connection))
                {
                    cmd.Parameters.Add(new SqlParameter("@id", _KeyValue));

                    _SQLReader = cmd.ExecuteReader(
                        CommandBehavior.SequentialAccess |
                        CommandBehavior.SingleResult |
                        CommandBehavior.SingleRow |
                        CommandBehavior.CloseConnection);
                }

                // A missing row or a NULL value is presented as an empty stream.
                _HasData = _SQLReader.Read() && !_SQLReader.IsDBNull(0);
            }
            catch
            {
                // Don't leave a half-open connection behind, then let the caller know.
                ReleaseResources();
                throw;
            }
        }
    }

    /// <summary>
    /// Writes <paramref name="count" /> bytes of <paramref name="buffer" />, starting at
    /// <paramref name="index" />, to the column. The first write replaces the stored
    /// value, later writes append to it.
    /// </summary>
    /// <param name="buffer">Buffer containing the bytes to write</param>
    /// <param name="index">Position in the buffer of the first byte to write</param>
    /// <param name="count">Number of bytes to write</param>
    public override void Write(byte[] buffer, int index, int count)
    {
        if (_Disposed)
            throw new ObjectDisposedException("VarbinaryStream");
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - index < count)
            throw new ArgumentException("index and count exceed the bounds of the buffer");
        if (count == 0)
            return;

        // Send exactly the requested slice. Passing the whole buffer stored stale bytes
        // whenever a caller reused a larger buffer (e.g. the last chunk of a copy loop).
        byte[] chunk = buffer;
        if (index != 0 || count != buffer.Length)
        {
            chunk = new byte[count];
            Array.Copy(buffer, index, chunk, 0, count);
        }

        if (_Connection.State != ConnectionState.Open)
            _Connection.Open();

        if (_Offset == 0)
        {
            // for the first write we just send the bytes to the Column
            string sql = string.Format(
                "UPDATE [dbo].{0} SET {1} = @firstchunk WHERE {2} = @id",
                QuoteName(_TableName), QuoteName(_BinaryColumn), QuoteName(_KeyColumn));

            using (SqlCommand cmd = new SqlCommand(sql, _Connection))
            {
                cmd.Parameters.Add(new SqlParameter("@firstchunk", chunk));
                cmd.Parameters.Add(new SqlParameter("@id", _KeyValue));
                cmd.ExecuteNonQuery();
            }
        }
        else
        {
            // for all updates after the first one we use the TSQL command .WRITE() to append the data in the database
            string sql = string.Format(
                "UPDATE [dbo].{0} SET {1}.WRITE(@chunk, NULL, @length) WHERE {2} = @id",
                QuoteName(_TableName), QuoteName(_BinaryColumn), QuoteName(_KeyColumn));

            using (SqlCommand cmd = new SqlCommand(sql, _Connection))
            {
                cmd.Parameters.Add(new SqlParameter("@chunk", chunk));
                cmd.Parameters.Add(new SqlParameter("@length", count));
                cmd.Parameters.Add(new SqlParameter("@id", _KeyValue));
                cmd.ExecuteNonQuery();
            }
        }

        _Offset += count;
    }

    /// <summary>Reads the next bytes of the column value.</summary>
    /// <param name="buffer">Buffer that receives the bytes</param>
    /// <param name="offset">Position in the buffer at which the first byte is stored</param>
    /// <param name="count">Maximum number of bytes to read</param>
    /// <returns>The number of bytes read, 0 at the end of the value (never a negative number)</returns>
    /// <exception cref="NotSupportedException">The stream was created without read access</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_Disposed)
            throw new ObjectDisposedException("VarbinaryStream");
        if (!_AllowedToRead || _SQLReader == null)
            throw new NotSupportedException("The stream was not opened for reading");
        if (!_HasData)
            return 0;

        long bytesRead = _SQLReader.GetBytes(0, _SQLReadPosition, buffer, offset, count);
        _SQLReadPosition += bytesRead;
        return (int)bytesRead;
    }

    /// <summary>Whether the stream was created with read access and is still open</summary>
    public override bool CanRead
    {
        get { return _AllowedToRead && !_Disposed; }
    }

    /// <summary>Always false, the stream cannot seek</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>True until the stream is disposed</summary>
    public override bool CanWrite
    {
        get { return !_Disposed; }
    }

    /// <summary>
    /// Does nothing: every write is sent to the database immediately, so there is
    /// nothing to flush. (Throwing here broke callers that flush writable streams.)
    /// </summary>
    public override void Flush()
    {
    }

    #region unimplemented methods

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    #endregion unimplemented methods

    /// <summary>Closes the data reader and the connection owned by the stream.</summary>
    /// <param name="disposing">True when called from Dispose()/Close()</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_Disposed)
        {
            ReleaseResources();
        }
        _Disposed = true;
        base.Dispose(disposing);
    }

    // Disposes the reader (if any) and the connection.
    private void ReleaseResources()
    {
        if (_SQLReader != null)
        {
            _SQLReader.Dispose();
            _SQLReader = null;
        }
        if (_Connection != null)
        {
            _Connection.Dispose();
        }
    }

    // Wraps an identifier in brackets, doubling any closing bracket inside it so the
    // name cannot terminate the quoting and inject SQL.
    private static string QuoteName(string name)
    {
        return "[" + (name ?? string.Empty).Replace("]", "]]") + "]";
    }
}
}
//---------------------------------------------------------------------
// File: VirtualStream.cs
//
// Summary: A sample pipeline component which demonstrates how to promote message context
// properties and write distinguished fields for XML messages using arbitrary
// XPath expressions.
//
// Sample: Arbitrary XPath Property Handler Pipeline Component SDK
//
//---------------------------------------------------------------------
// This file is part of the Microsoft BizTalk Server 2006 SDK
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// This source code is intended only as a supplement to Microsoft BizTalk
// Server 2006 release and/or on-line documentation. See these other
// materials for detailed information regarding Microsoft code samples.
//
// THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY
// KIND, WHETHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR
// PURPOSE.
//---------------------------------------------------------------------
using System;
using System.IO;
using System.Text;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;
namespace Microsoft.Samples.BizTalk.Adapter.Tcp
{
/// <summary>
/// Implements a virtual stream, i.e. the always seekable stream which
/// uses configurable amount of memory to reduce a memory footprint and
/// temporarily stores remaining data in a temporary file on disk.
/// </summary>
public sealed class VirtualStream : Stream, IDisposable
{
    /// <summary>
    /// Memory handling.
    /// </summary>
    public enum MemoryFlag
    {
        AutoOverFlowToDisk = 0,
        OnlyInMemory = 1,
        OnlyToDisk = 2
    }

    // Constants
    private const int MemoryThreshold = 4*1024*1024; // The maximum possible memory consumption (4Mb)
    private const int DefaultMemorySize = 4*1024; // Default memory consumption (4Kb)
    private const int TempFileBufferSize = 4096; // Internal buffer of the temporary file stream

    private Stream wrappedStream;
    private bool isDisposed;
    private bool isInMemory;
    private int thresholdSize;
    private MemoryFlag memoryStatus;

    /// <summary>
    /// Initializes a VirtualStream instance with default parameters (4Kb memory buffer,
    /// allow overflow to disk).
    /// </summary>
    public VirtualStream()
        : this(DefaultMemorySize, MemoryFlag.AutoOverFlowToDisk, new MemoryStream())
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with memory buffer size.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size (capped at 4Mb)</param>
    public VirtualStream(int bufferSize)
        : this(bufferSize, MemoryFlag.AutoOverFlowToDisk, new MemoryStream(ClampBufferSize(bufferSize)))
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a default memory size and memory flag specified.
    /// </summary>
    /// <param name="flag">Memory flag</param>
    public VirtualStream(MemoryFlag flag)
        : this(DefaultMemorySize, flag,
        (flag == MemoryFlag.OnlyToDisk) ? CreatePersistentStream() : new MemoryStream())
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a memory buffer size and memory flag specified.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size (capped at 4Mb)</param>
    /// <param name="flag">Memory flag</param>
    public VirtualStream(int bufferSize, MemoryFlag flag)
        : this(bufferSize, flag,
        (flag == MemoryFlag.OnlyToDisk) ? CreatePersistentStream() : new MemoryStream(ClampBufferSize(bufferSize)))
    {
    }

    /// <summary>
    /// Initializes a VirtualStream instance with a memory buffer size, memory flag and underlying stream
    /// specified.
    /// </summary>
    /// <param name="bufferSize">Memory buffer size</param>
    /// <param name="flag">Memory flag</param>
    /// <param name="dataStream">Underlying stream</param>
    private VirtualStream(int bufferSize, MemoryFlag flag, Stream dataStream)
    {
        if (null == dataStream)
            throw new ArgumentNullException("dataStream");

        isInMemory = (flag != MemoryFlag.OnlyToDisk);
        memoryStatus = flag;
        thresholdSize = ClampBufferSize(bufferSize);

        if (isInMemory)
        {
            wrappedStream = dataStream;  // Don't want to double wrap memory stream
        }
        else
        {
            try
            {
                wrappedStream = new BufferedStream(dataStream, thresholdSize);
            }
            catch
            {
                // An invalid buffer size must not leave the temporary file open
                dataStream.Dispose();
                throw;
            }
        }
        isDisposed = false;
    }

    #region Stream Methods and Properties

    /// <summary>
    /// Gets a flag indicating whether a stream can be read. False once the stream is closed.
    /// </summary>
    public override bool CanRead
    {
        get { return !isDisposed && wrappedStream.CanRead; }
    }

    /// <summary>
    /// Gets a flag indicating whether a stream can be written. False once the stream is closed.
    /// </summary>
    public override bool CanWrite
    {
        get { return !isDisposed && wrappedStream.CanWrite; }
    }

    /// <summary>
    /// Gets a flag indicating whether a stream can seek. False once the stream is closed.
    /// </summary>
    public override bool CanSeek
    {
        get { return !isDisposed; }
    }

    /// <summary>
    /// Returns the length of the underlying (memory or temporary file) stream.
    /// </summary>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override long Length
    {
        get
        {
            ThrowIfDisposed();
            return wrappedStream.Length;
        }
    }

    /// <summary>
    /// Gets or sets a position in the stream.
    /// </summary>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override long Position
    {
        get
        {
            ThrowIfDisposed();
            return wrappedStream.Position;
        }
        set
        {
            ThrowIfDisposed();
            wrappedStream.Seek(value, SeekOrigin.Begin);
        }
    }

    /// <summary>
    /// <see cref="Stream.Flush()"/>
    /// </summary>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override void Flush()
    {
        ThrowIfDisposed();
        wrappedStream.Flush();
    }

    /// <summary>
    /// Reads from the underlying (memory or temporary file) stream.
    /// </summary>
    /// <param name="buffer">Buffer that receives the data</param>
    /// <param name="offset">Position in the buffer at which the first byte is stored</param>
    /// <param name="count">Maximum number of bytes to read</param>
    /// <returns>
    /// The number of bytes read
    /// </returns>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        return wrappedStream.Read(buffer, offset, count);
    }

    /// <summary>
    /// Moves the position of the underlying stream.
    /// </summary>
    /// <param name="offset">Offset relative to <paramref name="origin"/></param>
    /// <param name="origin">Reference point of the offset</param>
    /// <returns>
    /// The current position
    /// </returns>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override long Seek(long offset, SeekOrigin origin)
    {
        ThrowIfDisposed();
        return wrappedStream.Seek(offset, origin);
    }

    /// <summary>
    /// Sets the length of the stream, moving the data to a temporary file first when
    /// the new length exceeds the memory threshold and overflow to disk is allowed.
    /// </summary>
    /// <param name="length">New length of the stream</param>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override void SetLength(long length)
    {
        ThrowIfDisposed();

        if (MustOverflowToDisk(length))
            SwitchToPersistentStream();

        // Set new length for the wrapped stream
        wrappedStream.SetLength(length);
    }

    /// <summary>
    /// Writes to the underlying stream, moving the data to a temporary file first when
    /// the write would end beyond the memory threshold and overflow to disk is allowed.
    /// </summary>
    /// <param name="buffer">Buffer containing the data to write</param>
    /// <param name="offset">Position in the buffer of the first byte to write</param>
    /// <param name="count">Number of bytes to write</param>
    /// <remarks>
    /// May throw <see cref="ObjectDisposedException"/>.
    /// </remarks>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();

        // Check if new position after write is greater than allowed by threshold
        if (MustOverflowToDisk(count + wrappedStream.Position))
            SwitchToPersistentStream();

        wrappedStream.Write(buffer, offset, count);
    }

    #endregion

    #region Disposal

    /// <summary>
    /// Closes and releases the underlying stream. Called by both Close() and Dispose().
    /// </summary>
    /// <param name="disposing">True when called from Close() or Dispose()</param>
    /// <remarks>
    /// Calling other methods afterwards may result in an ObjectDisposedException being thrown.
    /// </remarks>
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing && !isDisposed && null != wrappedStream)
            {
                wrappedStream.Dispose();
                wrappedStream = null;
            }
            isDisposed = true;
        }
        finally
        {
            base.Dispose(disposing);
        }
    }

    #endregion

    #region Private Utility Functions

    /// <summary>
    /// Limits a requested memory buffer size to the maximum memory consumption.
    /// </summary>
    /// <param name="bufferSize">Requested buffer size</param>
    /// <returns>The requested size, but not more than 4Mb</returns>
    private static int ClampBufferSize(int bufferSize)
    {
        return Math.Min(bufferSize, MemoryThreshold);
    }

    /// <summary>
    /// Tells whether the data has to be moved from memory to disk before the stream
    /// grows to the given length.
    /// </summary>
    /// <param name="requiredLength">Length the stream is about to reach</param>
    private bool MustOverflowToDisk(long requiredLength)
    {
        return memoryStatus == MemoryFlag.AutoOverFlowToDisk &&
            isInMemory &&
            requiredLength > thresholdSize;
    }

    /// <summary>
    /// Replaces the memory stream with a buffered temporary file stream that has the
    /// same content and position.
    /// </summary>
    private void SwitchToPersistentStream()
    {
        MemoryStream memory = (MemoryStream) wrappedStream;
        Stream persistStream = CreatePersistentStream();
        try
        {
            persistStream = new BufferedStream(persistStream, thresholdSize);

            // WriteTo copies the whole content regardless of the memory stream's position
            memory.WriteTo(persistStream);

            // The new stream substitutes the old one, so it has to continue at the same position
            persistStream.Position = memory.Position;
        }
        catch
        {
            // Don't leave the temporary file open when the switch fails half way
            persistStream.Dispose();
            throw;
        }

        memory.Close();
        wrappedStream = persistStream;
        isInMemory = false;
    }

    /// <summary>
    /// Called by other methods to check the stream state.
    /// It will throw <see cref="ObjectDisposedException"/> if the stream was closed or disposed.
    /// </summary>
    private void ThrowIfDisposed()
    {
        if(isDisposed || null == wrappedStream)
            throw new ObjectDisposedException("VirtualStream");
    }

    /// <summary>
    /// Utility method.
    /// Creates a FileStream over a uniquely named file in the temporary folder. The file
    /// is marked as temporary, opened for exclusive access and deleted when the stream is closed.
    /// </summary>
    /// <returns>
    /// The temporary persistence stream
    /// </returns>
    public static Stream CreatePersistentStream()
    {
        // Creates an empty, uniquely named file and returns its path
        string path = Path.GetTempFileName();
        try
        {
            File.SetAttributes(path, FileAttributes.Temporary);
            return new FileStream(
                path, FileMode.Open, FileAccess.ReadWrite, FileShare.None,
                TempFileBufferSize, FileOptions.DeleteOnClose);
        }
        catch
        {
            // Best effort: don't leave the empty file behind, but report the original error
            try
            {
                File.Delete(path);
            }
            catch (IOException)
            {
            }
            catch (UnauthorizedAccessException)
            {
            }
            throw;
        }
    }

    #endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment