Skip to content

Instantly share code, notes, and snippets.

@ruslander
Last active December 28, 2015 21:29
Show Gist options
  • Select an option

  • Save ruslander/1e4982769c8eb753cc27 to your computer and use it in GitHub Desktop.

Select an option

Save ruslander/1e4982769c8eb753cc27 to your computer and use it in GitHub Desktop.
AggregateTests mongodb
using System;
using System.Configuration;
using System.Linq;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using MongoDB.Driver.Builders;
using MongoDB.Driver.Linq;
using NUnit.Framework;
namespace Collector.Tests
{
[TestFixture]
public class AggregateTests
{
private MongoDatabase _db;
private Repository<Agg> _repo;
[SetUp]
public void Setup()
{
var connectionString = ConfigurationManager.AppSettings["MongoDB"];
var databaseName = MongoUrl.Create(connectionString).DatabaseName;
var client = new MongoClient(connectionString);
var server = client.GetServer();
_db = server.GetDatabase(databaseName);
_db.DropCollection("Agg");
_repo = new Repository<Agg>(_db);
}
public void DbHasIn(string collection, string json)
{
var bson = BsonDocument.Parse(json);
_db.GetCollection(collection).Insert(bson);
}
[Test]
public void Save_will_insert_new_aggregate_with_version_1_Test()
{
_repo.Save(new Agg() { Data = new Agg.AggState() { Id = 100 } });
var sut = _db.GetCollection<Agg.AggState>(typeof (Agg).Name).AsQueryable().First();
Assert.AreEqual(1, sut.Version);
Assert.AreEqual(100, sut.Id);
}
[Test]
public void Save_will_update_aggregate_and_bump_version_2_Test()
{
DbHasIn("Agg", @"
{
""_id"" : NumberInt(100),
""_t"" : ""AggState"",
""Version"" : NumberInt(1)
}");
_repo.Save(new Agg() { Data = new Agg.AggState() { Id = 100 , Version = 1} });
Assert.AreEqual(2, _db.GetCollection<Agg.AggState>(typeof(Agg).Name).AsQueryable().First().Version);
}
[Test]
public void Save_will_fail_on_stale_state_Test()
{
DbHasIn("Agg", @"
{
""_id"" : NumberInt(100),
""_t"" : ""AggState"",
""Version"" : NumberInt(10)
}");
Assert.Throws<AggregateConcurrentUpdateException>(() =>
{
_repo.Save(new Agg() { Data = new Agg.AggState() { Id = 100, Version = 7 } });
});
}
[Test]
public void GetById_gets_a_record_by_id_Test()
{
DbHasIn("Agg",@"
{
""_id"" : NumberInt(100),
""_t"" : ""AggState"",
""Version"" : NumberInt(1)
}");
var agg = _repo.GetById(100);
Assert.AreEqual(1, agg.GetSnapshot().Version);
Assert.AreEqual(100, agg.GetSnapshot().Id);
}
[Test]
public void GetById_gets_null_for_nonexisting_id_Test()
{
DbHasIn("Agg", @"
{
""_id"" : NumberInt(100),
""_t"" : ""AggState"",
""Version"" : NumberInt(1)
}");
var agg = _repo.GetById(1000001);
Assert.IsNull(agg);
}
[Test]
public void AggregateConcurrentUpdateExceptionTest()
{
Assert.Throws<AggregateConcurrentUpdateException>(() =>
{
_repo.Save(new Agg() { Data = new Agg.AggState() { Id = 1000 } });
_repo.Save(new Agg() { Data = new Agg.AggState() { Id = 1000 } });
});
}
}
public class Repository<T> where T : IAggregateRootSnapshot
{
private readonly MongoDatabase _db;
public Repository(MongoDatabase db)
{
_db = db;
_db
.GetCollection<BsonDocument>(typeof(T).Name)
.CreateIndex(new IndexKeysBuilder().Ascending("_id"), new IndexOptionsBuilder().SetUnique(true));
}
public void Save(T agg)
{
var snapshot = agg.GetSnapshot();
var collection = _db.GetCollection(agg.GetType().Name);
if (snapshot.Version == 0)
{
try
{
snapshot.Version = 1;
collection.Insert(snapshot.ToBsonDocument());
}
catch (MongoDuplicateKeyException e)
{
throw new AggregateConcurrentUpdateException("Optimistic concurrency conflict occurred at " + agg.GetType().Name + "#" + snapshot.Id + " v" + snapshot.Version);
}
}
else
{
var query = Query.And(Query.EQ("_id", snapshot.Id), Query.EQ("Version", snapshot.Version));
var bsonDoc = snapshot.ToBsonDocument();
var update = new UpdateBuilder().Inc("Version", 1);
foreach (var field in bsonDoc.Where(field => field.Name != "Version" && field.Name != "_id"))
{
update.Set(field.Name, field.Value);
}
var modifyResult = collection.FindAndModify(new FindAndModifyArgs
{
Query = query,
Update = update,
SortBy = SortBy.Null,
VersionReturned = FindAndModifyDocumentVersion.Modified,
Upsert = false
});
if (modifyResult.ModifiedDocument == null)
throw new AggregateConcurrentUpdateException("Optimistic concurrency conflict occurred at " + agg.GetType().Name + "#" + snapshot.Id + " v" + snapshot.Version);
}
}
public T GetById(int id)
{
var coll = _db.GetCollection(typeof(T).Name);
var state = coll.Find(Query.EQ("_id", id)).FirstOrDefault();
if (state == null)
return default(T);
var agg = (T) Activator.CreateInstance(typeof (T));
agg.LoadSnapshot(BsonSerializer.Deserialize<Agg.AggState>(state));
return agg;
}
}
public class AggregateConcurrentUpdateException : Exception
{
public AggregateConcurrentUpdateException(string message) : base(message) {}
}
public interface IAggregateRootSnapshot
{
IAggregateMemento GetSnapshot();
void LoadSnapshot(object snapshot);
}
public abstract class AggregateRoot<TSatate> : IAggregateRootSnapshot
{
public TSatate Data { get; set; }
public IAggregateMemento GetSnapshot()
{
return (IAggregateMemento)Data;
}
public void LoadSnapshot(object snapshot)
{
Data = (TSatate)snapshot;
}
}
public interface IAggregateMemento
{
int Id { get; }
int Version { get; set; }
}
public class Agg : AggregateRoot<Agg.AggState>
{
public class AggState : IAggregateMemento
{
public int Id { get; internal set; }
public int Version { get; set; }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment