Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tjrobinson/1139999 to your computer and use it in GitHub Desktop.
Save tjrobinson/1139999 to your computer and use it in GitHub Desktop.
RemoteStorageSyncProvider
using System;
using System.Collections.Generic;
using Microsoft.Synchronization;
using Microsoft.Synchronization.MetadataStorage;
using Microsoft.Synchronization.SimpleProviders;
namespace RemoteStorage
{
/// <summary>
/// Abstract class containing common functionality used by all FullEnumerationSimpleSyncProviders
/// </summary>
public abstract class CustomFullEnumerationSimpleSyncProvider : FullEnumerationSimpleSyncProvider, IDisposable
{
public readonly IClientLoggingService ClientLoggingService;
public CustomFullEnumerationSimpleSyncProvider(IClientLoggingService clientLoggingService)
{
this.ClientLoggingService = clientLoggingService;
}
public SqlMetadataStore MetadataStore { get; set; }
/// <summary>
/// Gets an <see cref="T:Microsoft.Synchronization.SimpleProviders.ItemMetadataSchema"/> object that represents the metadata schema for an item.
/// </summary>
/// <value></value>
/// <returns>An <see cref="T:Microsoft.Synchronization.SimpleProviders.ItemMetadataSchema"/> object that represents the metadata schema for
/// an item.</returns>
public override ItemMetadataSchema MetadataSchema
{
get
{
var customFields = new List<CustomFieldDefinition>
{
new CustomFieldDefinition((uint)ItemFields.CustomFieldName, typeof(string), RemoteStorageSyncProviderHelper.MaxFileNameLength),
new CustomFieldDefinition((uint)ItemFields.CustomFieldTimestamp, typeof(ulong)),
new CustomFieldDefinition((uint)ItemFields.CustomFieldSize, typeof(ulong))
};
var identityRule = new List<IdentityRule>
{
new IdentityRule(new uint[] { (uint)ItemFields.CustomFieldName })
};
return new ItemMetadataSchema(customFields, identityRule);
}
}
/// <summary>
/// Gets a <see cref="T:Microsoft.Synchronization.SyncIdFormatGroup"/> object that represents the format of replica and item IDs.
/// </summary>
/// <value></value>
/// <returns>A <see cref="T:Microsoft.Synchronization.SyncIdFormatGroup"/> object that is used to define the format of replica and item IDs.</returns>
public override SyncIdFormatGroup IdFormats
{
get
{
var idFormats = new SyncIdFormatGroup();
idFormats.ChangeUnitIdFormat.IsVariableLength = false;
idFormats.ChangeUnitIdFormat.Length = 4;
idFormats.ItemIdFormat.IsVariableLength = false;
idFormats.ItemIdFormat.Length = 24;
idFormats.ReplicaIdFormat.IsVariableLength = false;
idFormats.ReplicaIdFormat.Length = 16;
return idFormats;
}
}
/// <summary>
/// The provider can be versioned to support upgrades
/// </summary>
/// <value></value>
/// <returns>The version number for the simple provider.</returns>
public override short ProviderVersion
{
get { return 1; }
}
/// <summary>
/// Gets or sets the conflict resolution policy.
/// </summary>
/// <value>The conflict resolution policy.</value>
public ConflictResolutionPolicy ConflictResolutionPolicy
{
get
{
return this.Configuration.ConflictResolutionPolicy;
}
set
{
this.Configuration.ConflictResolutionPolicy = value;
}
}
/// <summary>
/// Gets or sets the collision conflict resolution policy.
/// </summary>
/// <value>The collision conflict resolution policy.</value>
public CollisionConflictResolutionPolicy CollisionConflictResolutionPolicy
{
get
{
return this.Configuration.CollisionConflictResolutionPolicy;
}
set
{
this.Configuration.CollisionConflictResolutionPolicy = value;
}
}
public static ItemFieldDictionary CreateKeyAndVersion(string relativePathAndFileName, DateTime timeStampUtc, long size)
{
var keyAndVersion = new ItemFieldDictionary()
{
new ItemField(ItemFields.CustomFieldName, typeof(string), relativePathAndFileName),
new ItemField(ItemFields.CustomFieldTimestamp, typeof(ulong), (ulong)timeStampUtc.ToFileTimeUtc()),
new ItemField(ItemFields.CustomFieldSize, typeof(ulong), (ulong)size)
};
return keyAndVersion;
}
/// <summary>
/// Called by the Sync Framework runtime at the end of each session unless a session is terminated because of an exception.
/// </summary>
public override void EndSession()
{
DisposeMetadataStore();
this.ClientLoggingService.LogInformation("Synchronisation session has ended");
}
public void DisposeMetadataStore()
{
if (this.MetadataStore != null)
{
this.MetadataStore.Dispose();
this.MetadataStore = null;
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool managed)
{
DisposeMetadataStore();
}
}
}
using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using Microsoft.Synchronization.Files;
namespace RemoteStorage
{
public class FileDataRetriever : IFileDataRetriever
{
[SuppressMessage("Microsoft.Design", "CA1065:DoNotRaiseExceptionsInUnexpectedLocations",
Justification = "When the Sync Framework detects a NotImplementedException on this getter,"
+ " it uses the FileStream property to access the file contents")]
public string AbsoluteSourceFilePath
{
get
{
// When the Sync Framework detects a NotImplementedException on this getter, it uses the FileStream property to access the file contents
// It is expected that exceptions will be thrown here
throw new NotImplementedException("AbsoluteSourceFilePath is not supported for this implementation of IFileDataRetriever");
}
}
public FileData FileData { get; set; }
public Stream FileStream { get; set; }
public string RelativeDirectoryPath { get; set; }
}
}
using System;
using Microsoft.Synchronization.SimpleProviders;
namespace RemoteStorage
{
public static class ItemFieldDictionaryExtensions
{
public static DateTime GetTimestamp(this ItemFieldDictionary itemFieldDictionary)
{
ItemField timestampItemField = null;
itemFieldDictionary.TryGetValue((uint)ItemFields.CustomFieldTimestamp, out timestampItemField);
DateTime timeStamp = DateTime.FromFileTimeUtc((long)((ulong)timestampItemField.Value));
return timeStamp;
}
public static string GetName(this ItemFieldDictionary itemFieldDictionary)
{
ItemField itemFieldName = null;
itemFieldDictionary.TryGetValue((uint)ItemFields.CustomFieldName, out itemFieldName);
string name = itemFieldName.Value as string;
return name;
}
public static long GetSize(this ItemFieldDictionary itemFieldDictionary)
{
ItemField itemFieldSize = null;
itemFieldDictionary.TryGetValue(ItemFields.CustomFieldSize, out itemFieldSize);
long size = Convert.ToInt64(itemFieldSize.Value);
return size;
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
using System.Threading;
using Microsoft.Synchronization;
using Microsoft.Synchronization.Files;
using Microsoft.Synchronization.MetadataStorage;
using Microsoft.Synchronization.SimpleProviders;
namespace RemoteStorage
{
public partial class RemoteStorageSyncProvider : CustomFullEnumerationSimpleSyncProvider, IRemoteStorageSyncProvider
{
/// <summary>
/// Initializes a new instance of the <see cref="RemoteStorageSyncProvider"/> class.
/// </summary>
/// <param name="clientLoggingService">The logger.</param>
/// <param name="remoteStorageProxyService">The proxy service.</param>
/// <param name="configurationService">The configuration service.</param>
/// <param name="instrumentationService">The instrumentation service.</param>
/// <param name="userManagerService">The user manager service.</param>
/// <param name="notificationService">The notification service.</param>
/// <param name="reportManagerService">The report manager service.</param>
public RemoteStorageSyncProvider(
IClientLoggingService clientLoggingService,
IRemoteStorageService remoteStorageProxyService,
IClientConfigurationService configurationService,
IInstrumentationService instrumentationService,
IUserManagerService userManagerService,
INotificationService notificationService,
IReportManagerService reportManagerService)
: base(clientLoggingService)
{
this.remoteStorageProxyService = remoteStorageProxyService;
this.configurationService = configurationService;
this.instrumentationService = instrumentationService;
this.userManagerService = userManagerService;
this.reportManagerService = reportManagerService;
this.CancellationToken = new CancellationTokenSource();
}
public event EventHandler<SyncProgressChangedEventArgs> ProgressChanged;
public event EventHandler<SimpleSyncAppliedChangeEventArgs> AppliedChange;
/// <summary>
/// Gets the replica id.
/// </summary>
/// <value>The replica id.</value>
public SyncId ReplicaId { get; private set; }
/// <summary>
/// Flag to mark if a cancellation is requested for Sync Framework
/// </summary>
public CancellationTokenSource CancellationToken { get; set; }
public void Initialize(SynchronisationSettings synchronisationSettings)
{
// Set the storage location
this.storageLocation = synchronisationSettings.StorageLocation;
this.instrumentationService.Initialize("MyApp", "Performance data for the application");
this.instrumentationService.Reset();
this.isInitialized = true;
}
/// <summary>
/// Enumerates all items in a replica.
/// </summary>
/// <param name="context">A <see cref="T:Microsoft.Synchronization.SimpleProviders.FullEnumerationContext"/> object that represents the context of
/// the synchronization session.</param>
public override void EnumerateItems(FullEnumerationContext context)
{
context.SetDeleteMode(SimpleSyncProviderDeleteMode.Normal);
List<ItemFieldDictionary> listItemFieldDictionary = EnumerateItemsCommon();
context.ReportItems(listItemFieldDictionary);
}
/// <summary>
/// Called by the Sync Framework runtime at the start of each session.
/// </summary>
public override void BeginSession()
{
if (!this.isInitialized)
{
throw new RemoteStorageException("Initialize() method must be called before using RemoteStorageSyncProvider");
}
sessionRef = Guid.NewGuid();
sessionRef = instrumentationService.SessionRef;
}
/// <summary>
/// Called by the Sync Framework runtime to return a <see cref="T:Microsoft.Synchronization.MetadataStorage.MetadataStore"/> object for a replica.
/// </summary>
/// <param name="syncId">A <see cref="T:Microsoft.Synchronization.SyncId"/> object that contains the ID of the replica for which the <see cref="T:Microsoft.Synchronization.MetadataStorage.MetadataStore"/> object is returned.</param>
/// <param name="culture">A <see cref="T:System.Globalization.CultureInfo"/> object that represents the culture that is used for string comparisons.</param>
/// <returns>
/// A <see cref="T:Microsoft.Synchronization.MetadataStorage.MetadataStore"/> object that represents a metadata store for the specified replica.
/// </returns>
public override MetadataStore GetMetadataStore(
out SyncId syncId,
out CultureInfo culture)
{
this.MetadataStore = SqlMetadataStoreFactory.CreateMetadataStore(
this.storageLocation,
this.configurationService,
this.ClientLoggingService,
MetadataStoreType.Remote);
var syncIdFactory = new SyncIdFactory(this.configurationService, this.ClientLoggingService);
syncId = syncIdFactory.GetSyncId(this.configurationService.RemoteMetadataDirectoryPath);
culture = CultureInfo.CurrentCulture;
return this.MetadataStore;
}
/// <summary>
/// Called by the Sync Framework runtime to insert an item into the destination store.
/// </summary>
/// <param name="itemData">Data for the item in provider-specific format.</param>
/// <param name="changeUnitsToCreate">A <see cref="T:Microsoft.Synchronization.SyncId"/> object that contains the change units to insert for an item.
/// The parameter should be null (not empty) if no change units are specified.</param>
/// <param name="recoverableErrorReportingContext">A <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object
/// that is used to report recoverable errors that occur during attempts to insert an item.</param>
/// <param name="keyAndUpdatedVersion">Returns the key and updated version properties of the item to be inserted. If the return value is not valid,
/// the Sync Framework runtime throws <see cref="T:System.ArgumentOutOfRangeException"/>, which ends the session.</param>
/// <param name="commitKnowledgeAfterThisItem">Returns whether the Sync Framework runtime should commit knowledge to the metadata store after
/// processing is complete for the specified item.</param>
public override void InsertItem(
object itemData,
IEnumerable<SyncId> changeUnitsToCreate,
RecoverableErrorReportingContext recoverableErrorReportingContext,
out ItemFieldDictionary keyAndUpdatedVersion,
out bool commitKnowledgeAfterThisItem)
{
var transactionWatch = Stopwatch.StartNew();
var transactionRef = Guid.NewGuid();
commitKnowledgeAfterThisItem = true;
var fileDataRetriever = (IFileDataRetriever)itemData;
int numberOfRetries = 3;
long retryWaitTimeInMilliseconds = 1000;
this.ClientLoggingService.LogInformation("INSERTING a local item into the remote store ({0})", fileDataRetriever.FileData.GetPathWithName());
keyAndUpdatedVersion = RemoteStorageSyncProvider.CreateKeyAndVersion(
fileDataRetriever.FileData.GetPathWithName(),
fileDataRetriever.FileData.LastWriteTime,
fileDataRetriever.FileData.Size);
bool success = InsertItemWithRetriesWrapper(
recoverableErrorReportingContext,
transactionWatch,
fileDataRetriever,
numberOfRetries,
retryWaitTimeInMilliseconds);
if (success)
{
InsertItemPostSuccess(keyAndUpdatedVersion, transactionWatch, fileDataRetriever, transactionRef);
}
else
{
keyAndUpdatedVersion = null;
this.ClientLoggingService.LogWarning("Local item has NOT been uploaded to the remote store");
}
////bool metadataStoresAreDamaged = AreMetadataStoresDamaged();
////if (metadataStoresAreDamaged)
////{
//// // Closing the metadata store here will cause exceptions later on, but not closing it causes more problems
//// this.CloseMetadataStore();
//// MetadataStoreHelper.FixupMetadataStores(metadataStoreInfo, this.logger);
////}
}
/// <summary>
/// Called by the Sync Framework runtime to delete an item from the destination store.
/// </summary>
/// <param name="keyAndExpectedVersion">The key and expected version properties of the item to be deleted. The provider must perform an optimistic
/// concurrency check to verify that the version of the item on the destination corresponds to the values found in keyAndExpectedVersion. If this
/// check fails, provider should report a recoverable error by using
/// a <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object.</param>
/// <param name="recoverableErrorReportingContext">A <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object
/// that is used to report recoverable errors that occur during attempts to delete an item.</param>
/// <param name="commitKnowledgeAfterThisItem">Returns whether the Sync Framework runtime should commit knowledge to the metadata store after
/// processing is complete for the specified item.</param>
public override void DeleteItem(
ItemFieldDictionary keyAndExpectedVersion,
RecoverableErrorReportingContext recoverableErrorReportingContext,
out bool commitKnowledgeAfterThisItem)
{
var transactionWatch = Stopwatch.StartNew();
var transactionRef = Guid.NewGuid();
commitKnowledgeAfterThisItem = true;
this.ClientLoggingService.LogInformation("DELETING an item");
string filename = keyAndExpectedVersion.GetName();
DateTime expectedLastModified = keyAndExpectedVersion.GetTimestamp();
this.ClientLoggingService.LogInformation("Filename: {0} ExpectedLastModified: {1}", filename, expectedLastModified);
try
{
remoteStorageProxyService.DeleteFile(this.storageLocation, filename, expectedLastModified);
// Decrease user usage when item is deleted
userManagerService.UpdateUsage(configurationService.SiloId, (-1) * keyAndExpectedVersion.GetSize());
// Log the transaction remotely
reportManagerService.InsertTransaction(new TransactionLog
{
TransactionRef = transactionRef,
TransactionType = TransactionType.Delete,
SessionRef = sessionRef,
RecordedDate = DateTime.Now,
FileName = filename,
FileSize = keyAndExpectedVersion.GetSize(),
IsDirectory = keyAndExpectedVersion.GetSize() == 0,
Duration = transactionWatch.Elapsed.Seconds,
Message = string.Format("Delete Transaction {0}", transactionRef)
});
OnAppliedChange(ChangeType.Delete, filename);
this.ClientLoggingService.LogInformation("Finished deleting item");
this.ClientLoggingService.LogInformation(string.Empty);
}
catch (UnauthorisedRemoteStoreOperationException ex)
{
LogFileOperationError("UpdateItem", 0, 0, ex);
this.ClientLoggingService.LogWarning("Aborting unauthorised action without additional retries. Item not processed but sync shall continue");
RecordRecoverableErrorForChange(recoverableErrorReportingContext, ex, filename, -1, transactionWatch.Elapsed);
}
catch (SyncException ex)
{
LogFileOperationError("UpdateItem", 0, 0, ex);
// give up processing this item but also terminate the whole sync immediately
// when unrecoverable error occurs (abort requested, authentication failed)
throw;
}
catch (Exception ex)
{
RecordRecoverableErrorForChange(recoverableErrorReportingContext, ex, filename, -1, transactionWatch.Elapsed);
}
}
/// <summary>
/// Handles the OnProgressChanged event of the UploadStreamWithProgress control.
/// </summary>
/// <param name="sender">The source of the event.</param>
/// <param name="e">The <see cref="RemoteStorageSyncProvider.StreamWithProgress.SyncProgressChangedEventArgs"/> instance containing
/// the event data.</param>
public void UploadStreamWithProgress_ProgressChanged(object sender, SyncProgressChangedEventArgs e)
{
ProgressChanged(this, new SyncProgressChangedEventArgs(e.BytesRead, e.Length));
}
/// <summary>
/// Called by the Sync Framework runtime to load the change data for the item with the specified key.
/// </summary>
/// <param name="keyAndExpectedVersion">The key and expected version properties of the item for which data is loaded. The provider must perform an optimistic concurrency check to verify that the version of the item on the destination corresponds to the values found in keyAndExpectedVersion. If this check fails, provider should report a recoverable error by using a <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object.</param>
/// <param name="changeUnitsToLoad">A <see cref="T:Microsoft.Synchronization.SyncId"/> object that contains the change units to load for an item. The parameter should be null (not empty) if no change units are specified.</param>
/// <param name="recoverableErrorReportingContext">A <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object that is used to report recoverable errors that occur during attempts to update an item.</param>
/// <returns>
/// An object that represents the change data for the item with the specified key.
/// </returns>
public override object LoadChangeData(
ItemFieldDictionary keyAndExpectedVersion,
IEnumerable<SyncId> changeUnitsToLoad,
RecoverableErrorReportingContext recoverableErrorReportingContext)
{
var transactionWatch = Stopwatch.StartNew();
var transactionRef = Guid.NewGuid();
object changeData = null;
try
{
changeData = LoadChangeDataCommon(keyAndExpectedVersion);
var fileDataRetriever = (IFileDataRetriever)changeData;
// Log the transaction remotely
this.reportManagerService.InsertTransaction(new TransactionLog
{
TransactionRef = transactionRef,
TransactionType = TransactionType.Retrieve,
SessionRef = sessionRef,
RecordedDate = DateTime.Now,
FileName = fileDataRetriever.FileData.Name,
FileSize = fileDataRetriever.FileData.Size,
IsDirectory = fileDataRetriever.FileData.IsDirectory,
Duration = transactionWatch.Elapsed.Seconds,
Message = string.Format("Retrieve Transaction {0}", transactionRef)
});
}
catch (Exception ex)
{
RecordRecoverableErrorForChange(recoverableErrorReportingContext, ex, keyAndExpectedVersion.GetName(), -1, transactionWatch.Elapsed);
}
return changeData;
}
/// <summary>
/// Called by the Sync Framework runtime to update an item in the destination store.
/// </summary>
/// <param name="itemData">Data for the item in provider-specific format.</param>
/// <param name="changeUnitsToUpdate">A <see cref="T:Microsoft.Synchronization.SyncId"/> object that contains the change units to update for an item.
/// The parameter should be null (not empty) if no change units are specified.</param>
/// <param name="keyAndExpectedVersion">The key and expected version properties of the item to be updated. The provider must perform an optimistic
/// concurrency check to verify that the version of the item on the destination corresponds to the values found in keyAndExpectedVersion. If this
/// check fails, provider should report a recoverable error by using
/// a <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object.</param>
/// <param name="recoverableErrorReportingContext">A <see cref="T:Microsoft.Synchronization.SimpleProviders.RecoverableErrorReportingContext"/> object
/// that is used to report recoverable errors that occur during attempts to update an item.</param>
/// <param name="keyAndUpdatedVersion">Returns the key and updated version properties of the updated items. If the return value is not valid, the Sync
/// Framework runtime throws <see cref="T:System.ArgumentOutOfRangeException"/>, which ends the session.</param>
/// <param name="commitKnowledgeAfterThisItem">Returns whether the Sync Framework runtime should commit knowledge to the metadata store after
/// processing is complete for the specified item.</param>
public override void UpdateItem(
object itemData,
IEnumerable<SyncId> changeUnitsToUpdate,
ItemFieldDictionary keyAndExpectedVersion,
RecoverableErrorReportingContext recoverableErrorReportingContext,
out ItemFieldDictionary keyAndUpdatedVersion,
out bool commitKnowledgeAfterThisItem)
{
var transactionWatch = Stopwatch.StartNew();
var transactionRef = Guid.NewGuid();
this.ClientLoggingService.LogInformation("Updating an item");
commitKnowledgeAfterThisItem = true;
var fileDataRetriever = (IFileDataRetriever)itemData;
string currentOrNewFilename = fileDataRetriever.FileData.Name;
string oldFilename = keyAndExpectedVersion.GetName();
string pathWithName = Path.Combine(fileDataRetriever.RelativeDirectoryPath, currentOrNewFilename);
keyAndUpdatedVersion = RemoteStorageSyncProvider.CreateKeyAndVersion(
pathWithName,
fileDataRetriever.FileData.LastWriteTime,
fileDataRetriever.FileData.Size);
DateTime expectedLastModified = RemoteStorageSyncProviderHelper.GetExpectedLastModifiedOnServer();
bool success = UpdateItemWrapper(
recoverableErrorReportingContext,
transactionWatch,
fileDataRetriever,
expectedLastModified,
oldFilename);
if (success)
{
UpdateItemPostSuccess(keyAndExpectedVersion, keyAndUpdatedVersion, transactionWatch, transactionRef, fileDataRetriever, currentOrNewFilename);
}
else
{
this.ClientLoggingService.LogWarning("Local item has NOT been updated in the remote store");
}
}
private void InsertItemPostSuccess(
ItemFieldDictionary keyAndUpdatedVersion,
Stopwatch transactionWatch,
IFileDataRetriever fileDataRetriever,
Guid transactionRef)
{
if (keyAndUpdatedVersion == null)
{
throw new InvalidOperationException("When insert succeeds keyAndUpdatedVersion should be set");
}
// Increase user usage when item is inserted
this.userManagerService.UpdateUsage(configurationService.SiloId, keyAndUpdatedVersion.GetSize());
// Log the transaction remotely
reportManagerService.InsertTransaction(new TransactionLog
{
TransactionRef = transactionRef,
TransactionType = TransactionType.Insert,
SessionRef = sessionRef,
RecordedDate = DateTime.Now,
FileName = keyAndUpdatedVersion.GetName(),
FileSize = keyAndUpdatedVersion.GetSize(),
IsDirectory = fileDataRetriever.FileData.IsDirectory,
Duration = transactionWatch.Elapsed.Seconds,
Message = string.Format("Insert Transaction {0}", transactionRef)
});
OnAppliedChange(ChangeType.Create, fileDataRetriever.FileData.GetPathWithName());
this.ClientLoggingService.RecordItemSynchronisationSuccess(
fileDataRetriever.FileData.GetPathWithName(),
fileDataRetriever.FileData.Size,
transactionWatch.Elapsed);
this.ClientLoggingService.LogInformation("InsertItem() took {0}", transactionWatch.Elapsed.ToString(@"hh\:mm\:ss"));
this.ClientLoggingService.LogInformation("Local item has been uploaded to the remote store");
}
private void UpdateItemPostSuccess(
ItemFieldDictionary keyAndExpectedVersion,
ItemFieldDictionary keyAndUpdatedVersion,
Stopwatch transactionWatch,
Guid transactionRef,
IFileDataRetriever fileDataRetriever,
string currentOrNewFilename)
{
if (keyAndUpdatedVersion == null)
{
throw new InvalidOperationException("When update succeeds keyAndUpdatedVersion should be set");
}
// Increase/Decrease user usage
this.userManagerService.UpdateUsage(configurationService.SiloId, keyAndUpdatedVersion.GetSize() - keyAndExpectedVersion.GetSize());
// Log the transaction remotely
reportManagerService.InsertTransaction(new TransactionLog
{
TransactionRef = transactionRef,
TransactionType = TransactionType.Update,
SessionRef = sessionRef,
RecordedDate = DateTime.Now,
FileName = currentOrNewFilename,
FileSize = fileDataRetriever.FileData.Size,
IsDirectory = fileDataRetriever.FileData.IsDirectory,
Duration = transactionWatch.Elapsed.Seconds,
Message = string.Format("Update Transaction {0}", transactionRef)
});
OnAppliedChange(ChangeType.Update, fileDataRetriever.FileData.GetPathWithName());
this.ClientLoggingService.LogInformation("Finished updating item");
this.ClientLoggingService.LogInformation(
"UpdateItem() took {0}",
transactionWatch.Elapsed.ToString(@"hh\:mm\:ss"));
}
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Synchronization;
using Microsoft.Synchronization.Files;
using Microsoft.Synchronization.SimpleProviders;
namespace RemoteStorage
{
public partial class RemoteStorageSyncProvider
{
private readonly IRemoteStorageService remoteStorageProxyService;
private readonly IClientConfigurationService configurationService;
private readonly IInstrumentationService instrumentationService;
private readonly IUserManagerService userManagerService;
private readonly IReportManagerService reportManagerService;
private StorageLocation storageLocation;
private bool isInitialized = false;
private Guid sessionRef;
/// <summary>
/// Enumerates all items.
/// </summary>
protected List<ItemFieldDictionary> EnumerateItemsCommon()
{
this.ClientLoggingService.LogInformation("Enumerating files on the server");
List<ItemFieldDictionaryDataContract> listItemFieldDictionaryDataContract = null;
listItemFieldDictionaryDataContract = remoteStorageProxyService.ListFiles(this.storageLocation);
List<ItemFieldDictionary> listItemFieldDictionary = listItemFieldDictionaryDataContract.ConvertAll(
new Converter<ItemFieldDictionaryDataContract, ItemFieldDictionary>(
ConversionHelper.ConvertFromItemFieldDictionaryDataContractToItemFieldDictionary));
RemoteStorageSyncProviderHelper.Log(listItemFieldDictionary, this.ClientLoggingService);
this.ClientLoggingService.LogInformation("Finished enumerating files on the server");
this.ClientLoggingService.LogInformation(String.Empty);
return listItemFieldDictionary;
}
protected object LoadChangeDataCommon(ItemFieldDictionary keyAndExpectedVersion)
{
string relativePathAndFilename;
DateTime expectedLastUpdate;
GetFileRequestInformation(keyAndExpectedVersion, out relativePathAndFilename, out expectedLastUpdate);
FileData fileData = null;
FileDataRetriever fileDataRetriever = null;
string tempFilePath = null;
try
{
GetFileFromBlocks(
new FileBlockContext()
{
RelativePathAndFilename = relativePathAndFilename,
ExpectedLastUpdate = expectedLastUpdate,
StorageLocation = this.storageLocation,
},
out fileData,
out tempFilePath);
}
catch (Exception ex)
{
HandleChangeDataCommonFailure(ex);
return null;
}
Stream returnStream = File.OpenRead(tempFilePath);
fileDataRetriever = new FileDataRetriever
{
FileData = fileData,
FileStream = returnStream,
RelativeDirectoryPath = Path.GetDirectoryName(fileData.RelativePath)
};
return fileDataRetriever;
}
private static void GetFileRequestInformation(
ItemFieldDictionary keyAndExpectedVersion,
out string relativePathAndFilename,
out DateTime expectedLastUpdate)
{
IDictionary<uint, ItemField> expectedFields = (IDictionary<uint, ItemField>)keyAndExpectedVersion;
relativePathAndFilename = (string)expectedFields[ItemFields.CustomFieldName].Value;
expectedLastUpdate = DateTime.FromFileTimeUtc(Convert.ToInt64(expectedFields[ItemFields.CustomFieldTimestamp].Value));
}
private void HandleChangeDataCommonFailure(Exception e)
{
string reason = "Exception during downloading File and storing in temp direcoty: " + e.Message;
this.ClientLoggingService.LogError("*********************** BEGIN EXCEPTION ***********************");
this.ClientLoggingService.LogError(reason);
if (e.InnerException != null)
{
this.ClientLoggingService.LogError(e.InnerException.ToString());
}
this.ClientLoggingService.LogError("*********************** END EXCEPTION ***********************");
}
private void PutSingleBlock(
ProgressStream stream,
StorageLocation storageLocation,
int finalBlockSizeInBytes,
int blockId,
string relativePathAndFilename)
{
byte[] blockData = new byte[finalBlockSizeInBytes];
stream.Read(blockData, 0, finalBlockSizeInBytes);
string contentMD5 = null;
this.remoteStorageProxyService.PutBlock(
storageLocation,
relativePathAndFilename,
blockId.ToString(),
blockData,
contentMD5);
}
private void PutAllBlocksExceptFinal(
ProgressStream stream,
int blockSizeInBytes,
int numberOfBlocks,
string relativePathAndFilename,
List<string> blockList)
{
ParallelLoopResult result = new ParallelLoopResult();
try
{
result = Parallel.For(
0,
numberOfBlocks,
new ParallelOptions() { MaxDegreeOfParallelism = 3 },
(int blockId) =>
{
// if Cancellation has been requested stop at current block
if (this.CancellationToken.IsCancellationRequested)
{
this.ClientLoggingService.LogInformation(string.Format(
"Uploading a file ('{2}') has been cancelled at blockID={0} of {1} blocks",
blockId,
numberOfBlocks,
relativePathAndFilename));
throw new SyncAbortedException("Upload aborted at blockId=" + blockId);
}
// read block of data from source stream
byte[] blockData = new byte[blockSizeInBytes];
stream.Read(blockData, 0, blockSizeInBytes);
// send block to remote storage
string contentMD5 = null;
this.remoteStorageProxyService.PutBlock(
this.storageLocation,
relativePathAndFilename,
blockId.ToString(),
blockData,
contentMD5);
blockList.Add(blockId.ToString());
this.ClientLoggingService.LogInformation("PutBlock #{0}", blockId);
});
}
catch (AggregateException ex)
{
this.ClientLoggingService.LogInformation("PutAllBlocks Parallel result=" + ex.Message);
throw ex.InnerException;
}
this.ClientLoggingService.LogInformation("PutAllBlocks Parallel result=" + result.IsCompleted);
}
private void InsertFileItem(IFileDataRetriever fileDataRetriever, FileData fileData)
{
string fileSize = GeneralHelper.GetSizeAsString(fileData.Size);
this.ClientLoggingService.LogInformation("Filename: {0} ({1})", fileData.GetPathWithName(), fileSize);
using (FileStream sourceFileStream = new FileStream(fileDataRetriever.AbsoluteSourceFilePath, FileMode.Open))
{
using (ProgressStream uploadStreamWithProgress = new ProgressStream(sourceFileStream))
{
uploadStreamWithProgress.SyncProgressChanged += new EventHandler<SyncProgressChangedEventArgs>(
UploadStreamWithProgress_ProgressChanged);
var customMetadata = GetCustomMetadata();
this.ClientLoggingService.LogInformation("Uploading new file " + fileData.GetPathWithName());
var blockList = UploadFileBlocks(
fileData,
uploadStreamWithProgress,
this.storageLocation);
// Commit the blocks
var remoteFileProperties = this.remoteStorageProxyService.CommitBlocksAsInsert(
storageLocation,
fileDataRetriever.RelativeDirectoryPath,
blockList,
customMetadata,
fileData);
this.ClientLoggingService.LogInformation("PutBlockList for {0}", fileData.GetPathWithName());
this.ClientLoggingService.LogInformation("Uploaded new file");
}
string pathWithName = fileData.GetPathWithName();
}
this.instrumentationService.IncrementCounter(CounterName.BytesUploadedThisSynchronisationSession, fileData.Size);
}
/// <summary>
/// Updates the item.
/// </summary>
/// <param name="fileDataRetriever">The file data retriever.</param>
/// <param name="expectedLastModified">The expected last modified.</param>
/// <param name="oldFilename">The old filename.</param>
/// <param name="fileDataDataContract">The file data data contract.</param>
private void UpdateItem(IFileDataRetriever fileDataRetriever, DateTime expectedLastModified, string oldFilename, FileData fileData)
{
string fileDataSize = GeneralHelper.GetSizeAsString(fileData.Size);
this.ClientLoggingService.LogInformation("Filename: {0} ({1})", fileData.GetPathWithName(), fileDataSize);
using (FileStream sourceFileStream = File.OpenRead(fileDataRetriever.AbsoluteSourceFilePath))
{
using (ProgressStream uploadStreamWithProgress = new ProgressStream(sourceFileStream))
{
uploadStreamWithProgress.SyncProgressChanged += new EventHandler<SyncProgressChangedEventArgs>(
UploadStreamWithProgress_ProgressChanged);
var customMetadata = GetCustomMetadata();
this.ClientLoggingService.LogInformation("Uploading updated file " + fileData.GetPathWithName());
var blockList = UploadFileBlocks(
fileData,
uploadStreamWithProgress,
this.storageLocation);
// Commit the blocks
this.remoteStorageProxyService.CommitBlocksAsUpdate(
this.storageLocation,
fileDataRetriever.RelativeDirectoryPath,
blockList,
customMetadata,
fileData,
oldFilename,
expectedLastModified);
this.ClientLoggingService.LogInformation("PutBlockList for {0}", fileData.GetPathWithName());
this.ClientLoggingService.LogInformation("Uploaded updated file");
}
}
}
/// <summary>
/// Inserts item and report result of the operation
/// </summary>
/// <returns>False if insert failed, otherwise True</returns>
private bool InsertItemWithRetriesWrapper(
RecoverableErrorReportingContext recoverableErrorReportingContext,
Stopwatch stopwatch,
IFileDataRetriever fileDataRetriever,
int numberOfRetries,
long retryWaitTimeInMilliseconds)
{
do
{
try
{
if (!fileDataRetriever.FileData.IsDirectory)
{
InsertFileItem(fileDataRetriever, fileDataRetriever.FileData);
}
return true;
}
catch (UnauthorisedRemoteStoreOperationException ex)
{
LogFileOperationError("InsertItem", numberOfRetries, retryWaitTimeInMilliseconds, ex);
LogUnauthorisedOperation(recoverableErrorReportingContext, stopwatch, fileDataRetriever, numberOfRetries, retryWaitTimeInMilliseconds, ex);
return false;
}
catch (SyncException ex)
{
LogFileOperationError("InsertItem", numberOfRetries, retryWaitTimeInMilliseconds, ex);
// give up inserting this item but also terminate the whole sync immediately
// when unrecoverable error occurs (abort requested, authentication failed)
throw;
}
catch (Exception e)
{
FileData fileData = fileDataRetriever.FileData;
if (!ShouldRetry(recoverableErrorReportingContext, stopwatch, fileDataRetriever.FileData, numberOfRetries, retryWaitTimeInMilliseconds, e))
{
return false;
}
}
}
while (numberOfRetries-- > 0);
return false;
}
private bool UpdateItemWrapper(
RecoverableErrorReportingContext recoverableErrorReportingContext,
Stopwatch stopwatch,
IFileDataRetriever fileDataRetriever,
DateTime expectedLastModified,
string oldFilename)
{
int numberOfRetries = 0;
long retryWaitTimeInMilliseconds = 0;
try
{
UpdateItem(fileDataRetriever, expectedLastModified, oldFilename, fileDataRetriever.FileData);
return true;
}
catch (UnauthorisedRemoteStoreOperationException ex)
{
LogFileOperationError("UpdateItem", numberOfRetries, retryWaitTimeInMilliseconds, ex);
LogUnauthorisedOperation(recoverableErrorReportingContext, stopwatch, fileDataRetriever, numberOfRetries, retryWaitTimeInMilliseconds, ex);
return false;
}
catch (SyncException ex)
{
LogFileOperationError("UpdateItem", numberOfRetries, retryWaitTimeInMilliseconds, ex);
// give up updating this item but also terminate the whole sync immediately
// when unrecoverable error occurs (abort requested, authentication failed)
throw;
}
catch (Exception e)
{
FileData fileData = fileDataRetriever.FileData;
ShouldRetry(recoverableErrorReportingContext, stopwatch, fileDataRetriever.FileData, numberOfRetries, retryWaitTimeInMilliseconds, e);
return false;
}
}
private void LogUnauthorisedOperation(
RecoverableErrorReportingContext recoverableErrorReportingContext,
Stopwatch stopwatch,
IFileDataRetriever fileDataRetriever,
int numberOfRetries,
long retryWaitTimeInMilliseconds,
UnauthorisedRemoteStoreOperationException ex)
{
this.ClientLoggingService.LogWarning("Aborting unauthorised action without additional retries. Item not processed but sync shall continue");
// no retries for unauthorised errors so record recoverable error and signal failure
RecordRecoverableErrorForChange(
recoverableErrorReportingContext,
ex,
fileDataRetriever.FileData.GetPathWithName(),
fileDataRetriever.FileData.Size,
stopwatch.Elapsed);
}
/// <summary>
/// Determines whether another retry is possible and if yes puts current thread to sleep before another attempt
/// </summary>
/// <returns>True if retry allowed, False if operation should be aborted</returns>
private bool ShouldRetry(
RecoverableErrorReportingContext recoverableErrorReportingContext,
Stopwatch stopwatch,
FileData fileData,
int numberOfRetries,
long retryWaitTimeInMilliseconds,
Exception ex)
{
this.ClientLoggingService.LogWarning("Exception in RemoteStorageSyncProvider.InsertItem() - {0}:{1}", ex.GetType().ToString(), ex.Message);
this.ClientLoggingService.LogWarning("InsertItem() took {0} before it failed", stopwatch.Elapsed.ToString(@"hh\:mm\:ss"));
this.ClientLoggingService.LogWarning("numberOfRetries={0} retryWaitTimeInMilliseconds={1}", numberOfRetries, retryWaitTimeInMilliseconds);
// Give up if all retries used and still no success
if (numberOfRetries <= 0)
{
this.ClientLoggingService.LogWarning("Retry count has been exceeded so giving up.");
RecordRecoverableErrorForChange(
recoverableErrorReportingContext,
ex,
fileData.GetPathWithName(),
fileData.Size,
stopwatch.Elapsed);
return false;
}
else
{
// still have some retries left so wait for a while and try again
Thread.Sleep(TimeSpan.FromMilliseconds(retryWaitTimeInMilliseconds));
return true;
}
}
private void LogFileOperationError(string context, int numberOfRetries, long retryWaitTimeInMilliseconds, Exception ex)
{
this.ClientLoggingService.LogWarning("Exception in RemoteStorageSyncProvider {1} - {0}", ex.Message, context);
this.ClientLoggingService.LogWarning("numberOfRetries so far ={0} retryWaitTimeInMilliseconds={1}", numberOfRetries, retryWaitTimeInMilliseconds);
}
private CustomMetadata GetCustomMetadata()
{
var customMetadata = new CustomMetadata()
{
ClientVersionId = configurationService.ClientVersionId,
EncryptionKeyId = configurationService.EncryptionKeyId,
};
return customMetadata;
}
private void RecordRecoverableErrorForChange(
RecoverableErrorReportingContext recoverableErrorReportingContext,
Exception ex,
string fileName,
long fileSize,
TimeSpan timeToFailure)
{
this.ClientLoggingService.LogError("*********************** BEGIN EXCEPTION ***********************");
this.ClientLoggingService.LogError(ex.ToString());
if (ex.InnerException != null)
{
this.ClientLoggingService.LogError(ex.InnerException.ToString());
}
this.ClientLoggingService.LogError("Calling RecordRecoverableErrorForChange()");
var recoverableErrorData = new RecoverableErrorData(ex, fileName, ex.Message);
if (recoverableErrorReportingContext != null)
{
recoverableErrorReportingContext.RecordRecoverableErrorForChange(recoverableErrorData);
}
this.instrumentationService.IncrementCounter(CounterName.NumberOfRecoverableErrorsReported, 1);
this.ClientLoggingService.RecordItemSynchronisationFailure(fileName, fileSize, ex.Message, timeToFailure);
this.ClientLoggingService.LogError("*********************** END EXCEPTION ***********************");
}
/// <summary>
/// used to create a temporary folder
/// </summary>
private string CreateTemporaryFolder()
{
string tempDirectory = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
Directory.CreateDirectory(tempDirectory);
return tempDirectory;
}
/// <summary>
/// This method is used to write chunks/bytes to a temporary file in temp directory.
/// </summary>
private void WriteBytesToFile(Stream stream, byte[] fileBlock)
{
stream.Write(fileBlock, 0, fileBlock.Length);
}
private void GetFileFromBlocks(
FileBlockContext fileBlockContext,
out FileData fileData,
out string tempFilePath)
{
FileBlockInformation fileBlockInformation;
this.remoteStorageProxyService.GetFileBlockInformation(
fileBlockContext,
this.configurationService.DownloadBlockSizeInBytes,
out fileBlockInformation,
out fileData);
string tempDirectory = CreateTemporaryFolder();
tempFilePath = tempDirectory + "\\" + fileData.Name;
string fileName = fileData.Name;
using (Stream stream = new FileStream(tempFilePath, FileMode.Create))
{
// call GetFileBlock only if there is at least one block that can be downloaded.
if (fileBlockInformation.NumberOfBlocks > 0)
{
ParallelLoopResult result = new ParallelLoopResult();
try
{
result = Parallel.For(
0,
fileBlockInformation.NumberOfBlocks,
new ParallelOptions() { MaxDegreeOfParallelism = 3 },
(int blockIndex) =>
{
ReadBlockToStream(
fileBlockContext,
fileBlockInformation,
fileName,
stream,
blockIndex);
});
}
catch (AggregateException ex)
{
this.ClientLoggingService.LogError("GetFileFromBlocks Parallel result=" + ex.Message);
throw ex.InnerException;
}
this.ClientLoggingService.LogInformation("GetFileFromBlocks Parallel result=" + result.IsCompleted);
}
}
}
private void ReadBlockToStream(
FileBlockContext fileBlockContext,
FileBlockInformation fileBlockInformation,
string fileName,
Stream outputStream,
int blockIndex)
{
// if Cancellation has been requested stop at current block
if (this.CancellationToken.IsCancellationRequested)
{
this.ClientLoggingService.LogInformation(string.Format(
"Downloading a file ('{2}') has been cancelled at blockIndex={0} of {1} blocks",
blockIndex,
fileBlockInformation.NumberOfBlocks,
fileName));
throw new SyncAbortedException("Download aborted at blockIndex=" + blockIndex);
}
int currentblockSize = this.configurationService.DownloadBlockSizeInBytes;
if (blockIndex == fileBlockInformation.NumberOfBlocks - 1 && fileBlockInformation.BlockSizeOfFinalBlockInBytes != 0)
{
currentblockSize = fileBlockInformation.BlockSizeOfFinalBlockInBytes;
}
byte[] blockData;
this.remoteStorageProxyService.GetFileBlock(
fileBlockContext,
blockIndex,
currentblockSize,
out blockData);
this.ClientLoggingService.LogInformation("Downloading block #{0}", blockIndex);
WriteBytesToFile(outputStream, blockData);
}
private List<string> UploadFileBlocks(
FileData fileData,
ProgressStream stream,
StorageLocation storageLocation)
{
this.ClientLoggingService.LogInformation("Using chunking for uploading {0}", fileData.GetPathWithName());
int blockSizeInBytes = this.configurationService.UploadBlockSizeInBytes;
long fileSizeInBytes = stream.Length;
int finalBlockSizeInBytes;
int numberOfBlocks;
GeneralHelper.CalculateNumberOfBlocks(blockSizeInBytes, fileSizeInBytes, out finalBlockSizeInBytes, out numberOfBlocks);
string relativePathAndFilename = fileData.GetPathWithName();
List<string> blockList = new List<string>(numberOfBlocks);
int numberOfFullSizeBlocks = numberOfBlocks;
if (finalBlockSizeInBytes > 0)
{
numberOfFullSizeBlocks--;
}
PutAllBlocksExceptFinal(stream, blockSizeInBytes, numberOfFullSizeBlocks, relativePathAndFilename, blockList);
// Finally we put the remainder
if (finalBlockSizeInBytes > 0)
{
int finalBlockId = numberOfBlocks - 1;
PutSingleBlock(stream, storageLocation, finalBlockSizeInBytes, finalBlockId, relativePathAndFilename);
blockList.Add(finalBlockId.ToString());
this.ClientLoggingService.LogInformation("PutBlock #{0}", finalBlockId);
}
return blockList;
}
private void OnAppliedChange(ChangeType changeType, string fileName)
{
if (this.AppliedChange != null)
{
var eventArgs = new SimpleSyncAppliedChangeEventArgs
{
ChangeType = changeType,
NewFilePath = fileName,
OldFilePath = fileName
};
this.AppliedChange(this, eventArgs);
}
}
private bool AreMetadataStoresDamaged()
{
var metadataStoreInfoFactory = new MetadataStoreInfoFactory(this.storageLocation, this.configurationService);
var localMetadataStoreInfo = metadataStoreInfoFactory.CreateMetadataStoreInfo(MetadataStoreType.Local);
var remoteMetadataStoreInfo = metadataStoreInfoFactory.CreateMetadataStoreInfo(MetadataStoreType.Remote);
bool oneOrMoreMetadataStoresDoNotExist = !MetadataStoreHelper.DoesMetadataStoreExist(remoteMetadataStoreInfo)
|| !MetadataStoreHelper.DoesMetadataStoreExist(localMetadataStoreInfo);
return oneOrMoreMetadataStoresDoNotExist;
}
}
}
using System;
using System.Collections.Generic;
using Microsoft.Synchronization.SimpleProviders;
namespace RemoteStorage
{
public static class RemoteStorageSyncProviderHelper
{
internal const int MaxFileNameLength = 255;
internal static void Log(List<ItemFieldDictionary> listItemFieldDictionary, IClientLoggingService logger)
{
foreach (var dictionary in listItemFieldDictionary)
{
string logEntry;
try
{
string filename = dictionary[ItemFields.CustomFieldName].Value.ToString();
ulong timestamp = Convert.ToUInt64(dictionary[ItemFields.CustomFieldTimestamp].Value);
ulong filesize = Convert.ToUInt64(dictionary[ItemFields.CustomFieldSize].Value);
logEntry = string.Format(
"\tFilename: {0} Timestamp: {1} File size in bytes: {2}",
filename,
DateTime.FromFileTimeUtc((long)timestamp).ToString(),
filesize);
}
catch
{
logEntry = "Couldn't log";
}
logger.LogInformation(logEntry);
}
}
/// <summary>
/// We would expect that there have been no modifications to the file on the server since the time the request started.
/// </summary>
/// <returns></returns>
internal static DateTime GetExpectedLastModifiedOnServer()
{
DateTime expectedLastModified = DateTime.UtcNow;
return expectedLastModified;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment