Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save alexandervantrijffel/1509d46fbf425c69d61aa0cbdd7a4440 to your computer and use it in GitHub Desktop.
Save alexandervantrijffel/1509d46fbf425c69d61aa0cbdd7a4440 to your computer and use it in GitHub Desktop.
namespace EventStoreClient
{
public static class EventStoreConnectionExtensions
{
    /// <summary>
    /// Maximum time to wait for the stream metadata read before giving up on updating MaxAge.
    /// </summary>
    private static readonly TimeSpan MetadataReadTimeout = TimeSpan.FromMilliseconds(2500);

    /// <summary>
    /// Builds a REST client that targets the Event Store HTTP endpoint described by the
    /// application settings and authenticates with HTTP basic authentication.
    /// </summary>
    /// <param name="appSettings">Settings supplying the IP address, web admin port, username and password.</param>
    /// <returns>A new, configured <see cref="RestClient"/>.</returns>
    private static RestClient NewRestClient(IOptions<AppSettings> appSettings)
    {
        var settings = appSettings.Value;
        var client = new RestClient($"http://{settings.EventStoreIPAddress}:{settings.EventStoreWebAdminPort}");
        client.Authenticator = new HttpBasicAuthenticator(settings.EventStoreUsername, settings.EventStorePassword);
        return client;
    }

    /// <summary>
    /// Posts a request to create a continuous, enabled, emitting projection.
    /// The request is executed asynchronously; this method returns before the response arrives.
    /// </summary>
    /// <param name="appSettings">Settings used to build the REST client.</param>
    /// <param name="name">Projection name. It is URL-escaped before being placed in the query string.</param>
    /// <param name="javascript">Projection source sent as the request body.</param>
    /// <param name="responseHandler">
    /// Callback invoked with the response, whether or not the call succeeded. May be null.
    /// </param>
    public static void CreateProjection(IOptions<AppSettings> appSettings, string name, string javascript, Action<IRestResponse> responseHandler)
    {
        // Escape the name so characters such as '&', '#', '+' or spaces cannot break the query string.
        var escapedName = System.Uri.EscapeDataString(name ?? string.Empty);

        // NOTE(review): with RequestFormat = Json, AddBody may JSON-serialize the script (wrapping it
        // in quotes) - confirm the server receives the projection source in the form it expects.
        var request = new RestRequest($"/projections/continuous?name={escapedName}&enabled=yes&emit=true", Method.POST) { RequestFormat = DataFormat.Json }
            .AddBody(javascript);

        NewRestClient(appSettings).ExecuteAsync(request, (response, handle) =>
        {
            if (response.StatusCode != HttpStatusCode.Created)
            {
                Log.Error($"Failed to create projection with name {name} and body {javascript}. Result {response.StatusCode} {response.ErrorMessage}");
            }

            // A null handler is tolerated so that a fire-and-forget call does not throw inside the callback.
            responseHandler?.Invoke(response);
        });
    }

    /// <summary>
    /// By enabling the by category projections all streams can be browsed using a $ce-{streamPrefix} projection.
    /// The request is executed asynchronously; a failure is only logged.
    /// </summary>
    /// <param name="connection">
    /// Not used by the implementation; it is the receiver that makes this callable as an extension method.
    /// </param>
    /// <param name="appSettings">Settings used to build the REST client.</param>
    public static void EnableByCategoryProjections(this IEventStoreConnection connection, IOptions<AppSettings> appSettings)
    {
        // POST http://localhost:2114/projection/$by_category/command/enable
        var request = new RestRequest("/projection/$by_category/command/enable", Method.POST);

        // The command carries no body, so an explicit zero Content-Length is sent.
        request.AddHeader("Content-Length", "0");

        NewRestClient(appSettings).ExecuteAsync(request, response =>
        {
            if (response.StatusCode != HttpStatusCode.OK)
            {
                Log.Error($"Failed to enable Event Store $by_category projection. Category streams such as $ce-oandaImport are not available! Result {response.StatusCode} {response.ErrorMessage}");
            }
        });
    }

    /// <summary>
    /// Reads the metadata of <paramref name="streamName"/> and, when its MaxAge differs from
    /// <paramref name="maxAge"/>, writes updated metadata with the requested MaxAge.
    /// If the metadata read does not finish within the timeout, a warning is logged and nothing is written.
    /// </summary>
    /// <param name="connection">Connection used to read and write the stream metadata.</param>
    /// <param name="streamName">Name of the stream whose metadata is checked.</param>
    /// <param name="maxAge">Desired MaxAge; null requests that no MaxAge is set.</param>
    /// <returns>A task that completes once the check (and the update, if one was needed) has finished.</returns>
    public static async Task EnsureStreamMetaDataMaxAgeAsync(this IEventStoreConnection connection, string streamName, TimeSpan? maxAge)
    {
        Task<StreamMetadataResult> task = connection.GetStreamMetadataAsync(streamName);

        if (await Task.WhenAny(task, Task.Delay(MetadataReadTimeout)) != task)
        {
            // The delay finished first: the metadata read timed out.
            Log.Warning($"EventStore GetStreamMetadataAsync did not complete within the timeout. Cannot set MaxAge of Eventstore stream {streamName}");
            return;
        }

        // The task has already completed here, so reading Result does not block.
        var current = task.Result;
        var existing = current.StreamMetadata;

        if (existing.MaxAge == maxAge)
        {
            return;
        }

        Log.Logger.Information($"Setting MaxAge of EventStore stream {streamName} to {maxAge?.ToString() ?? "NULL"}.");

        // Carry the other built-in settings over so that changing MaxAge does not silently reset them.
        // NOTE(review): custom (user-defined) metadata properties are not copied by StreamMetadata.Create -
        // confirm whether any stream handled here relies on them.
        var updated = StreamMetadata.Create(
            existing.MaxCount,
            maxAge,
            existing.TruncateBefore,
            existing.CacheControl,
            existing.Acl);

        await connection.SetStreamMetadataAsync(streamName, current.MetastreamVersion, updated);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment