Last active
August 26, 2022 15:28
-
-
Save bogdangaliceanu/d3e8b96c8aab3b4e1bb299b52daf788e to your computer and use it in GitHub Desktop.
Amazon Neptune autoscaling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Keeps Gremlin clients for the instances of a Neptune cluster up to date and routes queries to them.
/// Writes (via <see cref="TraversalSource"/>) go to the single writer instance; reads (via <see cref="ReadAsync{ResponseType}"/>)
/// go to the reader instances whose status is 'available', choosing, per query descriptor, the reader with the lowest
/// estimated load. The set of instances and the load estimates are refreshed periodically by the refresh scheduler
/// and whenever a read fails because its instance became unreachable.
/// </summary>
public sealed class DynamicNeptuneGraphDb : IDisposable
{
    private static readonly ILogger Logger = LogManager.GetCurrentClassLogger();
    private readonly NeptuneConfiguration configuration;
    private readonly IGremlinStatusProvider gremlinStatusProvider;
    private readonly IGremlinClientFactory clientFactory;
    private readonly INeptuneRefreshScheduler refreshScheduler;
    private readonly INeptuneInstanceDescriber neptuneInstanceDescriber;
    // gate for callers that use the clients: reset while Refresh rebuilds the client lists, set otherwise;
    // it starts unset, so callers block until the first refresh (Init) has finished
    private readonly ManualResetEventSlim refreshLock = new(false, 0);
    // serializes Refresh: it is triggered both by the scheduler and by failed reads in ReadAsync, and two refreshes
    // running at once would mutate the (non thread-safe) client lists concurrently and could create duplicate clients
    private readonly SemaphoreSlim refreshMutex = new(1, 1);
    // the client that connects to the only writer instance in the cluster
    private (string InstanceId, IGremlinClient Client) writeClient;
    // the clients whose instances are ready to process queries
    private readonly List<(DBInstance Instance, IGremlinClient Client)> availableReadClients = new();
    // the clients whose instances are no longer able to accept new queries (because their status is no longer 'available') but still exist and are still processing queries
    private readonly List<(DBInstance Instance, IGremlinClient Client)> unavailableReadClients = new();
    // for each type of query, the load situation of each instance; InstanceLoadInfo.InstanceIndex indexes availableReadClients
    private ConcurrentDictionary<string, ImmutableArray<InstanceLoadInfo>> instanceLoadInfoByQueryDescriptor;
    // prepended (followed by the descriptor and '\n') to every read script, so that running queries reported by the
    // status provider can be attributed to a query descriptor when the load is refreshed
    private const string queryDescriptorPrefix = "//queryDescriptor:";

    /// <summary>
    /// Creates the instance and subscribes a refresh to the scheduler's <c>Elapsed</c> event.
    /// No clients exist until <see cref="Init"/> has completed.
    /// </summary>
    public DynamicNeptuneGraphDb(IOptions<NeptuneConfiguration> neptuneConfiguration, IGremlinStatusProvider gremlinStatusProvider,
        IGremlinClientFactory clientFactory, INeptuneRefreshScheduler refreshScheduler, INeptuneInstanceDescriber neptuneInstanceDescriber)
    {
        this.gremlinStatusProvider = gremlinStatusProvider;
        this.clientFactory = clientFactory;
        this.refreshScheduler = refreshScheduler;
        this.neptuneInstanceDescriber = neptuneInstanceDescriber;
        this.configuration = neptuneConfiguration.Value;
        // async event handler: exceptions must be caught here, otherwise they would be unobservable
        this.refreshScheduler.Elapsed += async (_, _) =>
        {
            try
            {
                await Refresh();
            }
            catch (Exception e)
            {
                Logger.Error(e);
            }
        };
    }

    /// <summary>
    /// Performs the first refresh (creating the clients), then starts the periodic refresh scheduler.
    /// Exceptions from the first refresh propagate to the caller.
    /// </summary>
    public async Task Init()
    {
        await Refresh();
        refreshScheduler.Start();
    }

    /// <summary>
    /// Disposes the scheduler, the instance describer, the status provider, and every Gremlin client this object created
    /// (writer, available readers and still-draining unavailable readers).
    /// </summary>
    public void Dispose()
    {
        refreshScheduler.Dispose();
        neptuneInstanceDescriber.Dispose();
        writeClient.Client?.Dispose();
        availableReadClients.ForEach(c => c.Client.Dispose());
        // these clients are owned by this class as well; without this they leak on disposal
        unavailableReadClients.ForEach(c => c.Client.Dispose());
        refreshLock.Dispose();
        refreshMutex.Dispose();
        gremlinStatusProvider.Dispose();
    }

    /// <summary>
    /// A traversal source bound to the writer instance. Blocks until any in-progress refresh has finished.
    /// Each access wraps the shared writer client in a new <c>DriverRemoteConnection</c>.
    /// </summary>
    public GraphTraversalSource TraversalSource
    {
        get
        {
            refreshLock.Wait();
            var connection = new DriverRemoteConnection(writeClient.Client);
            return Traversal().WithRemote(connection);
        }
    }

    /// <summary>
    /// Submits a read script to the least loaded reader for <paramref name="queryDescriptor"/> (or to the writer when no
    /// reader is available). The descriptor is embedded in the script as a leading comment line so the load can be
    /// measured from the instances' status on the next refresh.
    /// </summary>
    /// <param name="queryDescriptor">Identifies the kind of query; load balancing is done separately per descriptor.</param>
    /// <param name="requestScript">The Gremlin script to run.</param>
    /// <remarks>
    /// On <see cref="WebSocketException"/>, <see cref="ObjectDisposedException"/> or <c>ServerUnavailableException</c>
    /// the clients are refreshed and the query is retried once on a newly chosen client; a second failure propagates.
    /// </remarks>
    public async Task<ResultSet<ResponseType>> ReadAsync<ResponseType>(string queryDescriptor, string requestScript)
    {
        var readClient = GetReadClient(queryDescriptor);
        requestScript = $"{queryDescriptorPrefix}{queryDescriptor}\n{requestScript}";
        try
        {
            return await readClient.SubmitAsync<ResponseType>(requestScript);
        }
        catch (Exception e) when (e is WebSocketException or ObjectDisposedException or ServerUnavailableException)
        // WebSocketException and ServerUnavailableException if the instance becomes unavailable between client refreshes
        // ObjectDisposedException if a client refresh occurs between obtaining the client and submitting the query, the instance is unavailable and thus the client is disposed
        {
            await Refresh();
            readClient = GetReadClient(queryDescriptor);
            return await readClient.SubmitAsync<ResponseType>(requestScript);
        }
    }

    /// <summary>
    /// Picks the available reader with the lowest estimated load for <paramref name="queryDescriptor"/> and increments
    /// that estimate. Falls back to the writer client when there are no available readers.
    /// Blocks until any in-progress refresh has finished.
    /// </summary>
    private IGremlinClient GetReadClient(string queryDescriptor)
    {
        refreshLock.Wait();
        if (availableReadClients.Count == 0)
        {
            return writeClient.Client;
        }
        // descriptors not seen in the last status refresh start with zero load on every reader
        var loadInfo = instanceLoadInfoByQueryDescriptor.GetOrAdd(
            queryDescriptor,
            _ => availableReadClients
                .Select((_, i) => new InstanceLoadInfo { InstanceIndex = i, CurrentLoad = 0 })
                .ToImmutableArray()
        );
        var leastUtilizedInstanceInfo = loadInfo.MinBy(x => x.CurrentLoad);
        // the estimate only grows between refreshes; Refresh replaces it with the measured load
        Interlocked.Increment(ref leastUtilizedInstanceInfo!.CurrentLoad);
        return availableReadClients[leastUtilizedInstanceInfo.InstanceIndex].Client;
    }

    /// <summary>
    /// Re-reads the cluster's instances, updates the clients accordingly, then recomputes the load estimates.
    /// Only one refresh runs at a time; readers are blocked while it runs.
    /// </summary>
    private async Task Refresh()
    {
        await refreshMutex.WaitAsync();
        try
        {
            refreshLock.Reset();
            try
            {
                var statuses = await RefreshClients();
                RefreshLoad(statuses);
            }
            catch
            {
                // a failed refresh may have partially changed availableReadClients, so existing InstanceIndex values
                // may point at the wrong client or past the end of the list; drop them so GetReadClient rebuilds them
                instanceLoadInfoByQueryDescriptor = new ConcurrentDictionary<string, ImmutableArray<InstanceLoadInfo>>();
                throw;
            }
            finally
            {
                refreshLock.Set();
            }
        }
        finally
        {
            refreshMutex.Release();
        }
    }

    /// <summary>
    /// Brings the client lists in line with the instances reported by the instance describer:
    /// recreates the writer client if the writer changed, moves readers that stopped being 'available' to
    /// <see cref="unavailableReadClients"/>, disposes clients of unavailable readers that disappeared or became available
    /// again, creates clients for newly available readers, and finally disposes/removes available readers whose status
    /// could not be retrieved.
    /// </summary>
    /// <returns>The statuses of the reachable available readers, in the same order as <see cref="availableReadClients"/>.</returns>
    private async Task<ImmutableArray<GremlinStatus>> RefreshClients()
    {
        var (writer, readers) = await neptuneInstanceDescriber.GetInstances(configuration.Cluster);
        var instancesDiff = DetermineInstancesDiff(writer, readers);
        if (instancesDiff.ChangedWriterInstance != null)
        {
            writeClient.Client?.Dispose();
            writeClient = (
                instancesDiff.ChangedWriterInstance.DBInstanceIdentifier,
                clientFactory.CreateClient(instancesDiff.ChangedWriterInstance.Endpoint.Address, instancesDiff.ChangedWriterInstance.Endpoint.Port,
                    configuration.PoolSize, configuration.MaxInProcessPerConnection)
            );
        }
        // not disposed yet: these instances may still be running queries submitted through their clients
        availableReadClients.RemoveAll(c =>
        {
            if (instancesDiff.NewlyUnavailableReaderInstances.Any(i => i.DBInstanceIdentifier == c.Instance.DBInstanceIdentifier))
            {
                unavailableReadClients.Add(c);
                return true;
            }
            return false;
        });
        unavailableReadClients.RemoveAll(c =>
        {
            if (instancesDiff.ObsoleteUnavailableInstanceIds.Contains(c.Instance.DBInstanceIdentifier))
            {
                c.Client.Dispose();
                return true;
            }
            return false;
        });
        availableReadClients.AddRange(instancesDiff.NewlyAvailableReaderInstances.Select(i => (
            i,
            clientFactory.CreateClient(i.Endpoint.Address, i.Endpoint.Port, configuration.PoolSize, configuration.MaxInProcessPerConnection)
        )));
        var statuses = await GetAvailableReadersStatuses();
        var (reachable, unreachable) = statuses.Partition(s => s.Status != null);
        availableReadClients.RemoveAll(c =>
        {
            if (unreachable.Any(s => s.InstanceId == c.Instance.DBInstanceIdentifier))
            {
                c.Client.Dispose();
                return true;
            }
            return false;
        });
        return reachable.Select(s => s.Status).ToImmutableArray();
    }

    /// <summary>
    /// Replaces the load estimates with the measured ones: for every query descriptor found among the running queries,
    /// the number of running queries with that descriptor on each instance. Index i of each array corresponds to
    /// <paramref name="statuses"/>[i].
    /// </summary>
    private void RefreshLoad(ImmutableArray<GremlinStatus> statuses)
    {
        // per instance (in statuses order): running query count per non-empty descriptor, extracted once per query
        var queryCountsByInstance = statuses
            .Select(status => status.Queries
                .Select(q => ExtractQueryDescriptor(q.QueryString))
                .Where(qd => !string.IsNullOrEmpty(qd))
                .GroupBy(qd => qd)
                .ToDictionary(g => g.Key, g => g.Count()))
            .ToImmutableArray();
        instanceLoadInfoByQueryDescriptor = new ConcurrentDictionary<string, ImmutableArray<InstanceLoadInfo>>(
            queryCountsByInstance
                .SelectMany(counts => counts.Keys)
                .Distinct()
                .Select(qd => KeyValuePair.Create(
                    qd,
                    queryCountsByInstance
                        .Select((counts, i) => new InstanceLoadInfo
                        {
                            InstanceIndex = i,
                            CurrentLoad = counts.TryGetValue(qd, out var count) ? count : 0,
                        })
                        .ToImmutableArray()))
        );
    }

    /// <summary>
    /// Fetches the status of every available reader concurrently. A failed retrieval is logged and reported
    /// with a <c>null</c> status instead of throwing.
    /// </summary>
    private async Task<(string InstanceId, GremlinStatus Status)[]> GetAvailableReadersStatuses()
    {
        var statusRetrievals = availableReadClients
            .Select(async c =>
            {
                try
                {
                    return (c.Instance.DBInstanceIdentifier, await gremlinStatusProvider.Get(c.Instance.Endpoint.Address, c.Instance.Endpoint.Port));
                }
                catch (Exception e)
                {
                    Logger.Error(e);
                    return (c.Instance.DBInstanceIdentifier, null);
                }
            });
        return await Task.WhenAll(statusRetrievals);
    }

    /// <summary>
    /// Returns the descriptor that <see cref="ReadAsync{ResponseType}"/> embedded at the start of <paramref name="query"/>,
    /// or an empty string when the query is null or does not start with the descriptor prefix.
    /// </summary>
    private static string ExtractQueryDescriptor(string query)
    {
        if (query is null || !query.StartsWith(queryDescriptorPrefix, StringComparison.Ordinal))
        {
            return string.Empty;
        }
        var start = queryDescriptorPrefix.Length;
        var end = query.IndexOf('\n', start);
        // ReadAsync always writes '\n' after the descriptor, but if the reported query text lacks it, take the rest of
        // the string rather than calling Substring with a negative length (which would throw and fail the whole refresh)
        return end < 0
            ? query.Substring(start)
            : query.Substring(start, end - start);
    }

    /// <summary>
    /// Compares the instances currently reported for the cluster with the known clients.
    /// </summary>
    /// <param name="currentWriteInstance">The cluster's current writer instance.</param>
    /// <param name="currentReadInstances">The cluster's current reader instances, with their statuses.</param>
    private InstancesDiff DetermineInstancesDiff(DBInstance currentWriteInstance, ImmutableArray<DBInstance> currentReadInstances)
    {
        // 'available' readers that have no client yet
        var newlyAvailableReaderInstances = currentReadInstances
            .Where(i =>
            {
                if (i.DBInstanceStatus != "available")
                {
                    return false;
                }
                var isNotAlreadyKnown = availableReadClients.All(c => c.Instance.DBInstanceIdentifier != i.DBInstanceIdentifier);
                return isNotAlreadyKnown;
            })
            .ToImmutableArray();
        // readers with an available client whose status is no longer 'available'
        var newlyUnavailableReaderInstances = currentReadInstances
            .Where(i =>
            {
                if (i.DBInstanceStatus == "available")
                {
                    return false;
                }
                var usedToBeAvailable = availableReadClients.Any(c => c.Instance.DBInstanceIdentifier == i.DBInstanceIdentifier);
                var isNotAlreadyKnown = unavailableReadClients.All(c => c.Instance.DBInstanceIdentifier != i.DBInstanceIdentifier);
                return usedToBeAvailable && isNotAlreadyKnown;
            })
            .ToImmutableArray();
        // unavailable readers whose clients can be disposed: the instance is gone, or it gets a fresh client as available
        var obsoleteUnavailableReaderInstanceIds = unavailableReadClients
            .Select(c => c.Instance.DBInstanceIdentifier)
            .Where(id =>
            {
                var doesNotExistAnymore = currentReadInstances.All(i => i.DBInstanceIdentifier != id);
                var becameAvailableAgain = newlyAvailableReaderInstances.Any(i => i.DBInstanceIdentifier == id);
                return doesNotExistAnymore || becameAvailableAgain;
            })
            .ToImmutableArray();
        var changedWriterInstance = currentWriteInstance.DBInstanceIdentifier == writeClient.InstanceId
            ? null
            : currentWriteInstance;
        return new InstancesDiff
        {
            ChangedWriterInstance = changedWriterInstance,
            NewlyAvailableReaderInstances = newlyAvailableReaderInstances,
            NewlyUnavailableReaderInstances = newlyUnavailableReaderInstances,
            ObsoleteUnavailableInstanceIds = obsoleteUnavailableReaderInstanceIds,
        };
    }

    /// <summary>Result of <see cref="DetermineInstancesDiff"/>.</summary>
    private class InstancesDiff
    {
        // the writer instance, when it differs from the one writeClient connects to; otherwise null
        public DBInstance ChangedWriterInstance { get; init; }
        public ImmutableArray<DBInstance> NewlyAvailableReaderInstances { get; init; }
        public ImmutableArray<DBInstance> NewlyUnavailableReaderInstances { get; init; }
        public ImmutableArray<string> ObsoleteUnavailableInstanceIds { get; init; }
    }

    /// <summary>Estimated load of one reader for one query descriptor.</summary>
    private class InstanceLoadInfo
    {
        // index into availableReadClients
        public int InstanceIndex;
        // fields (not properties) so GetReadClient can pass CurrentLoad by ref to Interlocked.Increment
        public int CurrentLoad;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment