Created
December 22, 2016 21:16
-
-
Save ReubenBond/bd70a626dc88921f399451b2e720626c to your computer and use it in GitHub Desktop.
PropertyManager-based Cluster Membership provider for Service Fabric integration in Microsoft Orleans
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Microsoft.Orleans.ServiceFabric | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Fabric; | |
using System.Globalization; | |
using System.Linq; | |
using System.Net; | |
using System.Threading.Tasks; | |
using global::Orleans; | |
using global::Orleans.Messaging; | |
using global::Orleans.Runtime; | |
using global::Orleans.Runtime.Configuration; | |
using global::Orleans.Serialization; | |
using Newtonsoft.Json; | |
/// <summary> | |
/// Cluster membership provider which uses Service Fabric's Property Manager as a consistent store. | |
/// </summary> | |
public class ServiceFabricPropertyManagerMembershipProvider : IMembershipTable, IGatewayListProvider | |
{ | |
private const string VersionPropertyName = "VERSION"; | |
private const string ETagPropertyName = "ETAG"; | |
private const string DefaultETag = "0"; | |
private readonly FabricClient fabricClient; | |
private readonly JsonSerializerSettings serializerSettings; | |
private Logger log; | |
private FabricClient.PropertyManagementClient store; | |
private Uri tableUri; | |
/// <summary> | |
/// Initializes a new <see name="ServiceFabricNamingServiceGatewayProvider"/> instance. | |
/// </summary> | |
/// <param name="client">The Service Fabric client to use.</param> | |
public ServiceFabricPropertyManagerMembershipProvider(FabricClient client) | |
{ | |
this.fabricClient = client; | |
this.serializerSettings = OrleansJsonSerializer.GetDefaultSerializerSettings(); | |
} | |
/// <summary> | |
/// Initializes the provider, will be called before all other methods | |
/// </summary> | |
/// <param name="config">The given client configuration</param> | |
/// <param name="logger">The logger to be used by the provider</param> | |
public Task InitializeGatewayListProvider(ClientConfiguration config, Logger logger) | |
{ | |
this.Initialize(logger, config.DeploymentId); | |
this.MaxStaleness = config.GatewayListRefreshPeriod; | |
return Task.FromResult(0); | |
} | |
/// <summary> | |
/// Initializes the membership table, will be called before all other methods | |
/// </summary> | |
/// <param name="config">the give global configuration</param> | |
/// <param name="tryInitTableVersion">whether an attempt will be made to init the underlying table</param> | |
/// <param name="logger">the logger used by the membership table</param> | |
public async Task InitializeMembershipTable(GlobalConfiguration config, bool tryInitTableVersion, Logger logger) | |
{ | |
this.Initialize(logger, config.DeploymentId); | |
this.log = logger; | |
if (tryInitTableVersion) | |
{ | |
try | |
{ | |
await this.store.CreateNameAsync(this.tableUri); | |
} | |
catch (FabricElementAlreadyExistsException) | |
{ | |
this.log?.Verbose($"Membership table already exists in property store at {this.tableUri}"); | |
} | |
var ops = new PropertyBatchOperation[] | |
{ | |
// Check preconditions. | |
new CheckExistsPropertyOperation(ETagPropertyName, false), | |
// Update version and insert rows. | |
new PutPropertyOperation(VersionPropertyName, 0), | |
new PutPropertyOperation(ETagPropertyName, DefaultETag) | |
}; | |
await this.store.SubmitPropertyBatchAsync(this.tableUri, ops); | |
} | |
} | |
/// <summary> | |
/// Atomically reads the Membership Table information about a given silo. | |
/// The returned MembershipTableData includes one MembershipEntry entry for a given silo and the | |
/// TableVersion for this table. The MembershipEntry and the TableVersion have to be read atomically. | |
/// </summary> | |
/// <param name="siloAddress">The address of the silo whose membership information needs to be read.</param> | |
/// <returns>The membership information for a given silo: MembershipTableData consisting one MembershipEntry entry and | |
/// TableVersion, read atomically.</returns> | |
public Task<MembershipTableData> ReadRow(SiloAddress siloAddress) | |
{ | |
return this.ReadEntries(siloAddress); | |
} | |
/// <summary> | |
/// Atomically reads the full content of the Membership Table. | |
/// The returned MembershipTableData includes all MembershipEntry entry for all silos in the table and the | |
/// TableVersion for this table. The MembershipEntries and the TableVersion have to be read atomically. | |
/// </summary> | |
/// <returns>The membership information for a given table: MembershipTableData consisting multiple MembershipEntry entries and | |
/// TableVersion, all read atomically.</returns> | |
public Task<MembershipTableData> ReadAll() | |
{ | |
return this.ReadEntries(); | |
} | |
/// <summary> | |
/// Atomically tries to insert (add) a new MembershipEntry for one silo and also update the TableVersion. | |
/// If operation succeeds, the following changes would be made to the table: | |
/// 1) New MembershipEntry will be added to the table. | |
/// 2) The newly added MembershipEntry will also be added with the new unique automatically generated eTag. | |
/// 3) TableVersion.Version in the table will be updated to the new TableVersion.Version. | |
/// 4) TableVersion etag in the table will be updated to the new unique automatically generated eTag. | |
/// All those changes to the table, insert of a new row and update of the table version and the associated etags, should happen atomically, or fail atomically with no side effects. | |
/// The operation should fail in each of the following conditions: | |
/// 1) A MembershipEntry for a given silo already exist in the table | |
/// 2) Update of the TableVersion failed since the given TableVersion etag (as specified by the TableVersion.VersionEtag property) did not match the TableVersion etag in the table. | |
/// </summary> | |
/// <param name="entry">MembershipEntry to be inserted.</param> | |
/// <param name="tableVersion">The new TableVersion for this table, along with its etag.</param> | |
/// <returns>True if the insert operation succeeded and false otherwise.</returns> | |
public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion) | |
{ | |
var rowNames = RowNames.Create(entry.SiloAddress); | |
var newETag = (tableVersion.Version + 1).ToString(CultureInfo.InvariantCulture); | |
var ops = new PropertyBatchOperation[] | |
{ | |
// Check preconditions. | |
new CheckValuePropertyOperation(ETagPropertyName, tableVersion.VersionEtag), | |
new CheckExistsPropertyOperation(rowNames.Entry, false), | |
// Update version and insert rows. | |
new PutPropertyOperation(VersionPropertyName, tableVersion.Version), | |
new PutPropertyOperation(ETagPropertyName, newETag), | |
new PutPropertyOperation(rowNames.Entry, JsonConvert.SerializeObject(entry, this.serializerSettings)), | |
new PutPropertyOperation(rowNames.Alive, entry.IAmAliveTime.Ticks), | |
new PutPropertyOperation(rowNames.ETag, newETag) | |
}; | |
var result = await this.store.SubmitPropertyBatchAsync(this.tableUri, ops); | |
// A value of -1 indicates that no operation failed. | |
return result.FailedOperationIndex == -1; | |
} | |
/// <summary> | |
/// Atomically tries to update the MembershipEntry for one silo and also update the TableVersion. | |
/// If operation succeeds, the following changes would be made to the table: | |
/// 1) The MembershipEntry for this silo will be updated to the new MembershipEntry (the old entry will be fully substitued by the new entry) | |
/// 2) The eTag for the updated MembershipEntry will also be eTag with the new unique automatically generated eTag. | |
/// 3) TableVersion.Version in the table will be updated to the new TableVersion.Version. | |
/// 4) TableVersion etag in the table will be updated to the new unique automatically generated eTag. | |
/// All those changes to the table, update of a new row and update of the table version and the associated etags, should happen atomically, or fail atomically with no side effects. | |
/// The operation should fail in each of the following conditions: | |
/// 1) A MembershipEntry for a given silo does not exist in the table | |
/// 2) A MembershipEntry for a given silo exist in the table but its etag in the table does not match the provided etag. | |
/// 3) Update of the TableVersion failed since the given TableVersion etag (as specified by the TableVersion.VersionEtag property) did not match the TableVersion etag in the table. | |
/// </summary> | |
/// <param name="entry">MembershipEntry to be updated.</param> | |
/// <param name="etag">The etag for the given MembershipEntry.</param> | |
/// <param name="tableVersion">The new TableVersion for this table, along with its etag.</param> | |
/// <returns>True if the update operation succeeded and false otherwise.</returns> | |
public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) | |
{ | |
var rowNames = RowNames.Create(entry.SiloAddress); | |
var newETag = (tableVersion.Version + 1).ToString(CultureInfo.InvariantCulture); | |
var ops = new PropertyBatchOperation[] | |
{ | |
// Check preconditions. | |
new CheckValuePropertyOperation(ETagPropertyName, tableVersion.VersionEtag), | |
new CheckValuePropertyOperation(rowNames.ETag, etag ?? DefaultETag), | |
new CheckExistsPropertyOperation(rowNames.Entry, true), | |
// Update version and insert rows. | |
new PutPropertyOperation(VersionPropertyName, tableVersion.Version), | |
new PutPropertyOperation(ETagPropertyName, newETag), | |
new PutPropertyOperation(rowNames.Entry, JsonConvert.SerializeObject(entry, this.serializerSettings)), | |
new PutPropertyOperation(rowNames.Alive, entry.IAmAliveTime.Ticks), | |
new PutPropertyOperation(rowNames.ETag, newETag) | |
}; | |
var result = await this.store.SubmitPropertyBatchAsync(this.tableUri, ops); | |
// A value of -1 indicates that no operation failed. | |
return result.FailedOperationIndex == -1; | |
} | |
/// <summary> | |
/// Updates the IAmAlive part (column) of the MembershipEntry for this silo. | |
/// This operation should only update the IAmAlive collumn and not change other columns. | |
/// This operation is a "dirty write" or "in place update" and is performed without etag validation. | |
/// With regards to eTags update: | |
/// This operation may automatically update the eTag associated with the given silo row, but it does not have to. It can also leave the etag not changed ("dirty write"). | |
/// With regards to TableVersion: | |
/// this operation should not change the TableVersion of the table. It should leave it untouched. | |
/// There is no scenario where this operation could fail due to table semantical reasons. It can only fail due to network problems or table unavailability. | |
/// </summary> | |
/// <param name="entry"></param> | |
/// <returns>Task representing the successful execution of this operation. </returns> | |
public Task UpdateIAmAlive(MembershipEntry entry) | |
{ | |
return this.store.PutPropertyAsync( | |
this.tableUri, | |
RowNames.GetAliveRowName(entry.SiloAddress), | |
entry.IAmAliveTime.Ticks); | |
} | |
/// <summary> | |
/// Returns the list of gateways (silos) that can be used by a client to connect to Orleans cluster. | |
/// The Uri is in the form of: "gwy.tcp://IP:port/Generation". See Utils.ToGatewayUri and Utils.ToSiloAddress for more details about Uri format. | |
/// </summary> | |
public async Task<IList<Uri>> GetGateways() | |
{ | |
var allSilos = await this.ReadAll(); | |
return | |
allSilos.Members.Select(e => e.Item1) | |
.Where(m => m.Status == SiloStatus.Active && m.ProxyPort != 0) | |
.Select(GetGatewayUri) | |
.ToList(); | |
} | |
/// <summary> | |
/// Specifies how often this IGatewayListProvider is refreshed, to have a bound on max staleness of its returned infomation. | |
/// </summary> | |
public TimeSpan MaxStaleness { get; private set; } | |
/// <summary> | |
/// Specifies whether this IGatewayListProvider ever refreshes its returned infomation, or always returns the same gw list. | |
/// (currently only the static config based StaticGatewayListProvider is not updatable. All others are.) | |
/// </summary> | |
public bool IsUpdatable => true; | |
/// <summary> | |
/// Deletes all table entries of the given deploymentId | |
/// </summary> | |
public Task DeleteMembershipTableEntries(string deploymentId) | |
{ | |
return this.store.DeleteNameAsync(GetTableUri(deploymentId)); | |
} | |
private void Initialize(Logger logger, string deploymentId) | |
{ | |
this.tableUri = GetTableUri(deploymentId); | |
this.log = logger; | |
this.store = this.fabricClient.PropertyManager; | |
} | |
private static Uri GetTableUri(string deploymentId) | |
{ | |
return new Uri("fabric:/silos_" + deploymentId); | |
} | |
private async Task<MembershipTableData> ReadEntries(SiloAddress siloAddress = null) | |
{ | |
MembershipTableData result; | |
do | |
{ | |
// Continue attempting to read from the table until a consistent read is made. | |
result = await this.TryReadEntries(siloAddress); | |
} | |
while (result == null); | |
return result; | |
} | |
/// <summary> | |
/// Attempts to read membership entries, returning <see langword="null"/> if the table was modified during the read. | |
/// </summary> | |
/// <param name="siloAddress">The silo to read, or <see langword="null"/> to read all silos.</param> | |
/// <returns> | |
/// The membership entries or <see langword="null"/> if the tables was modified during the read operation. | |
/// </returns> | |
private async Task<MembershipTableData> TryReadEntries(SiloAddress siloAddress = null) | |
{ | |
var suffix = siloAddress == null ? string.Empty : "_" + siloAddress.ToParsableString(); | |
var entries = new Dictionary<string, PropertyTableEntry>(); | |
var tableVersion = 0; | |
string tableETag = null; | |
PropertyEnumerationResult result = null; | |
do | |
{ | |
result = await this.store.EnumeratePropertiesAsync(this.tableUri, true, result); | |
if (!result.IsConsistent) | |
{ | |
// The table was modified while enumerating the properties. | |
return null; | |
} | |
foreach (var property in result) | |
{ | |
var name = property.Metadata.PropertyName; | |
if (string.Equals(VersionPropertyName, name)) | |
{ | |
tableVersion = (int)property.GetValue<long>(); | |
} | |
else if (string.Equals(ETagPropertyName, name)) | |
{ | |
tableETag = property.GetValue<string>(); | |
} | |
else if (name.StartsWith(".") & name.EndsWith(suffix)) | |
{ | |
var key = GetSiloAddress(name); | |
PropertyTableEntry entry; | |
if (!entries.TryGetValue(key, out entry)) | |
{ | |
entry = entries[key] = new PropertyTableEntry(); | |
} | |
if (name.StartsWith(RowNames.EntryPrefix)) | |
{ | |
entry.Entry = JsonConvert.DeserializeObject<MembershipEntry>( | |
property.GetValue<string>(), | |
this.serializerSettings); | |
} | |
else if (name.StartsWith(RowNames.AlivePrefix)) | |
{ | |
entry.LastIAmAliveTime = property.GetValue<long>(); | |
} | |
else if (name.StartsWith(RowNames.ETagPrefix)) | |
{ | |
entry.ETag = property.GetValue<string>(); | |
} | |
} | |
} | |
} | |
while (result.HasMoreData); | |
var results = new List<Tuple<MembershipEntry, string>>(entries.Count); | |
foreach (var entry in entries.Values) | |
{ | |
if (entry.Entry == null) continue; | |
entry.Entry.IAmAliveTime = new DateTime(entry.LastIAmAliveTime); | |
results.Add(Tuple.Create(entry.Entry, entry.ETag ?? DefaultETag)); | |
} | |
return new MembershipTableData(results, new TableVersion(tableVersion, tableETag)); | |
} | |
private static string GetSiloAddress(string name) => name.Substring(name.IndexOf('_') + 1); | |
private static Uri GetGatewayUri(MembershipEntry entry) | |
{ | |
var siloAddress = entry.SiloAddress; | |
var endpoint = new IPEndPoint(siloAddress.Endpoint.Address, entry.ProxyPort); | |
return SiloAddress.New(endpoint, siloAddress.Generation).ToGatewayUri(); | |
} | |
private struct RowNames | |
{ | |
public const string EntryPrefix = ".ENTRY_"; | |
public const string AlivePrefix = ".ALIVE_"; | |
public const string ETagPrefix = ".ETAG_"; | |
public static RowNames Create(SiloAddress siloAddress) | |
{ | |
var key = siloAddress.ToParsableString(); | |
return new RowNames | |
{ | |
Entry = EntryPrefix + key, | |
ETag = ETagPrefix + key, | |
Alive = AlivePrefix + key | |
}; | |
} | |
public static string GetAliveRowName(SiloAddress siloAddress) | |
=> AlivePrefix + siloAddress.ToParsableString(); | |
public string Entry { get; private set; } | |
public string ETag { get; private set; } | |
public string Alive { get; private set; } | |
} | |
private class PropertyTableEntry | |
{ | |
public MembershipEntry Entry { get; set; } | |
public string ETag { get; set; } | |
public long LastIAmAliveTime { get; set; } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment