Created
March 4, 2013 06:44
-
-
Save abdullin/5080455 to your computer and use it in GitHub Desktop.
Samples of commands in test client used for event store manipulation (fetching it to local storage and searching). They mostly leverage Lokad.CQRS framework and test client interactive shell (see, for example, beingtheworst.com source: https://github.com/beingtheworst/btw-gtd)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class FetchEventStoreProcessor : ICommandProcessor | |
{ | |
public string Key { get { return "FETCH"; } } | |
public string Usage { get { return "FETCH <remoteConfig> [<folderNameInLokadData>]"; } } | |
public bool Execute(CommandProcessorContext context, CancellationToken token, string[] args) | |
{ | |
if (args == null || args.Length == 0) | |
{ | |
context.Log.Error("No remoteConfig provided."); | |
return false; | |
} | |
var config = args[0]; | |
var localSubPath = args.Length > 1 ? args[1] : "s2-store"; | |
var localPath = Path.Combine(@"C:\LokadData", localSubPath, Setup.TapesContainer); | |
if (!Directory.Exists(localPath)) | |
{ | |
context.Log.Error(string.Format("ERROR: Directory \"{0}\" not found.", localPath)); | |
return false; | |
} | |
IBlobReader blobReader; | |
try | |
{ | |
const StringComparison icic = StringComparison.InvariantCultureIgnoreCase; | |
if (config.StartsWith("DefaultEndpointsProtocol=", icic) | |
|| config.StartsWith("UseDevelopmentStorage=true", icic)) | |
{ | |
blobReader = new AzureBlobReader(config); | |
} | |
else | |
{ | |
var path = Path.Combine(Path.GetFullPath(config), Setup.TapesContainer); | |
blobReader = new FileBlobReader(path); | |
} | |
} | |
catch (Exception e) | |
{ | |
context.Log.Error("ERROR: {0}", e.Message); | |
return false; | |
} | |
context.Log.Info("Sync \"{0}\" to local folder \"{1}\".", blobReader.GetConfig, localPath); | |
if (localPath.Equals(blobReader.GetConfig, StringComparison.InvariantCultureIgnoreCase)) | |
{ | |
context.Log.Error("ERROR: {0}", "Can not sync to itself."); | |
return false; | |
} | |
try | |
{ | |
Sync(blobReader, localPath, context.Log); | |
} | |
catch (Exception e) | |
{ | |
context.Log.Error("ERROR: {0}", e.Message); | |
var ae = e as AggregateException; | |
if (ae != null) | |
foreach (var ie in ae.InnerExceptions) | |
context.Log.Error("ERROR: {0}", ie.Message); | |
return false; | |
} | |
return true; | |
} | |
void Sync(IBlobReader blobReader, string localPath, ILogger log) | |
{ | |
log.Info("Getting blob list"); | |
// Get all names of local files. Find latest | |
var localNames = Directory.GetFiles(localPath).Select(Path.GetFileName).OrderBy(x => x).ToArray(); | |
var remoteNames = blobReader.ListBlobs() | |
.AsParallel() | |
.OrderBy(x => x).ToArray(); | |
// Check that local and remote are same streams | |
var chunk = 0; | |
for (; chunk < remoteNames.Length; ++chunk) | |
{ | |
if (chunk >= localNames.Length) | |
break; | |
if (remoteNames[chunk] != localNames[chunk]) | |
throw new InvalidOperationException(string.Format("Set of blobs differ: local=\"{0}\", remote=\"{1}\"", localNames[chunk], remoteNames[chunk])); | |
} | |
if (chunk < localNames.Length) | |
throw new InvalidOperationException(string.Format("Set of blobs differ: local=\"{0}\", remote not found", localNames[chunk])); | |
// Synchronize last local chunk | |
if (chunk > 0) | |
{ | |
log.Info("Synchronizing last local chunk"); | |
SyncLastChunk(blobReader, localPath, localNames[chunk - 1]); | |
} | |
if (chunk >= remoteNames.Length) | |
return; | |
// Copy missing chunks | |
log.Info("Copying missing chunks"); | |
Parallel.For(chunk, remoteNames.Length, chIndex => | |
{ | |
var chunkName = remoteNames[chIndex]; | |
var bytes = ReadWithRetry(blobReader, chunkName); | |
File.WriteAllBytes(Path.Combine(localPath, chunkName), bytes); | |
}); | |
} | |
void SyncLastChunk(IBlobReader blobReader, string localPath, string blobName) | |
{ | |
StorageFrameDecoded[] remoteFrames; | |
var remoteBytes = blobReader.ReadAllBytes(blobName); | |
using (var stream = new MemoryStream(remoteBytes)) | |
{ | |
remoteFrames = ReadAllFrames(stream); | |
} | |
StorageFrameDecoded[] localFrames; | |
var localChunkName = Path.Combine(localPath, blobName); | |
using (var stream = File.OpenRead(localChunkName)) | |
{ | |
localFrames = ReadAllFrames(stream); | |
} | |
// Compare local and remote from beginning | |
var equalPairs = remoteFrames | |
.Zip(localFrames, (r, l) => new {r, l}) | |
.TakeWhile(p => p.r.IsEmpty == p.l.IsEmpty | |
|| (p.r.Name == p.l.Name && p.r.Stamp == p.l.Stamp && p.r.Bytes.SequenceEqual(p.l.Bytes))) | |
.Count(); | |
// Exit if streams are equal | |
if (equalPairs == remoteFrames.Length && equalPairs == localFrames.Length) | |
return; | |
if (equalPairs < remoteFrames.Length && equalPairs < localFrames.Length) | |
throw new InvalidOperationException(string.Format( | |
"Remote and local have different branches. Common parent is \"{0}\"", | |
remoteFrames[equalPairs - 1].Name)); | |
if (equalPairs < localFrames.Length) | |
throw new InvalidOperationException(string.Format( | |
"Local has more events that remote. Last remote is \"{0}\"", | |
remoteFrames[equalPairs - 1].Name)); | |
File.Delete(localChunkName); | |
File.WriteAllBytes(localChunkName, remoteBytes); | |
} | |
static StorageFrameDecoded[] ReadAllFrames(Stream stream) | |
{ | |
var list = new List<StorageFrameDecoded>(); | |
StorageFrameDecoded result; | |
while (StorageFramesEvil.TryReadFrame(stream, out result)) | |
{ | |
list.Add(result); | |
} | |
return list.ToArray(); | |
} | |
static byte[] ReadWithRetry(IBlobReader blobReader, string name) | |
{ | |
var tryCount = 0; | |
while (true) | |
{ | |
try | |
{ | |
tryCount++; | |
return blobReader.ReadAllBytes(name); | |
} | |
catch (Exception) | |
{ | |
if (tryCount == 4) | |
throw; | |
Thread.Sleep(500); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class PrintMessagesProcessor : ICommandProcessor | |
{ | |
public string Key { get { return "PRINT"; } } | |
public string Usage { get { return "PRINT <folderNameInLokadData> (<regexp> | '<regexp>' | \"<regexp>\") <regexp2> ..."; } } | |
public bool Execute(CommandProcessorContext context, CancellationToken token, string[] args) | |
{ | |
if (args == null || args.Length < 2) | |
{ | |
context.Log.Error("No local folder provided."); | |
return false; | |
} | |
var localSubPath = args.Length > 1 ? args[0] : "s2-store"; | |
var localPath = Path.Combine(@"C:\LokadData", localSubPath, Setup.TapesContainer); | |
if (!Directory.Exists(localPath)) | |
{ | |
context.Log.Error(string.Format("ERROR: Directory \"{0}\" not found.", localPath)); | |
return false; | |
} | |
var patterns = args.Skip(1).ToArray(); | |
// unquote | |
for (var i = 0; i < patterns.Length; i++) | |
{ | |
if (patterns[i].Length > 2 && ( | |
(patterns[i].StartsWith("\'") && patterns[i].EndsWith("\'")) || | |
(patterns[i].StartsWith("\"") && patterns[i].EndsWith("\"")))) | |
patterns[i] = patterns[i].Substring(1, patterns[i].Length - 2); | |
} | |
Regex[] res; | |
try | |
{ | |
res = patterns.Select(pattern => new Regex(pattern)).ToArray(); | |
} | |
catch (ArgumentException e) | |
{ | |
context.Log.Error(string.Format("ERROR: Expression is not valid. {0}", e)); | |
return false; | |
} | |
using (var store = new FileAppendOnlyStore(new DirectoryInfo(localPath))) | |
{ | |
store.Initialize(); | |
var messageStore = new MessageStore(store, Contracts.CreateStreamer().MessageSerializer); | |
var origDateHandler = JsConfig.DateHandler; | |
JsConfig.DateHandler = JsonDateHandler.ISO8601; | |
foreach (var record in messageStore.EnumerateAllItems(0, int.MaxValue)) | |
{ | |
foreach (var item in record.Items) | |
{ | |
var sb = new StringBuilder(); | |
var typeName = item.GetType().Name; | |
var json = JsonSerializer.SerializeToString(item); | |
sb.AppendFormat("{0} {1}", typeName, json); | |
var message = sb.ToString(); | |
var allMatch = true; | |
foreach (var re in res) | |
{ | |
if (re.IsMatch(message)) | |
continue; | |
allMatch = false; | |
break; | |
} | |
if (allMatch) | |
{ | |
sb.Clear(); | |
sb.AppendLine(typeName); | |
sb.AppendLine(JsvFormatter.Format(json)); | |
context.Log.Info(sb.ToString()); | |
} | |
} | |
} | |
JsConfig.DateHandler = origDateHandler; | |
} | |
return true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment