Skip to content

Instantly share code, notes, and snippets.

@rpgmaker
Created August 30, 2012 03:26
Show Gist options
  • Save rpgmaker/3522016 to your computer and use it in GitHub Desktop.
Save rpgmaker/3522016 to your computer and use it in GitHub Desktop.
Http Streaming
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net;
using System.Threading;
using System.IO;
using System.Text.RegularExpressions;
namespace TestStreamingHttp {
static class Program {
static void ReadBuffer(IAsyncResult r) {
var tuple = r.AsyncState as Tuple<Stream, byte[], SynchronizationContext>;
var stream = tuple.Item1;
var buffer = tuple.Item2;
var context = tuple.Item3;
var read = stream.EndRead(r);
var eof = read == -1;
if (!eof)
ms.Write(buffer, 0, read);
if (HasData(buffer)) {
var data = ms.ToArray();
if (data.Length > 2) {
if (context != null)
context.Post(ProcessBuffer, data);
else ProcessBuffer(data);
}
}
lastBuffer = new byte[MAX_BUFFER_SIZE];
Buffer.BlockCopy(buffer, 0, lastBuffer, 0, read);
Array.Clear(buffer, 0, MAX_BUFFER_SIZE);
if (!eof)
stream.BeginRead(buffer, 0, MAX_BUFFER_SIZE,
ReadBuffer, new Tuple<Stream, byte[], SynchronizationContext>(stream, buffer, context));
//else
// reset.Set();
}
static bool HasData(byte[] buffer) {
var count = 0;
for(var i = 0; i < buffer.Length; i++)
if (buffer[i] == delimeter[i]) count++;
if (count == buffer.Length) return true;
if (lastBuffer == null) return false;
Buffer.BlockCopy(lastBuffer, 0, bigBuffer, 0, MAX_BUFFER_SIZE);
Buffer.BlockCopy(buffer, 0, bigBuffer, MAX_BUFFER_SIZE, MAX_BUFFER_SIZE);
var delimiterIndex = 0;
for (var i = 0; i < bigBuffer.Length; i++) {
if (bigBuffer[i] == delimeter[delimiterIndex]) {
if (++delimiterIndex == delimeter.Length)
if (i < MAX_BUFFER_SIZE)
return false;
else
return true;
} else
delimiterIndex = 0;
}
return false;
}
static bool HasDataEx(byte[] buffer) {
var count = 0;
for(var i = 0; i < buffer.Length; i++)
if (buffer[i] == delimeter[i]) count++;
if (count == buffer.Length) return true;
if (lastBuffer == null) return false;
Buffer.BlockCopy(lastBuffer, 0, bigBuffer, 0, MAX_BUFFER_SIZE);
Buffer.BlockCopy(buffer, 0, bigBuffer, MAX_BUFFER_SIZE, MAX_BUFFER_SIZE);
var delimiterIndex = 0;
for (var i = 0; i < bigBuffer.Length; i++) {
if (bigBuffer[i] == delimeter[delimiterIndex]) {
if (++delimiterIndex == delimeter.Length)
return true;
} else
delimiterIndex = 0;
}
return false;
}
private static void Clear(this MemoryStream source) {
byte[] buffer = source.GetBuffer();
Array.Clear(buffer, 0, buffer.Length);
source.Position = 0;
source.SetLength(0);
}
static void ProcessBuffer(object state) {
var buffer = state as byte[];
var text = System.Text.Encoding.UTF8.GetString(buffer);
var data = cometRegex.Match(text).Groups["Data"].Value;
Console.WriteLine(data);
ms.Clear();
}
const int MAX_BUFFER_SIZE = 8;
static ManualResetEvent reset = new ManualResetEvent(false);
readonly static byte[] delimeter = System.Text.Encoding.UTF8.GetBytes("</comet>");
static byte[] lastBuffer = null;
static byte[] bigBuffer = new byte[MAX_BUFFER_SIZE * 2];
static MemoryStream ms = new MemoryStream();
static Regex cometRegex = new Regex("<comet>(?<Data>.+?)</comet>", RegexOptions.Compiled | RegexOptions.Multiline);
static void Main(string[] args) {
var url = "http://localhost:49530/Default.aspx?stream=test";
var buffer = new byte[MAX_BUFFER_SIZE];
var client = WebRequest.Create(url) as HttpWebRequest;
client.KeepAlive = true;
client.Method = "GET";
client.BeginGetResponse(o => {
var tuple = o.AsyncState as Tuple<HttpWebRequest, SynchronizationContext>;
var context = tuple.Item2;
var request = tuple.Item1;
var response = request.EndGetResponse(o) as HttpWebResponse;
var stream = response.GetResponseStream();
stream.BeginRead(buffer, 0, MAX_BUFFER_SIZE, ReadBuffer,
new Tuple<Stream, byte[], SynchronizationContext>(stream, buffer, context));
}, new Tuple<HttpWebRequest, SynchronizationContext>(client, SynchronizationContext.Current));
while (true) {
if (Console.ReadLine().ToLower() == "exit") client.Abort();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment