Created
February 18, 2014 15:32
-
-
Save danbarua/9073209 to your computer and use it in GitHub Desktop.
Reactive FreeSWITCH ESL Parsing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>Extension methods for <see cref="IObservable{T}"/> sequences.</summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Folds elements of <paramref name="source"/> into an accumulator and emits the accumulator
    /// each time <paramref name="predicate"/> reports that it is complete, then starts over
    /// with a fresh accumulator obtained from <paramref name="seed"/>.
    /// </summary>
    /// <param name="source">The source sequence to aggregate.</param>
    /// <param name="seed">Factory producing a fresh accumulator; invoked once per subscription and again after every emitted value.</param>
    /// <param name="accumulator">Combines the current accumulator with the next source element and returns the accumulator to carry forward.</param>
    /// <param name="predicate">A predicate which indicates whether the aggregation is completed and the accumulator should be emitted.</param>
    /// <typeparam name="TSource">The Type of the Source stream.</typeparam>
    /// <typeparam name="TAccumulate">The Type of the Accumulator.</typeparam>
    /// <returns>
    /// An <see cref="IObservable{T}"/> of completed accumulators. Errors and completion of
    /// <paramref name="source"/> are forwarded as-is; an accumulator that is still incomplete
    /// when the source completes is not emitted.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static IObservable<TAccumulate> AggregateUntil<TSource, TAccumulate>(
        this IObservable<TSource> source,
        Func<TAccumulate> seed,
        Func<TAccumulate, TSource, TAccumulate> accumulator,
        Func<TAccumulate, bool> predicate)
    {
        // Fail fast at the call site rather than with a NullReferenceException at subscription time.
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (seed == null)
        {
            throw new ArgumentNullException("seed");
        }

        if (accumulator == null)
        {
            throw new ArgumentNullException("accumulator");
        }

        if (predicate == null)
        {
            throw new ArgumentNullException("predicate");
        }

        return Observable.Create<TAccumulate>(observer =>
        {
            // Accumulator state is created per subscription so subscribers never share it.
            var accumulate = seed();

            return source.Subscribe(
                value =>
                {
                    bool isComplete;

                    // Calls into user-supplied delegates are guarded so that their failures
                    // reach the subscriber through OnError instead of being thrown back
                    // into whatever thread is pushing values from the source.
                    try
                    {
                        accumulate = accumulator(accumulate, value);
                        isComplete = predicate(accumulate);
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        return;
                    }

                    if (!isComplete)
                    {
                        return;
                    }

                    // Deliberately outside the try blocks: exceptions thrown by the
                    // downstream observer must not be fed back to it as OnError.
                    observer.OnNext(accumulate);

                    try
                    {
                        // Start the next aggregation from a clean accumulator.
                        accumulate = seed();
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// A simple state-machine for parsing incoming EventSocket messages.
/// </summary>
/// <remarks>
/// Characters are supplied one at a time through <see cref="Append"/>. Header text is buffered
/// until a blank line ("\n\n") is seen. If the headers contain a Content-Length entry, that many
/// further characters are then buffered as the message body; otherwise the message is complete
/// as soon as the headers end.
/// </remarks>
public class Parser
{
    private readonly StringBuilder buffer = new StringBuilder(); // StringBuilder in .Net 4 uses a Linked List internally to avoid expensive reallocations. Faster but uses marginally more memory.

    // The last header character seen; used to detect the "\n\n" header terminator.
    private char previous;

    // Null while headers are still being read, or when the message declares no body.
    private int? contentLength;

    private IDictionary<string, string> headers;

    /// <summary>
    /// Gets a value indicating whether parsing an incoming message has completed.
    /// </summary>
    public bool Completed { get; private set; }

    /// <summary>
    /// Gets a value indicating whether the incoming message has a body,
    /// i.e. whether a Content-Length header has been read.
    /// </summary>
    public bool HasBody
    {
        get
        {
            return contentLength.HasValue;
        }
    }

    /// <summary>
    /// Appends the given <see cref="char"/> to the message.
    /// </summary>
    /// <param name="next">The next <see cref="char"/> of the message.</param>
    /// <returns>
    /// This instance while its message is still being parsed. If this instance has already
    /// completed a message, a NEW <see cref="Parser"/> that has consumed <paramref name="next"/>
    /// as the first character of the following message.
    /// </returns>
    public Parser Append(char next)
    {
        if (Completed)
        {
            // This parser holds a finished message; start the next message in a fresh parser.
            return new Parser().Append(next);
        }

        buffer.Append(next);

        if (HasBody)
        {
            // if we've read the Content-Length amount of characters then we're done
            Completed = buffer.Length == contentLength.Value;
            return this;
        }

        // we're parsing the headers
        if (previous != '\n' || next != '\n')
        {
            previous = next;
            return this;
        }

        // \n\n denotes the end of the Headers
        headers = buffer.ToString().ParseKeyValuePairs("\n", ": ");

        string declaredLength;
        if (!headers.TryGetValue(HeaderNames.ContentLength, out declaredLength))
        {
            // no body announced: end of message
            Completed = true;
            return this;
        }

        // Content-Length is a protocol value, so parse it independently of the current culture.
        contentLength = int.Parse(declaredLength, System.Globalization.CultureInfo.InvariantCulture);

        // start parsing the body content
        buffer.Clear();

        if (contentLength.Value == 0)
        {
            // An empty body is already fully read. Without this the parser would wait for a
            // character that takes the buffer past zero length and would never complete.
            Completed = true;
            return this;
        }

        // allocate the buffer up front given that we now know the expected size
        buffer.EnsureCapacity(contentLength.Value);
        return this;
    }

    /// <summary>
    /// Builds the <see cref="BasicMessage"/> from the parsed headers and, when a
    /// Content-Length header was present, the buffered body text.
    /// </summary>
    /// <returns>The parsed <see cref="BasicMessage"/>.</returns>
    /// <exception cref="InvalidOperationException">The message has not been completely parsed yet.</exception>
    public BasicMessage ParseMessage()
    {
        if (!Completed)
        {
            throw new InvalidOperationException("The message was not completely parsed.");
        }

        return HasBody ? new BasicMessage(headers, buffer.ToString()) : new BasicMessage(headers);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Extension methods that turn raw EventSocket input streams into <see cref="BasicMessage"/> streams.
/// </summary>
public static class ParserExtensions
{
    /// <summary>
    /// Decodes each incoming byte array as ASCII text, flattens the text into a stream of
    /// characters and extracts the messages contained in it.
    /// </summary>
    /// <param name="byteStream">The stream of raw byte chunks.</param>
    /// <returns>A stream of the messages parsed from <paramref name="byteStream"/>.</returns>
    public static IObservable<BasicMessage> ExtractBasicMessages(
        this IObservable<byte[]> byteStream)
    {
        IObservable<char> characters = byteStream.SelectMany(chunk => Encoding.ASCII.GetString(chunk));
        return characters.ExtractBasicMessages();
    }

    /// <summary>
    /// Feeds each character into a <see cref="Parser"/> and emits a <see cref="BasicMessage"/>
    /// every time the parser reports that a message is complete, continuing with a fresh
    /// parser for the characters that follow.
    /// </summary>
    /// <param name="charStream">The stream of message characters.</param>
    /// <returns>A stream of the messages parsed from <paramref name="charStream"/>.</returns>
    public static IObservable<BasicMessage> ExtractBasicMessages(
        this IObservable<char> charStream)
    {
        IObservable<Parser> completedParsers = charStream.AggregateUntil(
            () => new Parser(),
            (parser, character) => parser.Append(character),
            parser => parser.Completed);

        return completedParsers.Select(parser => parser.ParseMessage());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment