-
-
Save nikes/1c2ade591b2f1a2945bb2d5af11dca9b to your computer and use it in GitHub Desktop.
C# implementation of Server Side Event Source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2014 Jonathan Bradshaw. All rights reserved. | |
* Redistribution and use in source and binary forms, with or without modification, is permitted. | |
*/ | |
using System; | |
using System.Collections.Specialized; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using System.Net.Cache; | |
using System.Net.Mime; | |
using System.Text; | |
using System.Threading; | |
namespace EventSource4Net | |
{ | |
/// <summary> | |
/// An EventSource represents a long-lived HTTP connection through which a Web server can “push” textual messages. | |
/// To use these “Server Sent Events”, pass the server URL to the EventSource() constructor and then register | |
/// a message event handler on the resulting Event Source object. The EventSource attempts to be resilient to | |
/// transitory network errors and interruptions by automatically retrying connections to maintain persistence. | |
/// </summary> | |
public class EventSource : IDisposable | |
{ | |
#region Protected Fields | |
/// <summary>
/// Delay, in milliseconds, used for reconnection attempts unless a different
/// interval has been established.
/// </summary>
protected const int DefaultRetryInterval = 3000;

/// <summary>
/// Trace source named "EventSource" through which this class writes its diagnostics.
/// </summary>
protected static readonly TraceSource Trace = new TraceSource("EventSource");
#endregion Protected Fields | |
#region Private Fields | |
// Receive buffer handed to the response stream's asynchronous reads.
private readonly byte[] _buffer = new byte[8192];

// Value of the most recent "id" field; exposed through LastEventId.
private string _eventId;

// Characters received so far for the event that is currently being assembled.
private StringBuilder _eventStream;

// Value of the most recent "event" field.
private string _eventType;

// Response body stream of the current connection.
private Stream _httpStream;

// Request/response pair of the current connection attempt.
private HttpWebRequest _httpWebRequest;
private HttpWebResponse _httpWebResponse;

// Backing field of ReadyState.
private EventSourceState _readyState;

// Current reconnection delay in milliseconds; a value <= 0 disables retries.
private int _retryInterval = DefaultRetryInterval;

// One-shot timer that triggers the next reconnection attempt.
private Timer _retryTimer;

// Set by Shutdown() and polled from timer and asynchronous I/O callbacks, which run on
// other threads; volatile so those callbacks always observe the latest value.
private volatile bool _shutdownToken;
#endregion Private Fields | |
#region Public Constructors | |
/// <summary>
/// Creates an <see cref="EventSource"/> for the given address. The string is converted
/// to a <see cref="Uri"/> and passed on to the <see cref="Uri"/>-based constructor.
/// </summary>
/// <param name="requestUriString">The URL of the event stream, as text.</param>
public EventSource(string requestUriString)
    : this(new Uri(requestUriString))
{
    // All initialisation happens in the Uri overload.
}
/// <summary>
/// Initializes a new instance of the <see cref="EventSource"/> class. The instance starts
/// in the <see cref="EventSourceState.Closed"/> state with a connection timeout of 100 seconds.
/// </summary>
/// <param name="requestUriString">The URL of the event stream.</param>
/// <exception cref="ArgumentNullException"><paramref name="requestUriString"/> is null.</exception>
public EventSource(Uri requestUriString)
{
    // Fail here rather than later inside Connect(), where the cause is harder to spot.
    if (requestUriString == null)
    {
        throw new ArgumentNullException("requestUriString");
    }

    Url = requestUriString;
    Timeout = 100000; // 100 seconds

    // Assign the backing field directly so that no StateChange event is raised during construction.
    _readyState = EventSourceState.Closed;
}
#endregion Public Constructors | |
#region Public Events | |
/// <summary>
/// Raised when a connection attempt or the open connection fails; the event
/// arguments carry the exception that describes the failure.
/// </summary>
public event EventHandler<ServerSentErrorEventArgs> Error;

/// <summary>
/// Raised for every dispatched server-sent event that carries data.
/// </summary>
public event EventHandler<ServerSentEventArgs> Message;

/// <summary>
/// Raised each time <see cref="ReadyState"/> is assigned.
/// </summary>
public event EventHandler<StateChangeEventArgs> StateChange;
#endregion Public Events | |
#region Public Enums | |
/// <summary>
/// Connection states reported by the <see cref="ReadyState"/> property.
/// </summary>
public enum EventSourceState
{
    /// <summary>A request has been issued and the response is awaited.</summary>
    Connecting = 0,

    /// <summary>The response was accepted and the event stream is being read.</summary>
    Open = 1,

    /// <summary>There is no active connection.</summary>
    Closed = 2,

    /// <summary>Shutdown() has been called.</summary>
    Shutdown = 3
}
#endregion Public Enums | |
#region Public Properties | |
/// <summary>
/// Extra headers added to every request. Override ConfigureWebRequest
/// when more control over the request is needed.
/// </summary>
public NameValueCollection Headers { get; set; }

/// <summary>
/// Optional filter on the event type. When set, only events whose type
/// is listed here are passed through.
/// </summary>
public string[] MessageTypes { get; set; }

/// <summary>
/// The identifier taken from the most recent "id" field.
/// </summary>
public string LastEventId
{
    get
    {
        return _eventId;
    }
}

/// <summary>
/// The current connection state. Every assignment raises <see cref="StateChange"/>.
/// </summary>
public EventSourceState ReadyState
{
    get
    {
        return _readyState;
    }

    private set
    {
        var notification = new StateChangeEventArgs { NewState = value };
        _readyState = value;
        OnStateChangeEvent(notification);
    }
}

/// <summary>
/// Time, in milliseconds, allowed for the initial connection to be established.
/// There is no timeout on the connection once established.
/// </summary>
public int Timeout { get; set; }

/// <summary>
/// The URL this EventSource connects to.
/// </summary>
public Uri Url { get; private set; }
#endregion Public Properties | |
#region Public Methods | |
/// <summary>
/// Starts connecting to the event source. After a failure the EventSource keeps
/// retrying on its own in order to keep the connection alive.
/// </summary>
/// <exception cref="InvalidOperationException">
/// A connection is already being established or is already open.
/// </exception>
public void Connect()
{
    switch (ReadyState)
    {
        case EventSourceState.Connecting:
        case EventSourceState.Open:
            throw new InvalidOperationException("Cannot call connect while connection is " + ReadyState);
    }

    // Re-arm the instance in case Shutdown() was called earlier.
    _shutdownToken = false;
    ConnectAsync();
}
/// <summary>
/// Shuts the connection down and releases the timer, response and stream held by this instance.
/// </summary>
public void Dispose()
{
    Shutdown();

    // Shutdown() does nothing when it was already called, and CloseConnection() does nothing
    // while the state is Closed, so release everything explicitly instead of only dropping
    // the references.
    var timer = _retryTimer;
    _retryTimer = null;
    if (timer != null)
    {
        timer.Dispose();
    }

    var response = _httpWebResponse;
    _httpWebResponse = null;
    if (response != null)
    {
        response.Close();
    }

    // The stream reference is kept: a read callback that is already running still dereferences it.
    var stream = _httpStream;
    if (stream != null)
    {
        stream.Dispose();
    }

    _httpWebRequest = null;
}
/// <summary>
/// Shuts down the connection, cancels any pending reconnection attempt and moves to the
/// <see cref="EventSourceState.Shutdown"/> state. Calling it again has no effect until
/// <see cref="Connect"/> is called.
/// </summary>
public void Shutdown()
{
    if (_shutdownToken) return;
    _shutdownToken = true;

    CloseConnection();

    // While waiting between retries the state is Closed and CloseConnection() returns early,
    // leaving the retry timer armed. If Connect() were called before it fires, the flag would
    // be reset and the stale timer would open a second connection, so cancel it here.
    var pendingRetry = _retryTimer;
    if (pendingRetry != null)
    {
        pendingRetry.Dispose();
    }

    ReadyState = EventSourceState.Shutdown;
}
#endregion Public Methods | |
#region Protected Methods | |
/// <summary>
/// Prepares the request before it is sent: asks for an event stream, disables caching,
/// and adds the custom <see cref="Headers"/> plus the last event id when one is known.
/// Override to apply further headers or settings.
/// </summary>
/// <param name="request">The request about to be issued.</param>
protected virtual void ConfigureWebRequest(HttpWebRequest request)
{
    request.Accept = "text/event-stream";
    request.AllowAutoRedirect = true;
    request.KeepAlive = true;
    request.CachePolicy = new HttpRequestCachePolicy(HttpRequestCacheLevel.NoCacheNoStore);

    var customHeaders = Headers;
    if (customHeaders != null)
    {
        request.Headers.Add(customHeaders);
    }

    var lastId = LastEventId;
    if (string.IsNullOrEmpty(lastId))
    {
        return;
    }

    request.Headers.Add("Last-Event-Id", lastId);
}
/// <summary>
/// Called when a complete event block has been received (terminated by a blank line).
/// Parses the "field: value" lines and raises the message event when the block carried
/// wanted data. Override this method to customize the parsing.
/// </summary>
/// <remarks>
/// A line starting with ':' is a comment. A single space after the colon is optional and
/// is removed when present. A line without a colon is a field name with an empty value.
/// Each wanted "data" line is appended followed by a line terminator.
/// </remarks>
/// <param name="content">The lines of one event block; a null array is ignored.</param>
protected virtual void DispatchEvent(string[] content)
{
    if (_shutdownToken || content == null) return;

    // The event type belongs to a single event; do not let it leak into the next one.
    _eventType = null;
    StringBuilder data = null;

    foreach (var line in content)
    {
        if (string.IsNullOrEmpty(line)) continue;

        var pos = line.IndexOf(':');
        if (pos == 0) continue; // comment line

        string type;
        string value;
        if (pos < 0)
        {
            type = line;
            value = string.Empty;
        }
        else
        {
            type = line.Substring(0, pos);
            var start = pos + 1;
            // Only a single leading space belongs to the separator, and it is optional.
            if (start < line.Length && line[start] == ' ') start++;
            value = line.Substring(start);
        }

        Trace.TraceInformation("DispatchEvent (Type={0})", type);
        Trace.TraceData(TraceEventType.Verbose, 0, value);

        switch (type)
        {
            case "id":
                _eventId = value;
                break;

            case "event":
                _eventType = value.Length == 0 ? null : value;
                break;

            case "data":
                if (IsWanted(_eventType, value))
                {
                    if (data == null) data = new StringBuilder();
                    data.AppendLine(value);
                }
                break;

            case "retry":
                // Parse into a local: int.TryParse writes 0 on failure, and storing that in the
                // field would switch reconnection off because of one malformed line.
                int retry;
                if (int.TryParse(
                        value,
                        System.Globalization.NumberStyles.None,
                        System.Globalization.CultureInfo.InvariantCulture,
                        out retry))
                {
                    _retryInterval = retry;
                }
                break;
        }
    }

    if (data == null || _shutdownToken) return;

    OnMessageEvent(new ServerSentEventArgs
    {
        EventId = _eventId,
        EventType = _eventType,
        Data = data.ToString()
    });
}
/// <summary>
/// Decides whether a data line should be delivered. The default implementation accepts
/// everything when <see cref="MessageTypes"/> is not set and otherwise accepts only event
/// types listed there. Override to add further checks.
/// </summary>
/// <param name="eventType">Type of the event.</param>
/// <param name="value">The event value string (not used by the default implementation).</param>
/// <returns>True if the message should be processed.</returns>
protected virtual bool IsWanted(string eventType, string value)
{
    var filter = MessageTypes;
    if (filter == null)
    {
        return true;
    }

    return Array.IndexOf(filter, eventType) >= 0;
}
/// <summary>
/// Traces the failure and raises the <see cref="Error"/> event.
/// </summary>
/// <param name="e">Event data holding the exception that describes the failure.</param>
protected virtual void OnErrorEvent(ServerSentErrorEventArgs e)
{
    Trace.TraceInformation("Raising OnErrorEvent ({0})", e.Exception.Message);

    // Copy the delegate first so an unsubscribe on another thread cannot null it mid-call.
    var subscribers = Error;
    if (subscribers == null) return;

    subscribers(this, e);
}
/// <summary>
/// Traces the current event type and raises the <see cref="Message"/> event.
/// </summary>
/// <param name="e">Event data describing the received message.</param>
protected virtual void OnMessageEvent(ServerSentEventArgs e)
{
    Trace.TraceInformation("Raising OnMessageEvent ({0})", _eventType);

    // Copy the delegate first so an unsubscribe on another thread cannot null it mid-call.
    var subscribers = Message;
    if (subscribers == null) return;

    subscribers(this, e);
}
/// <summary>
/// Traces the new state and raises the <see cref="StateChange"/> event.
/// </summary>
/// <param name="e">Event data holding the new state.</param>
protected virtual void OnStateChangeEvent(StateChangeEventArgs e)
{
    Trace.TraceInformation("Raising OnStateChangeEvent ({0})", e.NewState);

    // Copy the delegate first so an unsubscribe on another thread cannot null it mid-call.
    var subscribers = StateChange;
    if (subscribers == null) return;

    subscribers(this, e);
}
/// <summary>
/// Schedules a single reconnection attempt. The delay is the current retry interval,
/// but never less than <see cref="DefaultRetryInterval"/>. Does nothing when the retry
/// interval is zero or negative, or after shutdown.
/// </summary>
/// <param name="backoff">
/// When true, the retry interval is multiplied by 1.5 (capped at 60 seconds) for the next attempt.
/// </param>
protected virtual void RetryAfterDelay(bool backoff = true)
{
    if (_retryInterval <= 0 || _shutdownToken) return;

    // Attempt reconnection after retry interval
    Trace.TraceInformation("RetryAfterDelay ({0}ms)", _retryInterval);

    // Release the previous timer before replacing it: otherwise every retry leaks a timer,
    // and a retry that is still pending would fire in addition to the new one.
    var previous = _retryTimer;
    if (previous != null)
    {
        previous.Dispose();
    }

    var dueTime = Math.Max(DefaultRetryInterval, _retryInterval);
    _retryTimer = new Timer(
        delegate
        {
            if (!_shutdownToken) ConnectAsync();
        },
        null,
        dueTime,
        System.Threading.Timeout.Infinite); // Single shot timer

    // Increase backoff timer up to a minute each retry
    if (backoff) _retryInterval = (int)Math.Min(_retryInterval * 1.5, 60000);
}
#endregion Protected Methods | |
#region Private Methods | |
/// <summary>
/// Closes the current connection: aborts the request, closes the response, disposes the
/// retry timer and moves to the <see cref="EventSourceState.Closed"/> state. Does nothing
/// unless the state is Connecting or Open.
/// </summary>
private void CloseConnection()
{
    if (ReadyState != EventSourceState.Connecting && ReadyState != EventSourceState.Open) return;

    Trace.TraceInformation("CloseConnection");

    if (_httpWebRequest != null)
    {
        _httpWebRequest.Abort();
    }

    if (_httpWebResponse != null)
    {
        _httpWebResponse.Close();
    }

    if (_retryTimer != null)
    {
        _retryTimer.Dispose();
    }

    _eventStream = null;
    _eventType = null;

    // _eventId is deliberately kept. This method runs before every reconnection attempt, and
    // clearing the id here meant the Last-Event-Id header could never be sent on a reconnect.

    ReadyState = EventSourceState.Closed;
}
/// <summary>
/// Issues the HTTP request for the event stream and registers a wait that reports a
/// timeout when no response arrives within <see cref="Timeout"/> milliseconds.
/// A <see cref="WebException"/> or <see cref="IOException"/> raised while starting the
/// request is reported and followed by a retry; any other exception propagates.
/// </summary>
private void ConnectAsync()
{
    Trace.TraceInformation("ConnectAsync ({0})", Url);
    ReadyState = EventSourceState.Connecting;

    _httpWebRequest = (HttpWebRequest) WebRequest.Create(Url);
    ConfigureWebRequest(_httpWebRequest);

    try
    {
        var pendingResponse = _httpWebRequest.BeginGetResponse(EndGetResponse, null);

        // Fires once: either when the response arrives or when the timeout elapses.
        ThreadPool.RegisterWaitForSingleObject(
            pendingResponse.AsyncWaitHandle,
            OnConnectWaitCompleted,
            _httpWebRequest,
            Timeout,
            true);
    }
    catch (WebException webFailure)
    {
        ReportFailureAndRetry(webFailure);
    }
    catch (IOException ioFailure)
    {
        ReportFailureAndRetry(ioFailure);
    }
}

/// <summary>
/// Wait callback for the pending response; acts only when the wait timed out.
/// </summary>
/// <param name="state">The state object supplied at registration (unused).</param>
/// <param name="timedOut">True when the timeout elapsed before a response arrived.</param>
private void OnConnectWaitCompleted(object state, bool timedOut)
{
    var connectTimedOut = timedOut && _httpWebRequest != null && !_shutdownToken;
    if (!connectTimedOut) return;

    Trace.TraceInformation("ConnectAsync (Timed Out)", Url);
    ReportFailureAndRetry(new TimeoutException());
}

/// <summary>
/// Raises the error event for a failed connection attempt, closes the connection
/// and schedules the next attempt.
/// </summary>
/// <param name="failure">The exception describing the failure.</param>
private void ReportFailureAndRetry(Exception failure)
{
    OnErrorEvent(new ServerSentErrorEventArgs { Exception = failure });
    CloseConnection();
    RetryAfterDelay();
}
/// <summary>
/// Completes the asynchronous request. Validates the response (status 200 and media type
/// text/event-stream), moves to the Open state and starts reading the stream.
/// </summary>
/// <remarks>
/// Failures are reported through the error event instead of being thrown, because this
/// method runs as an asynchronous callback where nothing can catch an exception.
/// </remarks>
/// <param name="result">The IAsyncResult.</param>
private void EndGetResponse(IAsyncResult result)
{
    if (_shutdownToken) return;

    var request = _httpWebRequest;
    if (request == null) return;

    try
    {
        _httpWebResponse = (HttpWebResponse)request.EndGetResponse(result);
        _httpStream = _httpWebResponse.GetResponseStream();
    }
    catch (WebException ex)
    {
        // The request was aborted by CloseConnection(); whoever closed the connection has
        // already reported the failure and scheduled the retry, so do not do it twice.
        if (ex.Status == WebExceptionStatus.RequestCanceled) return;

        OnErrorEvent(new ServerSentErrorEventArgs { Exception = ex });
        CloseConnection();
        RetryAfterDelay();
        return; // without this the code below would run against a missing or stale response
    }

    if (_shutdownToken)
    {
        // Shutdown ran before the response was stored, so it could not close it.
        _httpWebResponse.Close();
        return;
    }

    // The ContentType constructor rejects empty and malformed header values; treat those
    // as "no media type" so they are reported as an unexpected response below.
    string mediaType;
    try
    {
        mediaType = new ContentType(_httpWebResponse.ContentType).MediaType;
    }
    catch (ArgumentException)
    {
        mediaType = string.Empty;
    }
    catch (FormatException)
    {
        mediaType = string.Empty;
    }

    Trace.TraceInformation("EndGetResponse (StatusCode={0}, MediaType={1})", _httpWebResponse.StatusCode, mediaType);

    // Media types are case-insensitive.
    var isEventStream = string.Equals(mediaType, "text/event-stream", StringComparison.OrdinalIgnoreCase);
    if (_httpWebResponse.StatusCode != HttpStatusCode.OK || !isEventStream)
    {
        // If we get the wrong content type or status code, as per spec, do not attempt to reconnect.
        OnErrorEvent(new ServerSentErrorEventArgs
        {
            Exception = new Exception("Unexpected response from server. Status " +
                                      _httpWebResponse.StatusCode + ". Media Type " + mediaType)
        });
        CloseConnection();
        return;
    }

    if (_httpStream == null)
    {
        OnErrorEvent(new ServerSentErrorEventArgs
        {
            Exception = new InvalidOperationException("GetResponseStream")
        });
        CloseConnection();
        return;
    }

    ReadyState = EventSourceState.Open;
    _retryInterval = DefaultRetryInterval;
    _eventStream = new StringBuilder();
    _eventType = null;
    // _eventId is not reset: the last received id must survive reconnections.

    if (_shutdownToken) return;

    try
    {
        _httpStream.BeginRead(_buffer, 0, _buffer.Length, EndReadFromStream, null);
    }
    catch (IOException ex)
    {
        OnErrorEvent(new ServerSentErrorEventArgs { Exception = ex });
        CloseConnection();
        RetryAfterDelay();
    }
}
/// <summary>
/// Completes one asynchronous read, splits the received bytes into events at blank lines,
/// dispatches each complete event and then issues the next read.
/// </summary>
/// <remarks>
/// Received bytes are stored one per character and decoded as UTF-8 only when a whole event
/// is available. A multi-byte character split across two reads therefore decodes correctly;
/// the line feed used as delimiter never occurs inside a multi-byte UTF-8 sequence.
/// Carriage returns are dropped so CRLF-terminated streams are handled like LF-terminated
/// ones; streams that use a bare CR as line ending are not supported.
/// </remarks>
/// <param name="result">The IAsyncResult.</param>
private void EndReadFromStream(IAsyncResult result)
{
    const byte LineFeed = 0x0A;
    const byte CarriageReturn = 0x0D;

    if (_shutdownToken) return;

    try
    {
        var bytesRead = _httpStream.EndRead(result);
        Trace.TraceInformation("EndReadFromStream (Bytes={0})", bytesRead);
        if (_shutdownToken) return;

        if (bytesRead == 0)
        {
            // End of stream: the server closed the connection.
            CloseConnection();
            RetryAfterDelay();
            return;
        }

        // Work on a local reference: CloseConnection() sets the field to null, and a Message
        // handler may call Shutdown() while we are in the middle of this loop.
        var pending = _eventStream;
        if (pending == null) return;

        for (var i = 0; i < bytesRead; i++)
        {
            var current = _buffer[i];
            if (current == CarriageReturn) continue;

            // Compare with the last stored character rather than the previous buffer byte,
            // so a blank line that straddles two reads is still recognised.
            var endsEvent = current == LineFeed
                            && pending.Length > 0
                            && pending[pending.Length - 1] == '\n';
            if (endsEvent)
            {
                DispatchEvent(DecodeEventLines(pending.ToString()));
                pending.Length = 0;
                if (_shutdownToken) return;
            }
            else
            {
                pending.Append((char)current);
            }
        }

        // Keep reading until we run out of data
        if (!_shutdownToken && ReadyState == EventSourceState.Open)
            _httpStream.BeginRead(_buffer, 0, _buffer.Length, EndReadFromStream, null);
    }
    catch (WebException ex)
    {
        ReportReadFailureAndRetry(ex);
    }
    catch (IOException ex)
    {
        // A dropped connection can also surface as an IOException; left unhandled in this
        // callback it would not be observed by anyone.
        ReportReadFailureAndRetry(ex);
    }
}

/// <summary>
/// Converts the byte-per-character text of one event back to bytes, decodes it as UTF-8
/// and splits it into lines.
/// </summary>
/// <param name="raw">Event text in which every character holds one received byte.</param>
/// <returns>The decoded lines of the event.</returns>
private static string[] DecodeEventLines(string raw)
{
    var bytes = new byte[raw.Length];
    for (var i = 0; i < bytes.Length; i++)
    {
        bytes[i] = (byte)raw[i];
    }

    return Encoding.UTF8.GetString(bytes).Split('\n');
}

/// <summary>
/// Raises the error event for a failed read, closes the connection and schedules a retry.
/// </summary>
/// <param name="failure">The exception describing the failure.</param>
private void ReportReadFailureAndRetry(Exception failure)
{
    OnErrorEvent(new ServerSentErrorEventArgs { Exception = failure });
    CloseConnection();
    RetryAfterDelay();
}
#endregion Private Methods | |
#region Public Classes | |
/// <summary>
/// Event data for the error event.
/// </summary>
public sealed class ServerSentErrorEventArgs : EventArgs
{
    /// <summary>
    /// Gets the exception that describes the failure.
    /// </summary>
    public Exception Exception { get; internal set; }
}
/// <summary>
/// Event data for the message event: one received server-sent event.
/// </summary>
public sealed class ServerSentEventArgs : EventArgs
{
    /// <summary>
    /// Gets the identifier of the event.
    /// </summary>
    public string EventId { get; internal set; }

    /// <summary>
    /// Gets the type of the event.
    /// </summary>
    public string EventType { get; internal set; }

    /// <summary>
    /// Gets the data carried by the event.
    /// </summary>
    public string Data { get; internal set; }
}
/// <summary>
/// Event data for the state change event.
/// </summary>
public sealed class StateChangeEventArgs : EventArgs
{
    /// <summary>
    /// Gets the state that was just assigned.
    /// </summary>
    public EventSourceState NewState { get; internal set; }
}
#endregion Public Classes | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment