Created
June 24, 2020 22:04
-
-
Save Eibwen/40a57625bd38c13fd1bdcf8f4068401b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Net.Http; | |
using System.Runtime.CompilerServices; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace ExcessivelySimple.Loggers | |
{ | |
public interface IInfluxBatcher : IDisposable | |
{ | |
void AppendLine(string database, string renderedPointLine); | |
//Task FlushAsync(); | |
} | |
public interface IInfluxBuilder | |
{ | |
/// <summary> | |
/// Tags only allow for strings. And should have low cardinality | |
/// </summary> | |
IInfluxBuilder WithTag(string key, string value); | |
IInfluxBuilder WithField(string key, float value); | |
/// <summary> | |
/// Store Integer/Long values | |
/// | |
/// Note: the Influx docs warn: "Values close to but within those limits may lead to unexpected results; some functions and operators convert the int64 data type to float64 during calculation which can cause overflow issues." | |
/// </summary> | |
/// <remarks> | |
/// Based on this: https://docs.influxdata.com/influxdb/v1.7/troubleshooting/frequently-asked-questions/#what-are-the-minimum-and-maximum-integers-that-influxdb-can-store | |
/// Integers are stored as int64, so it should be safe to use <see cref="long"/> for this input. | |
/// </remarks> | |
IInfluxBuilder WithField(string key, long value); | |
IInfluxBuilder WithField(string key, string value); | |
IInfluxBuilder WithField(string key, bool value); | |
IInfluxBuilder SpecifyTime(DateTimeOffset date); | |
void WriteWithBuffer(); | |
} | |
public class BasicInfluxdbLogger : IInfluxBatcher | |
{ | |
private readonly Uri _influxHost; | |
private readonly TimeSpan _batchFlushTime; | |
private readonly HttpClient _httpClient; | |
private Task _batcher; | |
private List<(string database, string point)> _batchList = new List<(string, string)>(); | |
private bool _inGoodState = true; | |
public BasicInfluxdbLogger(Uri influxHost, TimeSpan? batchFlushTime = null, HttpClient httpClient = null) | |
{ | |
_influxHost = influxHost; | |
_batchFlushTime = batchFlushTime ?? TimeSpan.FromSeconds(60); | |
_httpClient = httpClient ?? new HttpClient(); | |
_batcher = BatchLoop(); | |
} | |
private async Task BatchLoop() | |
{ | |
try | |
{ | |
await Task.Delay(_batchFlushTime); | |
await FlushAsync(); | |
} | |
catch (Exception e) | |
{ | |
Console.WriteLine(e); | |
_inGoodState = false; | |
throw; | |
} | |
_batcher = BatchLoop(); | |
} | |
/// <summary> | |
/// Begin the process for constructing and writing to influxdb | |
/// </summary> | |
/// <param name="database">The collection you're writing to</param> | |
/// <param name="measurement">Think of this as a TABLE in SQL</param> | |
/// <returns></returns> | |
public IInfluxBuilder Influx(string database, string measurement) | |
{ | |
if (!_inGoodState) | |
{ | |
throw new Exception("Influx logging is in an invalid state"); | |
} | |
return new InfluxBuilder(this, database, measurement); | |
} | |
public void AppendLine(string database, string renderedPointString) | |
{ | |
_batchList.Add((database, renderedPointString)); | |
} | |
protected async Task FlushAsync() | |
{ | |
var listToSend = _batchList; | |
_batchList = new List<(string, string)>(); | |
var databases = listToSend.GroupBy(x => x.database); | |
var builder = new UriBuilder(_influxHost); | |
builder.Path += "write"; | |
foreach (var databaseSet in databases) | |
{ | |
var endpointData = new Dictionary<string, string> | |
{ | |
{"db", databaseSet.Key} | |
}; | |
builder.Query = await new FormUrlEncodedContent(endpointData).ReadAsStringAsync(); | |
var req = new HttpRequestMessage(HttpMethod.Post, builder.Uri); | |
req.Content = new StringContent(string.Join("\n", databaseSet.Select(x => x.point))); | |
//TODO do these in parallel and use Task.WaitAll? | |
var res = await _httpClient.SendAsync(req); | |
if (!res.IsSuccessStatusCode) | |
{ | |
var resBody = await res.Content.ReadAsStringAsync(); | |
throw new HttpRequestException($"StatusCode: {(int)res.StatusCode} ({res.StatusCode})\nBody: '{resBody}'"); | |
} | |
} | |
} | |
internal class InfluxBuilder : IInfluxBuilder | |
{ | |
private readonly IInfluxBatcher _influxBatcher; | |
private readonly string _database; | |
private readonly string _measurement; | |
private readonly Dictionary<string, string> _tags = new Dictionary<string, string>(); | |
private readonly Dictionary<string, string> _fields = new Dictionary<string, string>(); | |
private DateTimeOffset? _date; | |
public InfluxBuilder(IInfluxBatcher influxBatcher, string database, string measurement) | |
{ | |
_influxBatcher = influxBatcher; | |
_database = database; | |
_measurement = measurement; | |
} | |
/// <inheritdoc /> | |
public IInfluxBuilder WithTag(string key, string value) | |
{ | |
_tags.Add(key, value); | |
return this; | |
} | |
public IInfluxBuilder WithField(string key, float value) | |
{ | |
_fields.Add(key, value.ToString()); | |
return this; | |
} | |
public IInfluxBuilder WithField(string key, long value) | |
{ | |
_fields.Add(key, $"{value}i"); | |
return this; | |
} | |
public IInfluxBuilder WithField(string key, string value) | |
{ | |
_fields.Add(key, $"\"{EscapeFieldStringValue(value)}\""); | |
return this; | |
} | |
public IInfluxBuilder WithField(string key, bool value) | |
{ | |
_fields.Add(key, value.ToString()); | |
return this; | |
} | |
public IInfluxBuilder SpecifyTime(DateTimeOffset date) | |
{ | |
_date = date; | |
return this; | |
} | |
public void WriteWithBuffer() | |
{ | |
_influxBatcher.AppendLine(_database, BuildLineProtocol(true)); | |
} | |
/// <summary> | |
/// Escapes for tag keys, tag values, and field keys | |
/// </summary> | |
private string EscapeNames(string input) | |
{ | |
return input | |
.Replace(@"\", @"\\") | |
.Replace(",", @"\,") | |
.Replace("=", @"\=") | |
.Replace(" ", @"\ "); | |
} | |
private string EscapeMeasurement(string input) | |
{ | |
return input | |
.Replace(@"\", @"\\") | |
.Replace(",", @"\,") | |
.Replace(" ", @"\ "); | |
} | |
private string EscapeFieldStringValue(string input) | |
{ | |
return input | |
.Replace(@"\", @"\\") | |
.Replace("\"", @"\"""); | |
} | |
/// <summary> | |
/// Builds the line protocol that InfluxDB uses | |
/// </summary> | |
/// <param name="forceDateTime">Useful if batching, ensure a time is set</param> | |
private string BuildLineProtocol(bool forceDateTime) | |
{ | |
var sb = new StringBuilder(); | |
sb.Append(EscapeMeasurement(_measurement)); | |
//For best performance you should sort tags by key before sending them to the database. The sort should match the results from the Go bytes.Compare function. | |
foreach (var tag in _tags.OrderBy(x => x.Key)) | |
{ | |
sb.Append(","); | |
sb.Append($"{EscapeNames(tag.Key)}={EscapeNames(tag.Value)}"); | |
} | |
if (!_fields.Any()) | |
{ | |
throw new Exception("Not sure if Influxdb strictly requires at least one Field, but I don't see the point in supporting that right now"); | |
} | |
sb.Append(" "); | |
sb.Append(string.Join(",", _fields.Select(field => $"{EscapeNames(field.Key)}={field.Value}"))); | |
//foreach (var field in _fields) | |
//{ | |
// sb.Append($"{EscapeNames(field.Key)}={field.Value}"); | |
//} | |
if (forceDateTime && !_date.HasValue) | |
{ | |
_date = DateTimeOffset.UtcNow; | |
} | |
var dateInfo = _date.HasValue ? $" {_date.Value.ToUnixTimeMilliseconds()}000000" : ""; | |
sb.Append(dateInfo); | |
//|measurement|,tag_set| |field_set| |timestamp| | |
return sb.ToString(); | |
} | |
} | |
public void Dispose() | |
{ | |
FlushAsync().Wait(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment