Created
February 24, 2016 11:53
-
-
Save kiyoaki/f65ff7ec39838b3f7ac0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Google.Apis.Util; | |
namespace SampleApp.BigQuery.Actions | |
{ | |
public static class BigQueryStreaming | |
{ | |
private const int RetryCount = 3; | |
private const double RetryDeltaBackOffMilliseconds = 200.0; | |
private static readonly int BulkCount = BigQuerySettings.BigQueryBulkCount; | |
private static readonly ConcurrentDictionary<Tuple<Type, Type>, ConcurrentQueue<object>> InsertTargets = | |
new ConcurrentDictionary<Tuple<Type, Type>, ConcurrentQueue<object>>(); | |
public static void Enqueue<TDataset, TTable>(TTable row) | |
where TDataset : IBigQueryDataset | |
where TTable : IBigQueryTable | |
{ | |
Enqueue<TDataset, TTable>(new[] { row }); | |
} | |
public static void Enqueue<TDataset, TTable>(TTable[] rows) | |
where TDataset : IBigQueryDataset | |
where TTable : IBigQueryTable | |
{ | |
try | |
{ | |
var targets = GetOrAddQueue<TDataset, TTable>(); | |
foreach (var row in rows) | |
{ | |
targets.Enqueue(row); | |
} | |
if (targets.Count >= BulkCount) | |
{ | |
FlushAsync<TDataset, TTable>(targets) | |
.FireAndForget(additionalMessage: "BigQuery Insertエラー (他の処理に影響はありません)"); | |
} | |
} | |
catch (Exception ex) | |
{ | |
//ロギング | |
} | |
} | |
public static void FlushAll() | |
{ | |
Logger.Info("BigQueryStreaming FlushAll"); | |
InsertTargets | |
.Where(x => x.Value != null && x.Value.Any()) | |
.Select(x => FlushAsync(x.Value, x.Key.Item1, x.Key.Item2)) | |
.WhenAll() | |
.Wait(); | |
} | |
private static async Task FlushAsync<TDataset, TTable>(ConcurrentQueue<object> targets) | |
where TDataset : IBigQueryDataset | |
where TTable : IBigQueryTable | |
{ | |
await FlushAsync(targets, typeof(TDataset), typeof(TTable)); | |
} | |
private static async Task FlushAsync(ConcurrentQueue<object> targets, Type datasetType, Type tableType) | |
{ | |
var list = new List<object>(); | |
object data; | |
while (targets.TryDequeue(out data)) | |
{ | |
list.Add(data); | |
} | |
var bigQueryContext = BigQueryAccount.GetContext(); | |
var tableName = BigQueryTableSettings.HasDateSuffix(tableType) | |
? tableType.Name + DateTime.UtcNow.ToString("yyyyMM") | |
: tableType.Name; | |
var table = bigQueryContext.CreateMetaTable(datasetType.Name, tableName); | |
var fields = BigQueryTableSchemaFactory.Create(tableType); | |
try | |
{ | |
//テーブルが無かったら作成 | |
table.CreateTable(bigQueryContext.BigQueryService, fields); | |
} | |
catch | |
{ | |
// ignored | |
} | |
var backOff = new ExponentialBackOff(TimeSpan.FromMilliseconds(RetryDeltaBackOffMilliseconds), RetryCount); | |
await table.InsertAllAsync(bigQueryContext.BigQueryService, list, backOff); | |
} | |
private static ConcurrentQueue<object> GetOrAddQueue<TDataset, TTable>() | |
where TDataset : IBigQueryDataset | |
where TTable : IBigQueryTable | |
{ | |
return InsertTargets.GetOrAdd(Tuple.Create(typeof(TDataset), typeof(TTable)), | |
tuple => new ConcurrentQueue<object>()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment