Created
May 20, 2014 12:14
-
-
Save thinkbeforecoding/2279b1a5cb873254587e to your computer and use it in GitHub Desktop.
EventStore cluster in a F# script !
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(* | |
This gist runs a 3-node in-memory EventStore cluster
and a client that sends messages to the cluster.
To use the web interface, copy EventStore web folders. | |
You can find them in the server distribution at http://geteventstore.com/downloads/.
Then connect to http://localhost:2113/ | |
You can view the cluster status at: | |
http://localhost:2113/web/gossip.htm | |
If you don't run it as an admin, enable HTTP listening for the user by typing in an admin console:
netsh http add urlacl url=http://localhost:2113/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2113/ user=domain\username | |
netsh http add urlacl url=http://localhost:2112/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2112/ user=domain\username | |
netsh http add urlacl url=http://localhost:2115/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2115/ user=domain\username | |
netsh http add urlacl url=http://localhost:2114/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2114/ user=domain\username | |
netsh http add urlacl url=http://localhost:2117/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2117/ user=domain\username | |
netsh http add urlacl url=http://localhost:2116/ user=domain\username | |
netsh http add urlacl url=http://127.0.0.1:2116/ user=domain\username | |
Have fun ! | |
*) | |
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Common.dll" | |
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Core.dll" | |
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Web.dll" | |
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Projections.Core.dll" | |
/// Lets `async { }` computation expressions bind .NET `Task` and `Task<'T>`
/// values directly with `let!` and `do!`.
[<AutoOpen>]
module TaskAsync =
    open System
    open System.Threading.Tasks

    type Microsoft.FSharp.Control.Async with

        /// Builds an async computation that fails with `e` by handing it
        /// straight to the exception continuation.
        static member AsyncRaise (e : #exn) =
            Async.FromContinuations(fun (_, onError, _) -> onError e)

        /// Awaits a non-generic `Task`.
        /// A faulted task is reported as the first inner exception of its
        /// flattened `AggregateException`; a canceled task is forwarded as canceled.
        static member AwaitTask (t : Task) =
            // First exception left once nested AggregateExceptions are flattened.
            let firstInner (aggregate : AggregateException) =
                Seq.head (aggregate.Flatten().InnerExceptions)

            // If awaiting still surfaces an AggregateException, re-raise its first inner exception.
            let unwrapAggregate (work : Async<unit>) =
                async {
                    try
                        do! work
                    with
                    | :? AggregateException as aggregate ->
                        do! Async.AsyncRaise(firstInner aggregate)
                }

            // Mirror the outcome of `t` onto a Task<unit> so it can be awaited as one.
            let completion = new TaskCompletionSource<unit>(TaskCreationOptions.None)
            t.ContinueWith(
                (fun (finished : Task) ->
                    if finished.IsFaulted then completion.SetException(firstInner finished.Exception)
                    elif finished.IsCanceled then completion.SetCanceled ()
                    else completion.SetResult(())),
                TaskContinuationOptions.ExecuteSynchronously)
            |> ignore

            completion.Task |> Async.AwaitTask |> unwrapAggregate

    type Microsoft.FSharp.Control.AsyncBuilder with

        /// `let!` over a `Task<'a>`: awaits the task and passes its result to `f`.
        member builder.Bind(t : Task<'a>, f : 'a -> Async<'b>) : Async<'b> =
            builder.Bind(Async.AwaitTask t, f)

        /// `do!` / `let!` over a non-generic `Task`: awaits the task, then runs `f ()`.
        member builder.Bind(t : Task, f : unit -> Async<'b>) : Async<'b> =
            builder.Bind(Async.AwaitTask t, f)
open System | |
open System.Net | |
open EventStore.Core | |
open EventStore.Core.Cluster.Settings | |
open EventStore.Core.TransactionLog.FileNamingStrategy | |
open EventStore.Core.TransactionLog.Chunks | |
open EventStore.Core.TransactionLog.Checkpoint | |
open EventStore.Core.Services.Gossip | |
open EventStore.Core.Util | |
open EventStore.Core.Services.Monitoring | |
open EventStore.Core.Authentication | |
/// Creates and starts one in-memory cluster node and returns it.
/// `n` is the node index: every node-specific port below is shifted by `2 * n`,
/// so nodes 0, 1 and 2 do not collide on the same machine.
/// A real deployment needs more configuration than this.
let node n =
    let localhost port = IPEndPoint(IPAddress.Parse("127.0.0.1"), port)
    let portShift = n * 2
    let lh port = localhost (port + portShift)

    // All checkpoints live in memory, so the path is only a placeholder.
    let db =
        let dbPath = @"not needed in memory"
        let naming = new VersionedPatternFileNamingStrategy(dbPath, "chunk-")
        let writerChk = new InMemoryCheckpoint(Checkpoint.Writer)
        let chaserChk = new InMemoryCheckpoint(Checkpoint.Chaser)
        let epochChk = new InMemoryCheckpoint(Checkpoint.Epoch, initValue = -1L)
        let truncateChk = new InMemoryCheckpoint(Checkpoint.Truncate, initValue = -1L)
        let config =
            TFChunkDbConfig(
                dbPath, naming,
                TFConsts.ChunkSize, 0L,
                writerChk, chaserChk, epochChk, truncateChk,
                true)
        new TFChunkDb(config)

    // The same three seed endpoints are given to every node (not shifted by n).
    let gossipSeeds = [| localhost 2112; localhost 2114; localhost 2116 |]
    let gossipSeedSource = new KnownEndpointGossipSeedSource(gossipSeeds)

    let prefixPort = 2113 + 2 * n
    let httpPrefixes =
        [| sprintf "http://localhost:%d/" prefixPort
           sprintf "http://127.0.0.1:%d/" prefixPort |]

    // NOTE(review): the arguments are positional; their meaning depends on the
    // ClusterVNodeSettings constructor of the referenced EventStore build - confirm before reordering.
    let nodeSettings =
        ClusterVNodeSettings(
            Guid.NewGuid(),
            0,
            lh 1112, null, lh 1113, null, lh 2112, lh 2113,
            httpPrefixes, false, null, Opts.WorkerThreadsDefault, false, "fake.dns", gossipSeeds,
            TimeSpan.FromMilliseconds Opts.MinFlushDelayMsDefault,
            Opts.ClusterSizeDefault,
            Opts.PrepareCountDefault,
            Opts.CommitCountDefault,
            TimeSpan.FromMilliseconds (float Opts.PrepareTimeoutMsDefault),
            TimeSpan.FromMilliseconds (float Opts.CommitTimeoutMsDefault),
            false, null, false,
            TimeSpan.FromSeconds(120.0), StatsStorage.Csv,
            Opts.NodePriorityDefault, InternalAuthenticationProviderFactory(), Opts.DisableScavengeMergeDefault,
            Opts.AdminOnExtDefault, Opts.StatsOnExtDefault, true (*Opts.GossipOnExtDefault*),
            TimeSpan.FromMilliseconds (float Opts.GossipIntervalMsDefault),
            TimeSpan.FromMilliseconds (float Opts.GossipAllowedDifferenceMsDefault),
            TimeSpan.FromMilliseconds (float Opts.GossipTimeoutMsDefault))

    let projections =
        new EventStore.Projections.Core.ProjectionsSubsystem(
            Opts.ProjectionThreadsDefault,
            EventStore.Common.Options.ProjectionType.All)
    let vnode = new ClusterVNode(db, nodeSettings, gossipSeedSource, false, Opts.MaxMemtableSizeDefault, projections)

    // Each HTTP service gets its own controller instances, registered in the same order on both.
    let clusterUi () =
        Services.Transport.Http.Controllers.ClusterWebUIController(vnode.MainQueue, [| NodeSubsystems.Projections |])
    let usersUi () =
        EventStore.Web.Users.UsersWebController(vnode.MainQueue)
    vnode.InternalHttpService.SetupController(clusterUi ())
    vnode.InternalHttpService.SetupController(usersUi ())
    vnode.ExternalHttpService.SetupController(clusterUi ())
    vnode.ExternalHttpService.SetupController(usersUi ())

    vnode.MainBus.Subscribe(new EventStore.Web.Users.UserManagementProjectionsRegistration())
    vnode.Start()
    vnode
// Start the three cluster members (node indexes 0, 1 and 2), created in that order.
let node0, node1, node2 = node 0, node 1, node 2
// simple client for the cluster | |
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.ClientAPI.dll" | |
open EventStore.ClientAPI | |
// Client workflow, started in the background:
//   1. connects to the cluster through the gossip seeds on ports 2113, 2115 and 2117;
//   2. appends one "SomethingHappened" event to the "Messages" stream for every
//      line typed on the console;
//   3. stops reading when standard input is closed.
async {
    let cnxSettings =
        ConnectionSettings.Create()
            .SetDefaultUserCredentials(SystemData.UserCredentials("admin","changeit"))
            .UseConsoleLogger()
        |> ConnectionSettingsBuilder.op_Implicit

    let settings =
        let lh port = IPEndPoint(IPAddress.Parse("127.0.0.1"), port)
        ClusterSettings.Create()
            .DiscoverClusterViaGossipSeeds().SetGossipSeedEndPoints(lh 2113, lh 2115, lh 2117)
        |> GossipSeedClusterSettingsBuilder.op_Implicit

    use store = EventStoreConnection.Create(cnxSettings, settings, "")
    do! store.ConnectAsync()
    printfn "Connected"

    // Escapes a string for embedding inside a JSON string literal, so that the
    // payload stays valid JSON whatever the user types (quotes, backslashes, tabs...).
    let jsonEscape (text : string) =
        text
        |> String.collect (fun c ->
            match c with
            | '"' -> "\\\""
            | '\\' -> "\\\\"
            | c when c < ' ' -> sprintf "\\u%04x" (int c)
            | c -> string c)

    let rec loop () =
        async {
            match Console.ReadLine() with
            | null ->
                // ReadLine returns null once stdin is closed: stop instead of
                // spinning forever and appending empty events.
                return ()
            | msg ->
                // The event is sent with the JSON flag set, so the body must be real JSON:
                // quoted property name and escaped text.
                let payload = sprintf "{\"Text\":\"%s\"}" (jsonEscape msg)
                let evt =
                    EventData(Guid.NewGuid(), "SomethingHappened", true, Text.Encoding.UTF8.GetBytes(payload), null)
                let! (result : WriteResult) = store.AppendToStreamAsync("Messages", ExpectedVersion.Any, evt)
                printfn "Written. Next: %d" result.NextExpectedVersion
                return! loop ()
        }

    return! loop ()
} |> Async.Start
// Block the script until Ctrl+C is pressed, then stop the three nodes.

// Take the token before registering the handler: Async.CancelDefaultToken replaces
// the default token, and the wait below must observe the one that gets cancelled.
let shutdownToken = Async.DefaultCancellationToken

Console.CancelKeyPress.Add <| fun (e : ConsoleCancelEventArgs) ->
    // Keep the process alive after the handler returns; otherwise it is terminated
    // before the nodes below have been stopped.
    e.Cancel <- true
    Async.CancelDefaultToken()

shutdownToken.WaitHandle.WaitOne() |> ignore
node0.Stop()
node1.Stop()
node2.Stop()
printf "Exit"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment