Skip to content

Instantly share code, notes, and snippets.

@kiyoaki
Created February 24, 2016 11:53
Show Gist options
  • Save kiyoaki/f65ff7ec39838b3f7ac0 to your computer and use it in GitHub Desktop.
Save kiyoaki/f65ff7ec39838b3f7ac0 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Google.Apis.Util;
namespace SampleApp.BigQuery.Actions
{
/// <summary>
/// Buffers rows per (dataset type, table type) pair and streams them to BigQuery in batches.
/// A background flush is started from <c>Enqueue</c> once a queue reaches <c>BulkCount</c>
/// rows; <see cref="FlushAll"/> synchronously flushes every non-empty queue.
/// </summary>
public static class BigQueryStreaming
{
    // Retry policy passed to InsertAllAsync through Google's ExponentialBackOff.
    private const int RetryCount = 3;
    private const double RetryDeltaBackOffMilliseconds = 200.0;

    // Queue length at which Enqueue kicks off a fire-and-forget flush.
    private static readonly int BulkCount = BigQuerySettings.BigQueryBulkCount;

    // One pending-row queue per (dataset type, table type) pair.
    private static readonly ConcurrentDictionary<Tuple<Type, Type>, ConcurrentQueue<object>> InsertTargets =
        new ConcurrentDictionary<Tuple<Type, Type>, ConcurrentQueue<object>>();

    /// <summary>
    /// Buffers a single row for table <typeparamref name="TTable"/> in dataset
    /// <typeparamref name="TDataset"/>. Equivalent to enqueuing a one-element array.
    /// </summary>
    /// <param name="row">The row to buffer.</param>
    public static void Enqueue<TDataset, TTable>(TTable row)
        where TDataset : IBigQueryDataset
        where TTable : IBigQueryTable
    {
        Enqueue<TDataset, TTable>(new[] { row });
    }

    /// <summary>
    /// Buffers <paramref name="rows"/> for table <typeparamref name="TTable"/> in dataset
    /// <typeparamref name="TDataset"/>. When the queue holds at least <c>BulkCount</c> rows,
    /// a flush is started without being awaited.
    /// </summary>
    /// <param name="rows">Rows to buffer; a null or empty array is ignored.</param>
    /// <remarks>
    /// Never throws: failures are written to the trace log so that streaming problems
    /// cannot break the caller.
    /// </remarks>
    public static void Enqueue<TDataset, TTable>(TTable[] rows)
        where TDataset : IBigQueryDataset
        where TTable : IBigQueryTable
    {
        if (rows == null || rows.Length == 0)
        {
            return;
        }

        try
        {
            var targets = GetOrAddQueue<TDataset, TTable>();
            foreach (var row in rows)
            {
                targets.Enqueue(row);
            }

            if (targets.Count >= BulkCount)
            {
                FlushAsync<TDataset, TTable>(targets)
                    .FireAndForget(additionalMessage: "BigQuery Insertエラー (他の処理に影響はありません)");
            }
        }
        catch (Exception ex)
        {
            // Previously swallowed with only a "logging" placeholder; record it instead.
            // NOTE(review): switch to the project Logger's error method if one exists.
            Trace.TraceError(
                "BigQueryStreaming Enqueue failed ({0}/{1}): {2}",
                typeof(TDataset).Name,
                typeof(TTable).Name,
                ex);
        }
    }

    /// <summary>
    /// Flushes every non-empty queue and blocks the calling thread until all inserts finish.
    /// </summary>
    /// <remarks>
    /// Exceptions from the individual flushes propagate out of <c>Wait()</c>.
    /// </remarks>
    public static void FlushAll()
    {
        Logger.Info("BigQueryStreaming FlushAll");
        InsertTargets
            .Where(x => x.Value != null && !x.Value.IsEmpty)
            .Select(x => FlushAsync(x.Value, x.Key.Item1, x.Key.Item2))
            .WhenAll()
            .Wait();
    }

    /// <summary>Typed convenience overload of the non-generic flush.</summary>
    private static async Task FlushAsync<TDataset, TTable>(ConcurrentQueue<object> targets)
        where TDataset : IBigQueryDataset
        where TTable : IBigQueryTable
    {
        await FlushAsync(targets, typeof(TDataset), typeof(TTable)).ConfigureAwait(false);
    }

    /// <summary>
    /// Drains <paramref name="targets"/> and streams the drained rows into the table named
    /// after <paramref name="tableType"/> in the dataset named after <paramref name="datasetType"/>.
    /// </summary>
    /// <remarks>
    /// Rows are removed from the queue before the insert. If the insert ultimately fails
    /// (after the ExponentialBackOff retries), those rows are not re-queued.
    /// </remarks>
    private static async Task FlushAsync(ConcurrentQueue<object> targets, Type datasetType, Type tableType)
    {
        var list = new List<object>();
        object data;
        while (targets.TryDequeue(out data))
        {
            list.Add(data);
        }

        // A concurrent flush (another threshold-triggered one, or FlushAll) may already have
        // drained the queue: skip the table round-trip and an insert request with no rows.
        if (list.Count == 0)
        {
            return;
        }

        var bigQueryContext = BigQueryAccount.GetContext();

        // Date-suffixed tables get a yyyyMM suffix taken from the flush time (UTC), not the
        // enqueue time. InvariantCulture keeps the year Gregorian whatever the thread culture
        // is (e.g. th-TH would otherwise format a Buddhist-era year).
        var tableName = BigQueryTableSettings.HasDateSuffix(tableType)
            ? tableType.Name + DateTime.UtcNow.ToString("yyyyMM", CultureInfo.InvariantCulture)
            : tableType.Name;
        var table = bigQueryContext.CreateMetaTable(datasetType.Name, tableName);
        var fields = BigQueryTableSchemaFactory.Create(tableType);
        try
        {
            // Create the table if it does not exist yet.
            table.CreateTable(bigQueryContext.BigQueryService, fields);
        }
        catch
        {
            // Best effort: failure is expected when the table already exists; a genuine
            // problem presumably surfaces from InsertAllAsync below.
        }

        var backOff = new ExponentialBackOff(TimeSpan.FromMilliseconds(RetryDeltaBackOffMilliseconds), RetryCount);

        // No caller context is needed past this point; avoids deadlocking FlushAll's Wait()
        // when it is invoked on a thread with a synchronization context.
        await table.InsertAllAsync(bigQueryContext.BigQueryService, list, backOff).ConfigureAwait(false);
    }

    /// <summary>Returns the pending-row queue for the given type pair, creating it on first use.</summary>
    private static ConcurrentQueue<object> GetOrAddQueue<TDataset, TTable>()
        where TDataset : IBigQueryDataset
        where TTable : IBigQueryTable
    {
        return InsertTargets.GetOrAdd(Tuple.Create(typeof(TDataset), typeof(TTable)),
            key => new ConcurrentQueue<object>());
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment