Skip to content

Instantly share code, notes, and snippets.

@Eibwen
Created June 24, 2020 22:04
Show Gist options
  • Save Eibwen/40a57625bd38c13fd1bdcf8f4068401b to your computer and use it in GitHub Desktop.
Save Eibwen/40a57625bd38c13fd1bdcf8f4068401b to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace ExcessivelySimple.Loggers
{
public interface IInfluxBatcher : IDisposable
{
void AppendLine(string database, string renderedPointLine);
//Task FlushAsync();
}
public interface IInfluxBuilder
{
/// <summary>
/// Tags only allow for strings. And should have low cardinality
/// </summary>
IInfluxBuilder WithTag(string key, string value);
IInfluxBuilder WithField(string key, float value);
/// <summary>
/// Store Integer/Long values
///
/// Note: the Influx docs warn: "Values close to but within those limits may lead to unexpected results; some functions and operators convert the int64 data type to float64 during calculation which can cause overflow issues."
/// </summary>
/// <remarks>
/// Based on this: https://docs.influxdata.com/influxdb/v1.7/troubleshooting/frequently-asked-questions/#what-are-the-minimum-and-maximum-integers-that-influxdb-can-store
/// Integers are stored as int64, so it should be safe to use <see cref="long"/> for this input.
/// </remarks>
IInfluxBuilder WithField(string key, long value);
IInfluxBuilder WithField(string key, string value);
IInfluxBuilder WithField(string key, bool value);
IInfluxBuilder SpecifyTime(DateTimeOffset date);
void WriteWithBuffer();
}
public class BasicInfluxdbLogger : IInfluxBatcher
{
private readonly Uri _influxHost;
private readonly TimeSpan _batchFlushTime;
private readonly HttpClient _httpClient;
private Task _batcher;
private List<(string database, string point)> _batchList = new List<(string, string)>();
private bool _inGoodState = true;
public BasicInfluxdbLogger(Uri influxHost, TimeSpan? batchFlushTime = null, HttpClient httpClient = null)
{
_influxHost = influxHost;
_batchFlushTime = batchFlushTime ?? TimeSpan.FromSeconds(60);
_httpClient = httpClient ?? new HttpClient();
_batcher = BatchLoop();
}
private async Task BatchLoop()
{
try
{
await Task.Delay(_batchFlushTime);
await FlushAsync();
}
catch (Exception e)
{
Console.WriteLine(e);
_inGoodState = false;
throw;
}
_batcher = BatchLoop();
}
/// <summary>
/// Begin the process for constructing and writing to influxdb
/// </summary>
/// <param name="database">The collection you're writing to</param>
/// <param name="measurement">Think of this as a TABLE in SQL</param>
/// <returns></returns>
public IInfluxBuilder Influx(string database, string measurement)
{
if (!_inGoodState)
{
throw new Exception("Influx logging is in an invalid state");
}
return new InfluxBuilder(this, database, measurement);
}
public void AppendLine(string database, string renderedPointString)
{
_batchList.Add((database, renderedPointString));
}
protected async Task FlushAsync()
{
var listToSend = _batchList;
_batchList = new List<(string, string)>();
var databases = listToSend.GroupBy(x => x.database);
var builder = new UriBuilder(_influxHost);
builder.Path += "write";
foreach (var databaseSet in databases)
{
var endpointData = new Dictionary<string, string>
{
{"db", databaseSet.Key}
};
builder.Query = await new FormUrlEncodedContent(endpointData).ReadAsStringAsync();
var req = new HttpRequestMessage(HttpMethod.Post, builder.Uri);
req.Content = new StringContent(string.Join("\n", databaseSet.Select(x => x.point)));
//TODO do these in parallel and use Task.WaitAll?
var res = await _httpClient.SendAsync(req);
if (!res.IsSuccessStatusCode)
{
var resBody = await res.Content.ReadAsStringAsync();
throw new HttpRequestException($"StatusCode: {(int)res.StatusCode} ({res.StatusCode})\nBody: '{resBody}'");
}
}
}
internal class InfluxBuilder : IInfluxBuilder
{
private readonly IInfluxBatcher _influxBatcher;
private readonly string _database;
private readonly string _measurement;
private readonly Dictionary<string, string> _tags = new Dictionary<string, string>();
private readonly Dictionary<string, string> _fields = new Dictionary<string, string>();
private DateTimeOffset? _date;
public InfluxBuilder(IInfluxBatcher influxBatcher, string database, string measurement)
{
_influxBatcher = influxBatcher;
_database = database;
_measurement = measurement;
}
/// <inheritdoc />
public IInfluxBuilder WithTag(string key, string value)
{
_tags.Add(key, value);
return this;
}
public IInfluxBuilder WithField(string key, float value)
{
_fields.Add(key, value.ToString());
return this;
}
public IInfluxBuilder WithField(string key, long value)
{
_fields.Add(key, $"{value}i");
return this;
}
public IInfluxBuilder WithField(string key, string value)
{
_fields.Add(key, $"\"{EscapeFieldStringValue(value)}\"");
return this;
}
public IInfluxBuilder WithField(string key, bool value)
{
_fields.Add(key, value.ToString());
return this;
}
public IInfluxBuilder SpecifyTime(DateTimeOffset date)
{
_date = date;
return this;
}
public void WriteWithBuffer()
{
_influxBatcher.AppendLine(_database, BuildLineProtocol(true));
}
/// <summary>
/// Escapes for tag keys, tag values, and field keys
/// </summary>
private string EscapeNames(string input)
{
return input
.Replace(@"\", @"\\")
.Replace(",", @"\,")
.Replace("=", @"\=")
.Replace(" ", @"\ ");
}
private string EscapeMeasurement(string input)
{
return input
.Replace(@"\", @"\\")
.Replace(",", @"\,")
.Replace(" ", @"\ ");
}
private string EscapeFieldStringValue(string input)
{
return input
.Replace(@"\", @"\\")
.Replace("\"", @"\""");
}
/// <summary>
/// Builds the line protocol that InfluxDB uses
/// </summary>
/// <param name="forceDateTime">Useful if batching, ensure a time is set</param>
private string BuildLineProtocol(bool forceDateTime)
{
var sb = new StringBuilder();
sb.Append(EscapeMeasurement(_measurement));
//For best performance you should sort tags by key before sending them to the database. The sort should match the results from the Go bytes.Compare function.
foreach (var tag in _tags.OrderBy(x => x.Key))
{
sb.Append(",");
sb.Append($"{EscapeNames(tag.Key)}={EscapeNames(tag.Value)}");
}
if (!_fields.Any())
{
throw new Exception("Not sure if Influxdb strictly requires at least one Field, but I don't see the point in supporting that right now");
}
sb.Append(" ");
sb.Append(string.Join(",", _fields.Select(field => $"{EscapeNames(field.Key)}={field.Value}")));
//foreach (var field in _fields)
//{
// sb.Append($"{EscapeNames(field.Key)}={field.Value}");
//}
if (forceDateTime && !_date.HasValue)
{
_date = DateTimeOffset.UtcNow;
}
var dateInfo = _date.HasValue ? $" {_date.Value.ToUnixTimeMilliseconds()}000000" : "";
sb.Append(dateInfo);
//|measurement|,tag_set| |field_set| |timestamp|
return sb.ToString();
}
}
public void Dispose()
{
FlushAsync().Wait();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment