Last active
August 15, 2016 03:10
-
-
Save compustar/65d644654628057808e88b4198b05b4e to your computer and use it in GitHub Desktop.
Streaming Classes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #region CPL License | |
| /* | |
| Nuclex Framework | |
| Copyright (C) 2002-2012 Nuclex Development Labs | |
| This library is free software; you can redistribute it and/or | |
| modify it under the terms of the IBM Common Public License as | |
| published by the IBM Corporation; either version 1.0 of the | |
| License, or (at your option) any later version. | |
| This library is distributed in the hope that it will be useful, | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| IBM Common Public License for more details. | |
| You should have received a copy of the IBM Common Public | |
| License along with this library | |
| */ | |
| #endregion | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Diagnostics; | |
| using System.IO; | |
| using System.Linq; | |
| namespace Nuclex.Support.IO { | |
/// <summary>Chains a series of independent streams into a single stream</summary>
/// <remarks>
///   <para>
///     This class can be used to chain multiple independent streams into a single
///     stream that acts as if its chained streams were only one combined stream.
///     It is useful to avoid creating huge memory streams or temporary files when
///     you just need to prepend or append some data to a stream or if you need to
///     read a file that was split into several parts as if it was a single file.
///   </para>
///   <para>
///     It is not recommended to change the size of any chained stream after it
///     has become part of a stream chainer, though the stream chainer will do its
///     best to cope with the changes as they occur. Increasing the length of a
///     chained stream is generally not an issue for streams that support seeking,
///     but reducing the length might leave the stream chainer's file pointer beyond
///     the end of the chain. In that case Read() returns 0 bytes and Write() hands
///     the out-of-range offset to the last chained stream.
///   </para>
///   <para>
///     The stream chainer does not override Dispose(), so the chained streams are
///     not closed by this class.
///   </para>
/// </remarks>
public class ChainStream : Stream {

  /// <summary>Initializes a new stream chainer</summary>
  /// <param name="streams">
  ///   Array of streams that will be chained together. Any <see cref="ChainStream" />
  ///   in the array is replaced by the streams it chains, so nested chainers are
  ///   flattened into a single list.
  /// </param>
  /// <exception cref="ArgumentNullException">The array of streams is null</exception>
  /// <exception cref="ArgumentException">One of the streams in the array is null</exception>
  public ChainStream(params Stream[] streams) {
    if (streams == null) {
      throw new ArgumentNullException("streams");
    }

    List<Stream> flattened = new List<Stream>(streams.Length);
    AddStreams(flattened, streams);
    this.streams = flattened.ToArray();

    determineCapabilities();
  }

  /// <summary>Appends streams to a list, flattening nested stream chainers</summary>
  /// <param name="result">List that receives the streams</param>
  /// <param name="from">Streams that will be appended to the list</param>
  /// <exception cref="ArgumentException">One of the streams is null</exception>
  private static void AddStreams(List<Stream> result, IEnumerable<Stream> from) {
    foreach (Stream stream in from) {
      if (stream == null) {
        throw new ArgumentException("The chained streams must not contain null", "streams");
      }

      ChainStream nested = stream as ChainStream;
      if (nested != null) {
        AddStreams(result, nested.streams);
      } else {
        result.Add(stream);
      }
    }
  }

  /// <summary>Whether data can be read from the stream</summary>
  public override bool CanRead {
    get { return this.allStreamsCanRead; }
  }

  /// <summary>Whether the stream supports seeking</summary>
  public override bool CanSeek {
    get { return this.allStreamsCanSeek; }
  }

  /// <summary>Whether data can be written into the stream</summary>
  public override bool CanWrite {
    get { return this.allStreamsCanWrite; }
  }

  /// <summary>
  ///   Clears all buffers for this stream and causes any buffered data to be written
  ///   to the underlying device.
  /// </summary>
  public override void Flush() {
    for (int index = 0; index < this.streams.Length; ++index) {
      this.streams[index].Flush();
    }
  }

  /// <summary>Length of the stream in bytes</summary>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  public override long Length {
    get {
      if (!this.allStreamsCanSeek) {
        throw makeSeekNotSupportedException("determine length");
      }

      // Sum up the length of all chained streams
      long length = 0;
      for (int index = 0; index < this.streams.Length; ++index) {
        length += this.streams[index].Length;
      }

      return length;
    }
  }

  /// <summary>Absolute position of the file pointer within the stream</summary>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  public override long Position {
    get {
      if (!this.allStreamsCanSeek) {
        throw makeSeekNotSupportedException("seek");
      }

      return this.position;
    }
    set { moveFilePointer(value); }
  }

  /// <summary>
  ///   Reads a sequence of bytes from the stream and advances the position of
  ///   the file pointer by the number of bytes read.
  /// </summary>
  /// <param name="buffer">Buffer that will receive the data read from the stream</param>
  /// <param name="offset">
  ///   Offset in the buffer at which the stream will place the data read
  /// </param>
  /// <param name="count">Maximum number of bytes that will be read</param>
  /// <returns>
  ///   The number of bytes that were actually read from the stream and written into
  ///   the provided buffer. Zero is returned when <paramref name="count" /> is zero,
  ///   when no streams are chained or when the end of the chain has been reached.
  /// </returns>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support reading
  /// </exception>
  public override int Read(byte[] buffer, int offset, int count) {
    if (!this.allStreamsCanRead) {
      throw new NotSupportedException(
        "Can't read: at least one of the chained streams doesn't support reading"
      );
    }

    // A zero-byte request must not touch the chained streams: in sequential mode
    // a result of 0 is interpreted as "stream exhausted" and would skip ahead.
    // An empty chain has nothing to read from either.
    if ((count == 0) || (this.streams.Length == 0)) {
      return 0;
    }

    int totalBytesRead = 0;
    int lastStreamIndex = this.streams.Length - 1;

    if (this.CanSeek) {

      // Find out from which stream and at which position we need to begin reading
      int streamIndex;
      long streamOffset;
      findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

      // Try to read from the stream our current file pointer falls into. If more
      // data was requested than the stream contains, read each stream to its end
      // until we either have enough data or run out of streams.
      while (count > 0) {
        Stream currentStream = this.streams[streamIndex];

        // Never request more than the current stream holds behind the offset.
        // The remainder is clamped to zero because the file pointer may lie
        // beyond the end of the last stream.
        long available = Math.Max(0L, currentStream.Length - streamOffset);
        int maximumBytes = (int)Math.Min((long)count, available);

        currentStream.Position = streamOffset;
        int bytesRead = currentStream.Read(buffer, offset, maximumBytes);

        // Accumulate the total number of bytes we read for the return value
        totalBytesRead += bytesRead;

        // If the stream returned partial data, stop here. Also, if this was the
        // last stream we queried, this is as far as we can go.
        if ((bytesRead < maximumBytes) || (streamIndex == lastStreamIndex)) {
          break;
        }

        // Move on to the next stream in the chain
        ++streamIndex;
        streamOffset = 0;
        count -= bytesRead;
        offset += bytesRead;
      }

      this.position += totalBytesRead;

    } else {

      // Try to read from the active read stream. If the end of the active read
      // stream is reached, switch to the next stream in the chain until we have
      // no more streams left to read from
      while (this.activeReadStreamIndex <= lastStreamIndex) {

        // Try to read from the stream. The stream can either return any amount
        // of data > 0 if there's still data left to be read or 0 if the end of
        // the stream was reached
        Stream activeStream = this.streams[this.activeReadStreamIndex];
        if (activeStream.CanSeek) {
          // Write() may have moved this stream's own pointer, so restore ours
          activeStream.Position = this.activeReadStreamPosition;
        }
        totalBytesRead = activeStream.Read(buffer, offset, count);

        // If we got any data, we're done, exit the loop
        if (totalBytesRead != 0) {
          break;
        }

        // Otherwise, go to the next stream in the chain
        this.activeReadStreamPosition = 0;
        ++this.activeReadStreamIndex;
      }

      this.activeReadStreamPosition += totalBytesRead;

    }

    return totalBytesRead;
  }

  /// <summary>Changes the position of the file pointer</summary>
  /// <param name="offset">
  ///   Offset to move the file pointer by, relative to the position indicated by
  ///   the <paramref name="origin" /> parameter.
  /// </param>
  /// <param name="origin">
  ///   Reference point relative to which the file pointer is placed
  /// </param>
  /// <returns>The new absolute position within the stream</returns>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  /// <exception cref="ArgumentException">The seek origin is not a known value</exception>
  public override long Seek(long offset, SeekOrigin origin) {
    switch (origin) {
      case SeekOrigin.Begin: {
        return Position = offset;
      }
      case SeekOrigin.Current: {
        return Position += offset;
      }
      case SeekOrigin.End: {
        return Position = (Length + offset);
      }
      default: {
        throw new ArgumentException("Invalid seek origin", "origin");
      }
    }
  }

  /// <summary>Changes the length of the stream</summary>
  /// <param name="value">New length the stream shall have</param>
  /// <exception cref="NotSupportedException">
  ///   Always, the stream chainer does not support the SetLength() operation
  /// </exception>
  public override void SetLength(long value) {
    throw new NotSupportedException("Resizing chained streams is not supported");
  }

  /// <summary>
  ///   Writes a sequence of bytes to the stream and advances the position of
  ///   the file pointer by the number of bytes written.
  /// </summary>
  /// <param name="buffer">
  ///   Buffer containing the data that will be written to the stream
  /// </param>
  /// <param name="offset">
  ///   Offset in the buffer at which the data to be written starts
  /// </param>
  /// <param name="count">Number of bytes that will be written into the stream</param>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support writing, or the
  ///   stream chainer does not contain any streams
  /// </exception>
  /// <remarks>
  ///   The behavior of this method is as follows: If one or more chained streams
  ///   do not support seeking, all data is appended to the final stream in the
  ///   chain. Otherwise, writing will begin with the stream the current file pointer
  ///   offset falls into. If the end of that stream is reached, writing continues
  ///   in the next stream. On the last stream, writing more data into the stream
  ///   than its current size allows will enlarge the stream.
  /// </remarks>
  public override void Write(byte[] buffer, int offset, int count) {
    if (!this.allStreamsCanWrite) {
      throw new NotSupportedException(
        "Can't write: at least one of the chained streams doesn't support writing"
      );
    }
    if (this.streams.Length == 0) {
      throw new NotSupportedException(
        "Can't write: the stream chainer does not contain any streams"
      );
    }

    int remaining = count;

    // If seeking is supported, we can write into the mid of the stream,
    // if the user so desires
    if (this.allStreamsCanSeek) {

      // Find out in which stream and at which position we need to begin writing
      int streamIndex;
      long streamOffset;
      findStreamIndexAndOffset(this.position, out streamIndex, out streamOffset);

      // Write data into the streams, switching over to the next stream if data is
      // too large to fit into the current stream, until all data is spent.
      int lastStreamIndex = this.streams.Length - 1;
      while (remaining > 0) {
        Stream currentStream = this.streams[streamIndex];

        // If this is the last stream, just write. If the data is larger than the last
        // stream's remaining bytes, it will append to that stream, enlarging it.
        if (streamIndex == lastStreamIndex) {

          // Write all remaining data into the last stream
          currentStream.Position = streamOffset;
          currentStream.Write(buffer, offset, remaining);
          remaining = 0;

        } else { // We're writing into a stream that's followed by another stream

          // Find out how much data we can put into the current stream without
          // enlarging it (if seeking is supported, so is the Length property)
          long currentStreamRemaining = currentStream.Length - streamOffset;
          int bytesToWrite = (int)Math.Min((long)remaining, currentStreamRemaining);

          // Write all data that can fit into the current stream
          currentStream.Position = streamOffset;
          currentStream.Write(buffer, offset, bytesToWrite);

          // Adjust the offsets and count for the next stream
          offset += bytesToWrite;
          remaining -= bytesToWrite;
          streamOffset = 0;
          ++streamIndex;

        }
      }

    } else { // Seeking not supported, append everything to the last stream

      Stream lastStream = this.streams[this.streams.Length - 1];
      if (lastStream.CanSeek) {
        lastStream.Seek(0, SeekOrigin.End);
      }
      lastStream.Write(buffer, offset, remaining);

    }

    this.position += count;
  }

  /// <summary>Streams being combined by the stream chainer</summary>
  /// <remarks>
  ///   This is the stream chainer's own array, not a copy; replacing its elements
  ///   changes what the stream chainer reads from and writes to.
  /// </remarks>
  public Stream[] ChainedStreams {
    get { return this.streams; }
  }

  /// <summary>Moves the file pointer</summary>
  /// <param name="position">New position the file pointer will be moved to</param>
  /// <exception cref="NotSupportedException">
  ///   At least one of the chained streams does not support seeking
  /// </exception>
  private void moveFilePointer(long position) {
    if (!this.allStreamsCanSeek) {
      throw makeSeekNotSupportedException("seek");
    }

    // Seemingly, it is okay to move the file pointer beyond the end of
    // the stream until you try to Read() or Write()
    this.position = position;
  }

  /// <summary>
  ///   Finds the stream index and local offset for an absolute position within
  ///   the combined streams.
  /// </summary>
  /// <param name="overallPosition">Absolute position within the combined streams</param>
  /// <param name="streamIndex">
  ///   Index of the stream the overall position falls into. If the position is at
  ///   or beyond the end of the chain, this is the index of the last stream
  ///   (-1 if no streams are chained).
  /// </param>
  /// <param name="streamPosition">
  ///   Local position within the stream indicated by <paramref name="streamIndex" />.
  ///   For the last stream this may be equal to or larger than the stream's length.
  /// </param>
  private void findStreamIndexAndOffset(
    long overallPosition, out int streamIndex, out long streamPosition
  ) {
    Debug.Assert(
      this.allStreamsCanSeek, "Call to findStreamIndexAndOffset() but no seek support"
    );

    // In case the position is not inside any of the earlier streams, it belongs
    // to the last stream, even when it lies at or beyond that stream's end
    int lastStreamIndex = this.streams.Length - 1;
    streamIndex = lastStreamIndex;

    // Search all streams except the last one. The last stream's length must not
    // be subtracted, otherwise a position at the end of the chain would be
    // reported as offset 0 of the last stream and data would be re-read or
    // overwritten instead of appended.
    for (int index = 0; index < lastStreamIndex; ++index) {
      long streamLength = this.streams[index].Length;

      if (overallPosition < streamLength) {
        streamIndex = index;
        break;
      }

      overallPosition -= streamLength;
    }

    // The overall position has been decreased by each skipped stream's length,
    // so it now contains the local position for the stream we settled on.
    streamPosition = overallPosition;
  }

  /// <summary>Determines the capabilities of the chained streams</summary>
  /// <remarks>
  ///   <para>
  ///     Theoretically, it would be possible to create a stream chainer that supported
  ///     writing only when the file pointer was on a chained stream with write support,
  ///     that could seek within the beginning of the stream until the first chained
  ///     stream with no seek capability was encountered and so on.
  ///   </para>
  ///   <para>
  ///     However, the interface of the Stream class requires us to make a definitive
  ///     statement as to whether the Stream supports seeking, reading and writing.
  ///     We can't return "maybe" or "mostly" in CanSeek, so the only sane choice that
  ///     doesn't violate the Stream interface is to implement these capabilities as
  ///     all or nothing - either all streams support a feature, or the stream chainer
  ///     will report the feature as unsupported.
  ///   </para>
  /// </remarks>
  private void determineCapabilities() {
    this.allStreamsCanSeek = true;
    this.allStreamsCanRead = true;
    this.allStreamsCanWrite = true;

    for (int index = 0; index < this.streams.Length; ++index) {
      this.allStreamsCanSeek &= this.streams[index].CanSeek;
      this.allStreamsCanRead &= this.streams[index].CanRead;
      this.allStreamsCanWrite &= this.streams[index].CanWrite;
    }
  }

  /// <summary>
  ///   Constructs a NotSupportedException for an error caused by one of the chained
  ///   streams having no seek support
  /// </summary>
  /// <param name="action">Action that was tried to perform</param>
  /// <returns>The newly constructed NotSupportedException</returns>
  private static NotSupportedException makeSeekNotSupportedException(string action) {
    return new NotSupportedException(
      string.Format(
        "Can't {0}: at least one of the chained streams does not support seeking",
        action
      )
    );
  }

  /// <summary>Streams that have been chained together</summary>
  private Stream[] streams;

  /// <summary>Current position of the overall file pointer</summary>
  private long position;

  /// <summary>Stream we're currently reading from if seeking is not supported</summary>
  /// <remarks>
  ///   If seeking is not supported, the stream chainer will read from each stream
  ///   sequentially until its end was reached
  /// </remarks>
  private int activeReadStreamIndex;

  /// <summary>Position in the current read stream if seeking is not supported</summary>
  /// <remarks>
  ///   If there is a mix of streams supporting seeking and not supporting seeking, we
  ///   need to keep track of the read index for those streams that do. If, for example,
  ///   the last stream is written to and read from in succession, the file pointer
  ///   of that stream would have been moved to the end by the write attempt, skipping
  ///   data that should have been read in the following read attempt.
  /// </remarks>
  private long activeReadStreamPosition;

  /// <summary>Whether all of the chained streams support seeking</summary>
  private bool allStreamsCanSeek;

  /// <summary>Whether all of the chained streams support reading</summary>
  private bool allStreamsCanRead;

  /// <summary>Whether all of the chained streams support writing</summary>
  private bool allStreamsCanWrite;

}
| } // namespace Nuclex.Support.IO |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // -------------------------------------------------------------------------------------------------------------------- | |
| // <copyright file="BinaryStreamStack.cs" company="Jake Woods"> | |
| // Copyright (c) 2013 Jake Woods | |
| // | |
| // Permission is hereby granted, free of charge, to any person obtaining a copy of this software | |
| // and associated documentation files (the "Software"), to deal in the Software without restriction, | |
| // including without limitation the rights to use, copy, modify, merge, publish, distribute, | |
| // sublicense, and/or sell copies of the Software, and to permit persons to whom the Software | |
| // is furnished to do so, subject to the following conditions: | |
| // | |
| // The above copyright notice and this permission notice shall be included in all copies | |
| // or substantial portions of the Software. | |
| // | |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | |
| // INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR | |
| // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR | |
| // ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
| // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
| // </copyright> | |
| // <author>Jake Woods</author> | |
| // <summary> | |
| // Provides character based and byte based stream-like read operations over multiple | |
| // streams and provides methods to add data to the front of the buffer. | |
| // </summary> | |
| // -------------------------------------------------------------------------------------------------------------------- | |
| using System.Collections.Generic; | |
| using System.IO; | |
| using System.Linq; | |
| using System.Text; | |
| namespace HttpMultipartParser { | |
/// <summary>
///     Provides character based and byte based stream-like read operations over multiple
///     streams and provides methods to add data to the front of the buffer.
/// </summary>
internal class BinaryStreamStack {
    #region Fields

    /// <summary>
    ///     Holds the streams to read from, the stream on the top of the
    ///     stack will be read first.
    /// </summary>
    private readonly Stack<BinaryReader> streams = new Stack<BinaryReader>();

    #endregion

    #region Constructors and Destructors

    /// <summary>
    ///     Initializes a new instance of the <see cref="BinaryStreamStack" /> class with the default
    ///     encoding of UTF8.
    /// </summary>
    public BinaryStreamStack()
        : this(Encoding.UTF8) {
    }

    /// <summary>
    ///     Initializes a new instance of the <see cref="BinaryStreamStack" /> class.
    /// </summary>
    /// <param name="encoding">
    ///     The encoding to use for character based operations.
    /// </param>
    public BinaryStreamStack(Encoding encoding) {
        CurrentEncoding = encoding;
    }

    #endregion

    #region Public Properties

    /// <summary>
    ///     Gets or sets the current encoding. Readers created by <see cref="Push" /> use the
    ///     encoding that is current at the time of the push.
    /// </summary>
    public Encoding CurrentEncoding { get; set; }

    #endregion

    #region Public Methods and Operators

    /// <summary>
    ///     Returns true if at least one reader is left on the stack. A reader that has
    ///     already been read to its end still counts until a read operation removes it.
    /// </summary>
    /// <returns>
    ///     True or false.
    /// </returns>
    public bool HasData() {
        return streams.Count > 0;
    }

    /// <summary>
    ///     Returns the reader on the top of the stack but does not remove it.
    /// </summary>
    /// <returns>
    ///     The <see cref="BinaryReader" />.
    /// </returns>
    public BinaryReader Peek() {
        return streams.Peek();
    }

    /// <summary>
    ///     Returns the reader on the top of the stack and removes it
    /// </summary>
    /// <returns>
    ///     The <see cref="BinaryReader" />.
    /// </returns>
    public BinaryReader Pop() {
        return streams.Pop();
    }

    /// <summary>
    ///     Pushes data to the front of the stack. The most recently pushed data will
    ///     be read first.
    /// </summary>
    /// <param name="data">
    ///     The data to add to the stack.
    /// </param>
    public void Push(byte[] data) {
        streams.Push(new BinaryReader(new MemoryStream(data), CurrentEncoding));
    }

    /// <summary>
    ///     Reads the next value from the stack using <see cref="BinaryReader.Read()" />.
    ///     Returns -1 if no data is left to read.
    /// </summary>
    /// <remarks>
    ///     <see cref="BinaryReader.Read()" /> decodes one character with the reader's encoding,
    ///     so the result equals the raw byte value only for data the encoding maps to one byte.
    /// </remarks>
    /// <returns>
    ///     The value that was read, or -1 if the stack is empty or becomes empty.
    /// </returns>
    public int Read() {
        // Peek() throws on an empty stack, but the contract here is to return -1
        if (!HasData()) {
            return -1;
        }

        BinaryReader top = streams.Peek();

        int value;
        while ((value = top.Read()) == -1) {
            // Current reader is exhausted: dispose it and move on to the next one
            if ((top = NextStream()) == null) {
                return -1;
            }
        }

        return value;
    }

    /// <summary>
    ///     Reads the specified number of bytes from the stack, starting from a specified point in the byte array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of bytes to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of bytes read into buffer. This might be less than the number of bytes requested if that many bytes are not available,
    ///     or it might be zero if the end of the stream is reached.
    /// </returns>
    public int Read(byte[] buffer, int index, int count) {
        if (!HasData()) {
            return 0;
        }

        // Read through all the streams until we exhaust them
        // or until count is satisfied
        int amountRead = 0;
        BinaryReader top = streams.Peek();
        while (amountRead < count && HasData()) {
            int read = top.Read(buffer, index + amountRead, count - amountRead);
            if (read == 0) {
                if ((top = NextStream()) == null) {
                    return amountRead;
                }
            } else {
                amountRead += read;
            }
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads the specified number of characters from the stack, starting from a specified point in the character array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of characters to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of characters read into buffer. This might be less than the number of characters requested if that many
    ///     characters are not available, or it might be zero if the end of the stream is reached.
    /// </returns>
    public int Read(char[] buffer, int index, int count) {
        if (!HasData()) {
            return 0;
        }

        // Read through all the streams until we exhaust them
        // or until count is satisfied
        int amountRead = 0;
        BinaryReader top = streams.Peek();
        while (amountRead < count && HasData()) {
            int read = top.Read(buffer, index + amountRead, count - amountRead);
            if (read == 0) {
                if ((top = NextStream()) == null) {
                    return amountRead;
                }
            } else {
                amountRead += read;
            }
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads a line from the stack as raw bytes. The line is delimited by the encoded
    ///     '\n' character; bytes of the encoded '\r' character are dropped.
    /// </summary>
    /// <returns>
    ///     A byte array containing all the data up to but not including the next newline in the stack,
    ///     or null if the stack holds no readers.
    /// </returns>
    public byte[] ReadByteLine() {
        bool dummy;
        return ReadByteLine(out dummy);
    }

    /// <summary>
    ///     Reads a line from the stack as raw bytes. The line is delimited by the encoded
    ///     '\n' character; bytes of the encoded '\r' character are dropped. The newline
    ///     bytes are not included in the result.
    /// </summary>
    /// <remarks>
    ///     Matching is done byte by byte: every byte that occurs in the encoded '\r' is
    ///     skipped wherever it appears. NOTE(review): this looks reliable only for encodings
    ///     in which '\r' and '\n' are single bytes (e.g. UTF-8, ASCII) - confirm before
    ///     using it with multi-byte encodings.
    /// </remarks>
    /// <param name="hitStreamEnd">
    ///     This will be set to true if we did not end on a newline but instead found the end of
    ///     our data.
    /// </param>
    /// <returns>
    ///     The bytes of the line, or null if the stack holds no readers.
    /// </returns>
    public byte[] ReadByteLine(out bool hitStreamEnd) {
        hitStreamEnd = false;
        if (!HasData()) {
            // No streams, no data!
            return null;
        }

        // This is horribly inefficient, consider profiling here if
        // it becomes an issue.
        BinaryReader top = streams.Peek();
        byte[] ignore = CurrentEncoding.GetBytes(new[] { '\r' });
        byte[] search = CurrentEncoding.GetBytes(new[] { '\n' });
        int searchPos = 0;

        // Scratch buffer; only the first amountRead bytes are valid after each read
        var bytes = new byte[search.Length];

        using (var builder = new MemoryStream()) {
            while (true) {
                // First we need to read some bytes from one of the streams
                int amountRead = top.Read(bytes, 0, bytes.Length);
                while (amountRead == 0) {
                    // Current reader is exhausted: dispose it and move on to the next one
                    if ((top = NextStream()) == null) {
                        hitStreamEnd = true;
                        return builder.ToArray();
                    }

                    amountRead = top.Read(bytes, 0, bytes.Length);
                }

                // Now we've got some bytes, we need to check them against the search array.
                // Only look at what was actually read, a short read leaves stale bytes behind.
                for (int i = 0; i < amountRead; ++i) {
                    byte b = bytes[i];

                    if (ignore.Contains(b)) {
                        continue;
                    }

                    if (b == search[searchPos]) {
                        searchPos += 1;
                    } else {
                        // The bytes matched so far turned out not to be a newline,
                        // so they belong to the line after all
                        if (searchPos != 0) {
                            builder.Write(search, 0, searchPos);
                        }

                        builder.WriteByte(b);
                        searchPos = 0;
                    }

                    // Finally if we've found our newline
                    if (searchPos == search.Length) {
                        return builder.ToArray();
                    }
                }
            }
        }
    }

    /// <summary>
    ///     Reads a line from the stack and decodes it with the current encoding. The newline
    ///     characters will not be included in the result.
    /// </summary>
    /// <returns>
    ///     The <see cref="string" /> containing the line, or null if the stack holds no readers.
    /// </returns>
    public string ReadLine() {
        bool dummy;
        return ReadLine(out dummy);
    }

    /// <summary>
    ///     Reads a line from the stack and decodes it with the current encoding. The newline
    ///     characters will not be included in the result.
    /// </summary>
    /// <param name="hitStreamEnd">
    ///     This will be set to true if we did not end on a newline but instead found the end of
    ///     our data.
    /// </param>
    /// <returns>
    ///     The <see cref="string" /> containing the line, or null if the stack holds no readers.
    /// </returns>
    public string ReadLine(out bool hitStreamEnd) {
        byte[] result = ReadByteLine(out hitStreamEnd);

        if (result == null) {
            return null;
        }

        return CurrentEncoding.GetString(result);
    }

    #endregion

    #region Methods

    /// <summary>
    ///     Removes the current reader from the stack and ensures it is correctly
    ///     destroyed and then returns the next available reader. If no reader
    ///     is available this method returns null.
    /// </summary>
    /// <returns>
    ///     The next <see cref="BinaryReader">reader</see>.
    /// </returns>
    private BinaryReader NextStream() {
        BinaryReader top = streams.Pop();
        top.Dispose();

        return HasData() ? streams.Peek() : null;
    }

    #endregion
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // -------------------------------------------------------------------------------------------------------------------- | |
| // <copyright file="RebufferableBinaryReader.cs" company="Jake Woods"> | |
| // Copyright (c) 2013 Jake Woods | |
| // | |
| // Permission is hereby granted, free of charge, to any person obtaining a copy of this software | |
| // and associated documentation files (the "Software"), to deal in the Software without restriction, | |
| // including without limitation the rights to use, copy, modify, merge, publish, distribute, | |
| // sublicense, and/or sell copies of the Software, and to permit persons to whom the Software | |
| // is furnished to do so, subject to the following conditions: | |
| // | |
| // The above copyright notice and this permission notice shall be included in all copies | |
| // or substantial portions of the Software. | |
| // | |
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, | |
| // INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR | |
| // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR | |
| // ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
| // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | |
| // </copyright> | |
| // <author>Jake Woods</author> | |
| // <summary> | |
| // Provides methods to interpret and read a stream as either character or binary | |
| // data similar to a BinaryReader and provides the ability to push | |
| // data onto the front of the stream. | |
| // </summary> | |
| // -------------------------------------------------------------------------------------------------------------------- | |
| using System; | |
| using System.IO; | |
| using System.Text; | |
| namespace HttpMultipartParser { | |
| /// <summary> | |
| /// Provides methods to interpret and read a stream as either character or binary | |
| /// data similar to a <see cref="BinaryReader" /> and provides the ability to push | |
| /// data onto the front of the stream. | |
| /// </summary> | |
public class RebufferableBinaryReader {
    #region Fields

    /// <summary>
    ///     The size of the buffer to use when reading new data.
    /// </summary>
    private readonly int bufferSize;

    /// <summary>
    ///     The encoding to use for character based operations
    /// </summary>
    private readonly Encoding encoding;

    /// <summary>
    ///     The stream to read raw data from.
    /// </summary>
    private readonly Stream stream;

    /// <summary>
    ///     The stream stack to store buffered data.
    /// </summary>
    private readonly BinaryStreamStack streamStack;

    /// <summary>
    ///     Set once the first non-empty chunk of the underlying stream has been
    ///     inspected for a byte order mark, so later chunks are never stripped.
    /// </summary>
    private bool bomChecked;

    /// <summary>
    ///     Optional limit on the number of bytes pulled from the underlying stream.
    ///     No further chunk is requested once <see cref="BytesRead" /> has reached it;
    ///     <c>null</c> means no limit. The limit is checked before each chunk is
    ///     requested, so the final chunk may still carry the total past it.
    /// </summary>
    public long? Length { private get; set; }

    /// <summary>
    ///     Running total of the bytes pulled from the underlying stream (including
    ///     a skipped byte order mark). Callers may overwrite it.
    /// </summary>
    public long BytesRead { private get; set; }

    #endregion

    #region Constructors and Destructors

    /// <summary>
    ///     Initializes a new instance of the <see cref="RebufferableBinaryReader" /> class.
    ///     Default encoding of UTF8 (without byte order mark) will be used.
    /// </summary>
    /// <param name="input">
    ///     The input stream to read from.
    /// </param>
    public RebufferableBinaryReader(Stream input)
        : this(input, new UTF8Encoding(false)) {
    }

    /// <summary>
    ///     Initializes a new instance of the <see cref="RebufferableBinaryReader" /> class
    ///     with a buffer size of 4096 bytes.
    /// </summary>
    /// <param name="input">
    ///     The input stream to read from.
    /// </param>
    /// <param name="encoding">
    ///     The encoding to use for character based operations.
    /// </param>
    public RebufferableBinaryReader(Stream input, Encoding encoding)
        : this(input, encoding, 4096) {
    }

    /// <summary>
    ///     Initializes a new instance of the <see cref="RebufferableBinaryReader" /> class.
    /// </summary>
    /// <param name="input">
    ///     The input stream to read from.
    /// </param>
    /// <param name="encoding">
    ///     The encoding to use for character based operations.
    /// </param>
    /// <param name="bufferSize">
    ///     The buffer size to use for new buffers. Must be greater than zero.
    /// </param>
    /// <exception cref="ArgumentNullException">
    ///     <paramref name="input" /> or <paramref name="encoding" /> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    ///     <paramref name="bufferSize" /> is zero or negative.
    /// </exception>
    public RebufferableBinaryReader(Stream input, Encoding encoding, int bufferSize) {
        if (input == null) {
            throw new ArgumentNullException("input");
        }
        if (encoding == null) {
            throw new ArgumentNullException("encoding");
        }
        if (bufferSize <= 0) {
            throw new ArgumentOutOfRangeException("bufferSize", "Buffer size must be greater than zero");
        }

        stream = input;
        streamStack = new BinaryStreamStack(encoding);
        this.encoding = encoding;
        this.bufferSize = bufferSize;
    }

    #endregion

    #region Public Methods and Operators

    /// <summary>
    ///     Adds data to the front of the stream. The most recently buffered data will
    ///     be read first.
    /// </summary>
    /// <param name="data">
    ///     The data to buffer.
    /// </param>
    public void Buffer(byte[] data) {
        streamStack.Push(data);
    }

    /// <summary>
    ///     Adds the string to the front of the stream. The most recently buffered data will
    ///     be read first. The string is converted to bytes with the reader's encoding.
    /// </summary>
    /// <param name="data">
    ///     The data.
    /// </param>
    public void Buffer(string data) {
        streamStack.Push(encoding.GetBytes(data));
    }

    /// <summary>
    ///     Reads a single byte as an integer from the stream. Returns -1 if no
    ///     data is left to read.
    /// </summary>
    /// <returns>
    ///     The <see cref="byte" /> that was read, or -1 at the end of the data.
    /// </returns>
    public int Read() {
        int value = -1;
        while (value == -1) {
            // Refill from the underlying stream whenever the stack has run dry.
            if (!streamStack.HasData() && StreamData() == 0) {
                return -1;
            }

            value = streamStack.Read();
        }

        return value;
    }

    /// <summary>
    ///     Reads the specified number of bytes from the stream, starting from a
    ///     specified point in the byte array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of bytes to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of bytes read into buffer. This might be less than the number of bytes
    ///     requested if that many bytes are not available, or it might be zero if the end of
    ///     the stream is reached.
    /// </returns>
    public int Read(byte[] buffer, int index, int count) {
        int amountRead = 0;
        while (amountRead < count) {
            if (!streamStack.HasData() && StreamData() == 0) {
                return amountRead;
            }

            amountRead += streamStack.Read(buffer, index + amountRead, count - amountRead);
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads the specified number of characters from the stream, starting from a
    ///     specified point in the character array.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to read data into.
    /// </param>
    /// <param name="index">
    ///     The index of buffer to start reading into.
    /// </param>
    /// <param name="count">
    ///     The number of characters to read into the buffer.
    /// </param>
    /// <returns>
    ///     The number of characters read into buffer. This might be less than the number of
    ///     characters requested if that many characters are not available,
    ///     or it might be zero if the end of the stream is reached.
    /// </returns>
    public int Read(char[] buffer, int index, int count) {
        int amountRead = 0;
        while (amountRead < count) {
            if (!streamStack.HasData() && StreamData() == 0) {
                return amountRead;
            }

            amountRead += streamStack.Read(buffer, index + amountRead, count - amountRead);
        }

        return amountRead;
    }

    /// <summary>
    ///     Reads a series of bytes up to the next line break. Line splitting is
    ///     delegated to the stream stack; the newline bytes are not included in
    ///     the returned data.
    /// </summary>
    /// <returns>
    ///     A byte array containing all the data up to but not including the next newline,
    ///     or null if the end of the data was reached without reading any bytes.
    /// </returns>
    public byte[] ReadByteLine() {
        using (var builder = new MemoryStream()) {
            while (true) {
                if (!streamStack.HasData() && StreamData() == 0) {
                    return builder.Length > 0 ? builder.ToArray() : null;
                }

                bool hitStreamEnd;
                byte[] line = streamStack.ReadByteLine(out hitStreamEnd);

                // The stack's line reader can hand back null; treat that as
                // "no bytes contributed" instead of dereferencing it.
                if (line != null) {
                    builder.Write(line, 0, line.Length);
                }

                // The line is complete unless the stack ran out of buffered data
                // before a line break was found; in that case keep accumulating.
                if (!hitStreamEnd) {
                    return builder.ToArray();
                }
            }
        }
    }

    /// <summary>
    ///     Reads a line via <see cref="ReadByteLine" /> and decodes it with the
    ///     reader's encoding. The newline characters are not included.
    /// </summary>
    /// <returns>
    ///     The <see cref="string" /> containing the line or null if end of stream.
    /// </returns>
    public string ReadLine() {
        byte[] data = ReadByteLine();
        return data == null ? null : encoding.GetString(data);
    }

    #endregion

    #region Methods

    /// <summary>
    ///     Determines the byte order marking offset (if any) from the
    ///     given buffer.
    /// </summary>
    /// <param name="buffer">
    ///     The buffer to examine.
    /// </param>
    /// <param name="available">
    ///     The number of valid bytes at the start of <paramref name="buffer" />.
    /// </param>
    /// <returns>
    ///     The length of the encoding's preamble if the valid part of the buffer
    ///     starts with it, otherwise 0.
    /// </returns>
    private int GetBomOffset(byte[] buffer, int available) {
        byte[] bom = encoding.GetPreamble();

        // Never look past the bytes that were actually read; a chunk shorter
        // than the preamble cannot contain it.
        if (bom.Length == 0 || available < bom.Length) {
            return 0;
        }

        for (int i = 0; i < bom.Length; ++i) {
            if (bom[i] != buffer[i]) {
                return 0;
            }
        }

        return bom.Length;
    }

    /// <summary>
    ///     Reads more data from the stream into the stream stack.
    /// </summary>
    /// <returns>
    ///     The number of bytes pushed onto the stream stack, or 0 when the
    ///     underlying stream is exhausted or <see cref="Length" /> has been reached.
    /// </returns>
    private int StreamData() {
        // Loop so that a chunk consisting solely of the byte order mark does not
        // make callers believe data is available when nothing was pushed.
        while (!Length.HasValue || BytesRead < Length) {
            var buffer = new byte[bufferSize];
            int amountRead = stream.Read(buffer, 0, buffer.Length);
            if (amountRead <= 0) {
                return 0;
            }

            BytesRead += amountRead;

            // Only the very beginning of the stream can carry our encoding's BOM;
            // identical bytes at a later chunk boundary are payload and must stay.
            // NOTE(review): a BOM split across the first two chunks is not detected.
            int bomOffset = 0;
            if (!bomChecked) {
                bomChecked = true;
                bomOffset = GetBomOffset(buffer, amountRead);
            }

            int payload = amountRead - bomOffset;
            if (payload == 0) {
                continue;
            }

            if (payload == buffer.Length) {
                streamStack.Push(buffer);
            } else {
                // Short read or skipped BOM: hand the stack an exactly sized copy.
                var smallBuffer = new byte[payload];
                System.Buffer.BlockCopy(buffer, bomOffset, smallBuffer, 0, payload);
                streamStack.Push(smallBuffer);
            }

            return payload;
        }

        return 0;
    }

    #endregion
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.IO; | |
| using System.Linq; | |
| using System.Text; | |
| using System.Threading.Tasks; | |
| using HttpMultipartParser; | |
| namespace HttpServer { | |
/// <summary>
///     Read-only, forward-only <see cref="Stream" /> view over a
///     <see cref="RebufferableBinaryReader" /> that exposes at most a fixed
///     number of bytes.
/// </summary>
public class RebufferableBinaryReaderStreamAdapter : Stream {
    /// <summary>Reader the bytes are pulled from.</summary>
    private readonly RebufferableBinaryReader inner;

    /// <summary>Total number of bytes this adapter may hand out.</summary>
    private readonly long length;

    /// <summary>Number of bytes handed out so far.</summary>
    private long read;

    /// <summary>Initializes a new adapter over the given reader.</summary>
    /// <param name="reader">Reader to pull bytes from.</param>
    /// <param name="length">Maximum number of bytes the adapter exposes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="reader" /> is null.</exception>
    public RebufferableBinaryReaderStreamAdapter(RebufferableBinaryReader reader, long length) {
        if (reader == null) {
            throw new ArgumentNullException("reader");
        }

        this.inner = reader;
        this.length = length;
        this.read = 0;
    }

    /// <summary>Always true; the adapter is read-only.</summary>
    public override bool CanRead {
        get { return true; }
    }

    /// <summary>Always false; the adapter is forward-only.</summary>
    public override bool CanSeek {
        get { return false; }
    }

    /// <summary>Always false; the adapter is read-only.</summary>
    public override bool CanWrite {
        get { return false; }
    }

    /// <summary>Does nothing; there is no write buffer.</summary>
    public override void Flush() {
    }

    /// <summary>The length passed to the constructor.</summary>
    public override long Length {
        get { return length; }
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override long Position {
        get {
            throw new NotSupportedException();
        }
        set {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    ///     Reads up to <paramref name="count" /> bytes from the inner reader, never
    ///     handing out more than <see cref="Length" /> bytes in total.
    /// </summary>
    /// <param name="buffer">Buffer receiving the data.</param>
    /// <param name="offset">Index in <paramref name="buffer" /> to start writing at.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>
    ///     The number of bytes read, or 0 once the configured length has been
    ///     delivered or the inner reader has no more data.
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count) {
        long remaining = length - read;
        if (remaining <= 0) {
            return 0;
        }

        // Cap the request so the adapter never consumes bytes that lie beyond
        // its declared length in the inner reader.
        int toRead = (int)Math.Min(count, remaining);
        int n = inner.Read(buffer, offset, toRead);
        read += n;
        return n;
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override long Seek(long offset, SeekOrigin origin) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override void SetLength(long value) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public override void Write(byte[] buffer, int offset, int count) {
        throw new NotSupportedException();
    }
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #region CPL License | |
| /* | |
| Nuclex Framework | |
| Copyright (C) 2002-2012 Nuclex Development Labs | |
| This library is free software; you can redistribute it and/or | |
| modify it under the terms of the IBM Common Public License as | |
| published by the IBM Corporation; either version 1.0 of the | |
| License, or (at your option) any later version. | |
| This library is distributed in the hope that it will be useful, | |
| but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| IBM Common Public License for more details. | |
| You should have received a copy of the IBM Common Public | |
| License along with this library | |
| */ | |
| #endregion | |
| using System; | |
| using System.Collections.Generic; | |
| using System.Diagnostics; | |
| using System.IO; | |
| namespace Nuclex.Support.IO { | |
| /// <summary>Wraps a stream and exposes only a limited region of its data</summary> | |
public class PartialStream : Stream {

  /// <summary>Initializes a new partial stream</summary>
  /// <param name="stream">
  ///   Stream the wrapper will make a limited region accessible of
  /// </param>
  /// <param name="start">
  ///   Start index in the stream which becomes the beginning for the wrapper
  /// </param>
  /// <param name="length">
  ///   Length the wrapped stream should report and allow access to
  /// </param>
  /// <exception cref="ArgumentNullException">
  ///   The <paramref name="stream" /> is null
  /// </exception>
  /// <exception cref="ArgumentException">
  ///   The start or length is negative, the start is not 0 for an unseekable
  ///   stream, or the region exceeds the end of a seekable stream
  /// </exception>
  public PartialStream(Stream stream, long start, long length) {
    if(stream == null) {
      throw new ArgumentNullException("stream");
    }
    if(start < 0) {
      throw new ArgumentException("Start index must not be less than 0", "start");
    }
    if(length < 0) {
      throw new ArgumentException("Length must not be less than 0", "length");
    }

    if(!stream.CanSeek) {
      if(start != 0) {
        throw new ArgumentException(
          "The only valid start for unseekable streams is 0", "start"
        );
      }
    } else {
      // Written as a subtraction so that start + length cannot overflow
      if(length > stream.Length - start) {
        throw new ArgumentException(
          "Partial stream exceeds end of full stream", "length"
        );
      }
    }

    this.stream = stream;
    this.start = start;
    this.length = length;
  }

  /// <summary>Whether data can be read from the stream</summary>
  public override bool CanRead {
    get { return this.stream.CanRead; }
  }

  /// <summary>Whether the stream supports seeking</summary>
  public override bool CanSeek {
    get { return this.stream.CanSeek; }
  }

  /// <summary>Whether data can be written into the stream</summary>
  public override bool CanWrite {
    get { return this.stream.CanWrite; }
  }

  /// <summary>
  ///   Clears all buffers for this stream and causes any buffered data to be written
  ///   to the underlying device.
  /// </summary>
  public override void Flush() {
    this.stream.Flush();
  }

  /// <summary>Length of the partial stream in bytes, as given at construction</summary>
  public override long Length {
    get { return this.length; }
  }

  /// <summary>Position of the file pointer relative to the partial stream's start</summary>
  /// <exception cref="NotSupportedException">
  ///   The wrapped stream does not support seeking
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException">
  ///   The position being assigned is negative
  /// </exception>
  public override long Position {
    get {
      if(!this.stream.CanSeek) {
        throw makeSeekNotSupportedException("seek");
      }

      return this.position;
    }
    set { moveFilePointer(value); }
  }

  /// <summary>
  ///   Reads a sequence of bytes from the stream and advances the position of
  ///   the file pointer by the number of bytes read.
  /// </summary>
  /// <param name="buffer">Buffer that will receive the data read from the stream</param>
  /// <param name="offset">
  ///   Offset in the buffer at which the stream will place the data read
  /// </param>
  /// <param name="count">Maximum number of bytes that will be read</param>
  /// <returns>
  ///   The number of bytes that were actually read from the stream and written into
  ///   the provided buffer; 0 if the file pointer is at or beyond the end of the
  ///   partial stream
  /// </returns>
  /// <exception cref="NotSupportedException">
  ///   The wrapped stream does not support reading
  /// </exception>
  public override int Read(byte[] buffer, int offset, int count) {
    if(!this.stream.CanRead) {
      throw new NotSupportedException(
        "Can't read: the wrapped stream doesn't support reading"
      );
    }

    // The file pointer may legally sit beyond the end of the region; reading
    // there yields no data instead of a negative byte count
    long remaining = this.length - this.position;
    if(remaining <= 0) {
      return 0;
    }

    int bytesToRead = (int)Math.Min(count, remaining);

    // Other users of the wrapped stream may have moved its file pointer
    if(this.stream.CanSeek) {
      this.stream.Position = this.position + this.start;
    }
    int bytesRead = this.stream.Read(buffer, offset, bytesToRead);
    this.position += bytesRead;

    return bytesRead;
  }

  /// <summary>Changes the position of the file pointer</summary>
  /// <param name="offset">
  ///   Offset to move the file pointer by, relative to the position indicated by
  ///   the <paramref name="origin" /> parameter.
  /// </param>
  /// <param name="origin">
  ///   Reference point relative to which the file pointer is placed
  /// </param>
  /// <returns>The new position within the partial stream</returns>
  /// <exception cref="NotSupportedException">
  ///   The wrapped stream does not support seeking
  /// </exception>
  public override long Seek(long offset, SeekOrigin origin) {
    switch(origin) {
      case SeekOrigin.Begin: {
        return Position = offset;
      }
      case SeekOrigin.Current: {
        return Position += offset;
      }
      case SeekOrigin.End: {
        return Position = (Length + offset);
      }
      default: {
        throw new ArgumentException("Invalid seek origin", "origin");
      }
    }
  }

  /// <summary>Changes the length of the stream</summary>
  /// <param name="value">New length the stream shall have</param>
  /// <exception cref="NotSupportedException">
  ///   Always, the partial stream does not support the SetLength() operation
  /// </exception>
  public override void SetLength(long value) {
    throw new NotSupportedException("Resizing partial streams is not supported");
  }

  /// <summary>
  ///   Writes a sequence of bytes to the stream and advances the position of
  ///   the file pointer by the number of bytes written.
  /// </summary>
  /// <param name="buffer">
  ///   Buffer containing the data that will be written to the stream
  /// </param>
  /// <param name="offset">
  ///   Offset in the buffer at which the data to be written starts
  /// </param>
  /// <param name="count">Number of bytes that will be written into the stream</param>
  /// <exception cref="NotSupportedException">
  ///   The write would go beyond the end of the partial stream
  /// </exception>
  /// <remarks>
  ///   Writes are confined to the region exposed by the partial stream; the
  ///   region is never enlarged.
  /// </remarks>
  public override void Write(byte[] buffer, int offset, int count) {
    long remaining = this.length - this.position;
    if(count > remaining) {
      throw new NotSupportedException(
        "Cannot extend the length of the partial stream"
      );
    }

    // Other users of the wrapped stream may have moved its file pointer
    if(this.stream.CanSeek) {
      this.stream.Position = this.position + this.start;
    }
    this.stream.Write(buffer, offset, count);

    this.position += count;
  }

  /// <summary>Stream being wrapped by the partial stream wrapper</summary>
  public Stream CompleteStream {
    get { return this.stream; }
  }

  /// <summary>Moves the file pointer</summary>
  /// <param name="position">New position the file pointer will be moved to</param>
  /// <exception cref="NotSupportedException">
  ///   The wrapped stream does not support seeking
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException">
  ///   The new position is negative
  /// </exception>
  private void moveFilePointer(long position) {
    if(!this.stream.CanSeek) {
      throw makeSeekNotSupportedException("seek");
    }

    // A negative position would map to data in front of the exposed region
    if(position < 0) {
      throw new ArgumentOutOfRangeException(
        "position", "Position must not be less than 0"
      );
    }

    // Seemingly, it is okay to move the file pointer beyond the end of
    // the stream until you try to Read() or Write()
    this.position = position;
  }

  /// <summary>
  ///   Constructs a NotSupportedException for an error caused by the wrapped
  ///   stream having no seek support
  /// </summary>
  /// <param name="action">Action that was attempted</param>
  /// <returns>The newly constructed NotSupportedException</returns>
  private static NotSupportedException makeSeekNotSupportedException(string action) {
    return new NotSupportedException(
      string.Format(
        "Can't {0}: the wrapped stream does not support seeking",
        action
      )
    );
  }

  /// <summary>Stream of which a limited region is exposed</summary>
  private readonly Stream stream;
  /// <summary>Start index of the partial stream in the wrapped stream</summary>
  private readonly long start;
  /// <summary>Zero-based position of the partial stream's file pointer</summary>
  /// <remarks>
  ///   If the stream does not support seeking, the position will simply be counted
  ///   up until it reaches <see cref="PartialStream.length" />.
  /// </remarks>
  private long position;
  /// <summary>Length of the partial stream</summary>
  private readonly long length;

}
| } // namespace Nuclex.Support.IO |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Data; | |
| using System.Data.Entity; | |
| using System.Data.SqlClient; | |
| using System.IO; | |
| namespace FilePublicator2.Infrastructure | |
| { | |
/// <summary>
/// Stream over a single varbinary column value of one row in a [dbo] table.
/// Writing sends each chunk to the database immediately (the first chunk
/// replaces the column value, later chunks are appended); reading pulls the
/// value through a sequential-access data reader.
/// </summary>
/// <remarks>
/// NOTE(review): in read mode the data reader stays open on the same connection
/// that Write() uses, so mixing reads and writes on one instance presumably
/// fails unless the connection allows multiple active result sets - confirm.
/// </remarks>
public class VarbinaryStream : Stream
{
    private readonly SqlConnection _Connection;
    // Identifiers are stored already bracket-quoted, ready for concatenation.
    private readonly string _TableName;
    private readonly string _BinaryColumn;
    private readonly string _KeyColumn;
    private readonly int _KeyValue;

    // Number of bytes written so far; 0 means the next write replaces the value.
    private long _Offset;

    private SqlDataReader _SQLReader;
    private long _SQLReadPosition;
    private readonly bool _AllowedToRead = false;
    // True when the query found a row whose column value is not NULL.
    private bool _HasRow;
    private bool _Disposed;

    /// <summary>Initializes a new stream over one varbinary column value.</summary>
    /// <param name="ConnectionString">Connection string; the stream creates and owns its connection.</param>
    /// <param name="TableName">Name of the table (in schema dbo), without brackets.</param>
    /// <param name="BinaryColumn">Name of the varbinary column, without brackets.</param>
    /// <param name="KeyColumn">Name of the integer key column, without brackets.</param>
    /// <param name="KeyValue">Key value identifying the row.</param>
    /// <param name="AllowRead">
    /// When true, the row is queried right away so that Read() can be used.
    /// </param>
    /// <exception cref="ArgumentException">An identifier is null or empty.</exception>
    public VarbinaryStream(
        string ConnectionString,
        string TableName,
        string BinaryColumn,
        string KeyColumn,
        int KeyValue,
        bool AllowRead = false)
    {
        _TableName = QuoteIdentifier(TableName, "TableName");
        _BinaryColumn = QuoteIdentifier(BinaryColumn, "BinaryColumn");
        _KeyColumn = QuoteIdentifier(KeyColumn, "KeyColumn");
        _KeyValue = KeyValue;

        // create own connection with the connection string.
        _Connection = new SqlConnection(ConnectionString);

        // only query the database for a result if we are going to be reading, otherwise skip.
        _AllowedToRead = AllowRead;
        if (!_AllowedToRead)
        {
            return;
        }

        try
        {
            if (_Connection.State != ConnectionState.Open)
                _Connection.Open();

            using (SqlCommand cmd = new SqlCommand(
                "SELECT TOP 1 " + _BinaryColumn +
                " FROM [dbo]." + _TableName +
                " WHERE " + _KeyColumn + " = @id",
                _Connection))
            {
                cmd.Parameters.Add(new SqlParameter("@id", _KeyValue));

                _SQLReader = cmd.ExecuteReader(
                    CommandBehavior.SequentialAccess |
                    CommandBehavior.SingleResult |
                    CommandBehavior.SingleRow |
                    CommandBehavior.CloseConnection);
            }

            // Remember whether there is anything to read at all; a missing row
            // or a NULL value is reported by Read() as an empty stream.
            _HasRow = _SQLReader.Read() && !_SQLReader.IsDBNull(0);
        }
        catch
        {
            // Do not leak the reader/connection of a half-constructed stream,
            // and do not hide the failure from the caller.
            if (_SQLReader != null)
                _SQLReader.Dispose();
            _Connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Writes <paramref name="count" /> bytes of <paramref name="buffer" />, starting at
    /// <paramref name="index" />, to the column. The first write replaces the column value,
    /// every further write appends to it. Database errors propagate to the caller.
    /// </summary>
    /// <param name="buffer">Buffer containing the data to write.</param>
    /// <param name="index">Offset in the buffer at which the data starts.</param>
    /// <param name="count">Number of bytes to write.</param>
    public override void Write(byte[] buffer, int index, int count)
    {
        ThrowIfDisposed();
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - index < count)
            throw new ArgumentException("index and count exceed the bounds of the buffer");

        // Send exactly the requested slice; callers routinely pass a buffer that
        // is larger than the valid data it holds.
        byte[] chunk = buffer;
        if (index != 0 || count != buffer.Length)
        {
            chunk = new byte[count];
            Buffer.BlockCopy(buffer, index, chunk, 0, count);
        }

        if (_Connection.State != ConnectionState.Open)
            _Connection.Open();

        string sql;
        if (_Offset == 0)
        {
            // for the first write we just send the bytes to the Column
            sql = "UPDATE [dbo]." + _TableName +
                  " SET " + _BinaryColumn + " = @chunk" +
                  " WHERE " + _KeyColumn + " = @id";
        }
        else
        {
            // for all updates after the first one we use the TSQL command .WRITE()
            // with a NULL offset to append the data in the database
            sql = "UPDATE [dbo]." + _TableName +
                  " SET " + _BinaryColumn + ".WRITE(@chunk, NULL, NULL)" +
                  " WHERE " + _KeyColumn + " = @id";
        }

        using (SqlCommand cmd = new SqlCommand(sql, _Connection))
        {
            cmd.Parameters.Add(new SqlParameter("@chunk", chunk));
            cmd.Parameters.Add(new SqlParameter("@id", _KeyValue));
            cmd.ExecuteNonQuery();
        }

        _Offset += count;
    }

    /// <summary>
    /// Reads up to <paramref name="count" /> bytes of the column value into
    /// <paramref name="buffer" />. Database errors propagate to the caller.
    /// </summary>
    /// <param name="buffer">Buffer receiving the data.</param>
    /// <param name="offset">Index in the buffer at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>
    /// The number of bytes read; 0 when the value is exhausted, NULL, or no row matched.
    /// </returns>
    /// <exception cref="NotSupportedException">The stream was created without read access.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        if (!_AllowedToRead)
            throw new NotSupportedException("The stream was not opened for reading");
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (!_HasRow)
            return 0;

        long bytesRead = _SQLReader.GetBytes(0, _SQLReadPosition, buffer, offset, count);
        _SQLReadPosition += bytesRead;
        return (int)bytesRead;
    }

    /// <summary>Whether the stream was created with read access.</summary>
    public override bool CanRead
    {
        get { return _AllowedToRead; }
    }

    /// <summary>Always false.</summary>
    public override bool CanSeek
    {
        get { return false; }
    }

    /// <summary>Always true.</summary>
    public override bool CanWrite
    {
        get { return true; }
    }

    /// <summary>
    /// Does nothing: every Write() is executed against the database immediately,
    /// so there is never buffered data to flush.
    /// </summary>
    public override void Flush()
    {
    }

    /// <summary>Releases the data reader and the connection owned by the stream.</summary>
    /// <param name="disposing">True when called from Dispose()/Close().</param>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_Disposed)
        {
            if (_SQLReader != null)
                _SQLReader.Dispose();
            _Connection.Dispose();
        }
        _Disposed = true;
        base.Dispose(disposing);
    }

    /// <summary>Throws if the stream has already been disposed.</summary>
    private void ThrowIfDisposed()
    {
        if (_Disposed)
            throw new ObjectDisposedException(GetType().Name);
    }

    /// <summary>
    /// Wraps an identifier in brackets, doubling any closing bracket so the name
    /// cannot break out of the quoted identifier in the generated SQL.
    /// </summary>
    private static string QuoteIdentifier(string name, string parameterName)
    {
        if (string.IsNullOrEmpty(name))
            throw new ArgumentException("Identifier must not be null or empty", parameterName);
        return "[" + name.Replace("]", "]]") + "]";
    }

    #region unimplemented methods

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    #endregion unimplemented methods
}
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //--------------------------------------------------------------------- | |
| // File: VirtualStream.cs | |
| // | |
| // Summary: A sample pipeline component which demonstrates how to promote message context | |
| // properties and write distinguished fields for XML messages using arbitrary | |
| // XPath expressions. | |
| // | |
| // Sample: Arbitrary XPath Property Handler Pipeline Component SDK | |
| // | |
| //--------------------------------------------------------------------- | |
| // This file is part of the Microsoft BizTalk Server 2006 SDK | |
| // | |
| // Copyright (c) Microsoft Corporation. All rights reserved. | |
| // | |
| // This source code is intended only as a supplement to Microsoft BizTalk | |
| // Server 2006 release and/or on-line documentation. See these other | |
| // materials for detailed information regarding Microsoft code samples. | |
| // | |
| // THIS CODE AND INFORMATION ARE PROVIDED "AS IS" WITHOUT WARRANTY OF ANY | |
| // KIND, WHETHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE | |
| // IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR | |
| // PURPOSE. | |
| //--------------------------------------------------------------------- | |
| using System; | |
| using System.IO; | |
| using System.Text; | |
| using System.Runtime.InteropServices; | |
| using Microsoft.Win32.SafeHandles; | |
| namespace Microsoft.Samples.BizTalk.Adapter.Tcp | |
| { | |
| /// <summary> | |
| /// Implements a virtual stream, i.e. the always seekable stream which | |
| /// uses configurable amount of memory to reduce a memory footprint and | |
| /// temporarily stores remaining data in a temporary file on disk. | |
| /// </summary> | |
| public sealed class VirtualStream : Stream, IDisposable | |
| { | |
| /// <summary> | |
| /// Memory handling. | |
| /// </summary> | |
public enum MemoryFlag
{
    /// <summary>
    /// The stream starts out backed by memory. Judging by the name it is meant
    /// to move to disk once the memory threshold is exceeded (TODO confirm
    /// against the write path).
    /// </summary>
    AutoOverFlowToDisk = 0,

    /// <summary>The stream is backed by a memory stream.</summary>
    OnlyInMemory = 1,

    /// <summary>The stream is backed by a persistent (disk) stream from the start.</summary>
    OnlyToDisk = 2
}
| // Constants | |
| private const int MemoryThreshold = 4*1024*1024; // The maximum possible memory consumption (4Mb) | |
| private const int DefaultMemorySize = 4*1024; // Default memory consumption (4Kb) | |
| private Stream wrappedStream; | |
| private bool isDisposed; | |
| private bool isInMemory; | |
| private int thresholdSize; | |
| private MemoryFlag memoryStatus; | |
| /// <summary> | |
| /// Initializes a VirtualStream instance with default parameters (4 KB memory buffer, | |
| /// allow overflow to disk). | |
| /// </summary> | |
public VirtualStream()
    : this(
        DefaultMemorySize,
        MemoryFlag.AutoOverFlowToDisk,
        new MemoryStream())
{
    // All initialization happens in the private constructor.
}
| /// <summary> | |
| /// Initializes a VirtualStream instance with memory buffer size. | |
| /// </summary> | |
| /// <param name="bufferSize">Memory buffer size</param> | |
public VirtualStream(int bufferSize)
    : this(
        bufferSize,
        MemoryFlag.AutoOverFlowToDisk,
        new MemoryStream(bufferSize))
{
    // The memory stream is pre-sized to the requested buffer size;
    // everything else is set up by the private constructor.
}
| /// <summary> | |
| /// Initializes a VirtualStream instance with a default memory size and memory flag specified. | |
| /// </summary> | |
| /// <param name="flag">Memory flag</param> | |
public VirtualStream(MemoryFlag flag)
    : this(
        DefaultMemorySize,
        flag,
        // Disk-only streams get a persistent stream, all others a memory stream.
        (flag != MemoryFlag.OnlyToDisk) ? new MemoryStream() : CreatePersistentStream())
{
}
| /// <summary> | |
| /// Initializes a VirtualStream instance with a memory buffer size and memory flag specified. | |
| /// </summary> | |
| /// <param name="bufferSize">Memory buffer size</param> | |
| /// <param name="flag">Memory flag</param> | |
public VirtualStream(int bufferSize, MemoryFlag flag)
    : this(
        bufferSize,
        flag,
        // Disk-only streams get a persistent stream, all others a pre-sized memory stream.
        (flag != MemoryFlag.OnlyToDisk) ? new MemoryStream(bufferSize) : CreatePersistentStream())
{
}
| /// <summary> | |
| /// Initializes a VirtualStream instance with a memory buffer size, memory flag and underlying stream | |
| /// specified. | |
| /// </summary> | |
| /// <param name="bufferSize">Memory buffer size</param> | |
| /// <param name="flag">Memory flag</param> | |
| /// <param name="dataStream">Underlying stream</param> | |
private VirtualStream(int bufferSize, MemoryFlag flag, Stream dataStream)
{
    if (dataStream == null)
    {
        throw new ArgumentNullException("dataStream");
    }

    // The memory budget never exceeds MemoryThreshold, whatever was requested.
    int cappedSize = Math.Min(bufferSize, MemoryThreshold);

    memoryStatus = flag;
    thresholdSize = cappedSize;
    isInMemory = flag != MemoryFlag.OnlyToDisk;

    if (isInMemory)
    {
        // Don't want to double wrap memory stream
        wrappedStream = dataStream;
    }
    else
    {
        // Disk-backed data is accessed through a buffer of the capped size.
        wrappedStream = new BufferedStream(dataStream, cappedSize);
    }

    isDisposed = false;
}
| #region Stream Methods and Properties | |
| /// <summary> | |
| /// Gets a flag indicating whether a stream can be read. | |
| /// </summary> | |
public override bool CanRead
{
    // Readability is whatever the wrapped stream reports.
    get { return wrappedStream.CanRead; }
}
| /// <summary> | |
| /// Gets a flag indicating whether a stream can be written. | |
| /// </summary> | |
public override bool CanWrite
{
    // Writability is whatever the wrapped stream reports.
    get { return wrappedStream.CanWrite; }
}
/// <summary>
/// Gets a flag indicating whether the stream can seek.
/// </summary>
/// <value>
/// <c>true</c> while the stream is open; <c>false</c> once it has been closed.
/// </value>
public override bool CanSeek
{
    get
    {
        // Seek() throws ObjectDisposedException after the stream is closed, so the
        // capability flag must not keep advertising seek support at that point.
        return !isDisposed;
    }
}
/// <summary>
/// Gets the length of the wrapped stream.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override long Length
{
    get
    {
        // Same guard as Read/Seek/Flush: without it a closed stream would surface a
        // NullReferenceException from the null wrappedStream.
        ThrowIfDisposed();
        return wrappedStream.Length;
    }
}
/// <summary>
/// Gets or sets the position in the wrapped stream.
/// </summary>
/// <remarks>
/// Setting the position seeks the wrapped stream relative to its beginning.
/// </remarks>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override long Position
{
    get
    {
        // Guard first so a closed stream reports ObjectDisposedException instead of
        // dereferencing the null wrappedStream.
        ThrowIfDisposed();
        return wrappedStream.Position;
    }
    set
    {
        ThrowIfDisposed();
        wrappedStream.Seek(value, SeekOrigin.Begin);
    }
}
/// <summary>
/// Closes the stream and releases the wrapped stream.
/// </summary>
/// <remarks>
/// Calling other methods after Close() may result in an
/// <see cref="ObjectDisposedException"/> being thrown. Calling Close() again is a no-op.
/// </remarks>
public override void Close()
{
    if (isDisposed)
    {
        return;
    }

    // Resources are released right here, so finalization is no longer needed.
    GC.SuppressFinalize(this);
    Cleanup();
}
/// <summary>
/// Flushes the wrapped stream.
/// </summary>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override void Flush()
{
    ThrowIfDisposed();

    Stream current = wrappedStream;
    current.Flush();
}
/// <summary>
/// Reads bytes from the wrapped stream into <paramref name="buffer"/>.
/// </summary>
/// <param name="buffer">Destination array.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>The number of bytes read.</returns>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override int Read(byte[] buffer, int offset, int count)
{
    ThrowIfDisposed();

    int bytesRead = wrappedStream.Read(buffer, offset, count);
    return bytesRead;
}
/// <summary>
/// Moves the position of the wrapped stream.
/// </summary>
/// <param name="offset">Offset relative to <paramref name="origin"/>.</param>
/// <param name="origin">Reference point for <paramref name="offset"/>.</param>
/// <returns>The position reported by the wrapped stream after seeking.</returns>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override long Seek(long offset, SeekOrigin origin)
{
    ThrowIfDisposed();

    long newPosition = wrappedStream.Seek(offset, origin);
    return newPosition;
}
/// <summary>
/// Sets the length of the stream, moving the data from memory to a persistent stream
/// first when the new length exceeds the memory threshold.
/// </summary>
/// <param name="length">New length of the stream.</param>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override void SetLength(long length)
{
    ThrowIfDisposed();

    bool overflowAllowed = (memoryStatus == MemoryFlag.AutoOverFlowToDisk) && isInMemory;
    if (overflowAllowed && length > thresholdSize)
    {
        // Still in memory, but the requested length no longer fits under the threshold:
        // move everything over to a persistent stream.
        BufferedStream diskStream = new BufferedStream(CreatePersistentStream(), thresholdSize);
        CopyStreamContent((MemoryStream)wrappedStream, diskStream);

        // The memory stream is not needed any more; the persistent stream replaces it.
        wrappedStream.Close();
        wrappedStream = diskStream;
        isInMemory = false;
    }

    wrappedStream.SetLength(length);
}
/// <summary>
/// Writes bytes to the wrapped stream, moving the data from memory to a persistent
/// stream first when the write would end beyond the memory threshold.
/// </summary>
/// <param name="buffer">Array holding the data to write.</param>
/// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="count">Number of bytes to write.</param>
/// <exception cref="ObjectDisposedException">The stream was closed or disposed.</exception>
public override void Write(byte[] buffer, int offset, int count)
{
    ThrowIfDisposed();

    if (memoryStatus == MemoryFlag.AutoOverFlowToDisk && isInMemory)
    {
        // Position the stream would have once this write completes.
        long positionAfterWrite = count + wrappedStream.Position;
        if (positionAfterWrite > thresholdSize)
        {
            // Still in memory, but this write would cross the threshold:
            // move everything over to a persistent stream.
            BufferedStream diskStream = new BufferedStream(CreatePersistentStream(), thresholdSize);
            CopyStreamContent((MemoryStream)wrappedStream, diskStream);

            // The memory stream is not needed any more; the persistent stream replaces it.
            wrappedStream.Close();
            wrappedStream = diskStream;
            isInMemory = false;
        }
    }

    wrappedStream.Write(buffer, offset, count);
}
| #endregion | |
| #region IDisposable Interface | |
/// <summary>
/// Releases the stream; see <see cref="IDisposable.Dispose()"/>.
/// </summary>
/// <remarks>
/// Simply calls <see cref="Close()"/>. The method is marked <c>new</c> because it
/// deliberately hides the parameterless Dispose() inherited from
/// <see cref="Stream"/>; without the modifier the compiler reports warning CS0108.
/// </remarks>
public new void Dispose()
{
    Close();
}
| #endregion | |
| #region Private Utility Functions | |
/// <summary>
/// Marks the stream as disposed and closes and drops the wrapped stream.
/// Does nothing when the stream is already disposed.
/// </summary>
private void Cleanup()
{
    if (isDisposed)
    {
        return;
    }

    isDisposed = true;

    if (wrappedStream == null)
    {
        return;
    }

    wrappedStream.Close();
    wrappedStream = null;
}
/// <summary>
/// Copies the whole content of the source memory stream to the target stream and moves the
/// target to the position the source is currently at.
/// </summary>
/// <param name="source">Source memory stream; its content and position are left unchanged.</param>
/// <param name="target">Target stream; must be writable and seekable.</param>
private void CopyStreamContent(MemoryStream source, Stream target)
{
    // Remember where the source currently is; the target has to end up at the same spot
    // because it is going to substitute the source stream.
    long currentPosition = source.Position;

    // WriteTo copies the entire content regardless of the current position and without
    // moving it, so no scratch buffer and no rewind/restore of the source is needed.
    source.WriteTo(target);

    target.Position = currentPosition;
}
/// <summary>
/// Guard used by the stream operations to check the stream state.
/// Throws <see cref="ObjectDisposedException"/> if the stream was closed or disposed.
/// </summary>
/// <exception cref="ObjectDisposedException">
/// The stream is flagged as disposed or no longer has a wrapped stream.
/// </exception>
private void ThrowIfDisposed()
{
    bool usable = !isDisposed && wrappedStream != null;
    if (usable)
    {
        return;
    }

    throw new ObjectDisposedException("VirtualStream");
}
/// <summary>
/// Utility method.
/// Creates a FileStream over a uniquely named file in the temporary folder, opened with the
/// temporary and delete-on-close attributes.
/// </summary>
/// <returns>
/// The temporary persistence stream, opened for reading and writing.
/// </returns>
/// <exception cref="IOException">The temporary file name could not be generated.</exception>
public static Stream CreatePersistentStream()
{
    // GetTempFileName expects an output buffer of MAX_PATH characters; a smaller buffer
    // can be overrun when the temporary folder path is long.
    const int MaxPath = 260;

    // FILE_FLAG_DELETE_ON_CLOSE (0x04000000) | FILE_ATTRIBUTE_TEMPORARY (0x00000100)
    const UInt32 DeleteOnCloseTemporary = 0x04000100;

    StringBuilder name = new StringBuilder(MaxPath);
    if (0 == GetTempFileName(Path.GetTempPath(), "BTS", 0, name))
        throw new IOException("GetTempFileName Failed.", Marshal.GetHRForLastWin32Error());

    // The numeric values of FileAccess.ReadWrite and FileMode.Create are passed straight
    // through as the native access mode and creation mode.
    IntPtr handle = CreateFile(
        name.ToString(),
        (UInt32) FileAccess.ReadWrite,
        0,
        IntPtr.Zero,
        (UInt32) FileMode.Create,
        DeleteOnCloseTemporary,
        IntPtr.Zero);

    // FileStream constructor will throw exception if handle is zero or -1.
    return new FileStream(new SafeFileHandle(handle, true), FileAccess.ReadWrite);
}
/// <summary>
/// Win32 GetTempFileName: generates a temporary file name in the given folder.
/// </summary>
/// <param name="path">Folder in which the file name is generated.</param>
/// <param name="prefix">Prefix used for the generated file name.</param>
/// <param name="unique">Number used to build the name; the caller in this class passes 0.</param>
/// <param name="name">Buffer receiving the generated file name (MAX_PATH characters).</param>
/// <returns>Zero on failure (that is what the caller checks for); non-zero otherwise.</returns>
/// <remarks>
/// SetLastError is required because the caller reads the Win32 error through
/// Marshal.GetHRForLastWin32Error(); without it the reported error code is unreliable.
/// The Unicode entry point keeps paths outside the ANSI code page intact.
/// </remarks>
[DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
private static extern UInt32 GetTempFileName
(
    string path,
    string prefix,
    UInt32 unique,
    StringBuilder name
);
/// <summary>
/// Win32 CreateFile: creates or opens a file and returns its native handle.
/// </summary>
/// <param name="name">Name of the file to create or open.</param>
/// <param name="accessMode">Requested access to the file.</param>
/// <param name="shareMode">Sharing mode; the caller in this class passes 0.</param>
/// <param name="security">Pointer to security attributes; the caller passes IntPtr.Zero.</param>
/// <param name="createMode">Action to take depending on whether the file exists.</param>
/// <param name="flags">File attributes and flags.</param>
/// <param name="template">Handle to a template file; the caller passes IntPtr.Zero.</param>
/// <returns>The native file handle; the caller relies on FileStream to reject zero or -1.</returns>
/// <remarks>
/// SetLastError preserves the Win32 error code for Marshal-based error reporting, and the
/// Unicode entry point keeps file names outside the ANSI code page intact.
/// </remarks>
[DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Unicode)]
private static extern IntPtr CreateFile
(
    string name,
    UInt32 accessMode,
    UInt32 shareMode,
    IntPtr security,
    UInt32 createMode,
    UInt32 flags,
    IntPtr template
);
| #endregion | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment