Created May 20, 2014 12:14
EventStore cluster in an F# script!
This gist runs a 3-node in-memory EventStore cluster
and a client that sends messages to the cluster.
To use the web interface, copy EventStore web folders.
You can find them in the server distribution at
Then connect to http://localhost:2113/
You can view the cluster status at:
If you don't run it as an admin, enable HTTP listening for your user by typing the following in an admin console:
netsh http add urlacl url=http://localhost:2113/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2113/ user=domain\username
netsh http add urlacl url=http://localhost:2112/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2112/ user=domain\username
netsh http add urlacl url=http://localhost:2115/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2115/ user=domain\username
netsh http add urlacl url=http://localhost:2114/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2114/ user=domain\username
netsh http add urlacl url=http://localhost:2117/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2117/ user=domain\username
netsh http add urlacl url=http://localhost:2116/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2116/ user=domain\username
Have fun !
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Common.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Core.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Web.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Projections.Core.dll"
// Add Task and Task<T> handling to async {} computation expressions.
// The module is auto-opened: the rest of this script binds Tasks with do!/let!
// and never opens TaskAsync explicitly, so the extensions must be in scope by default.
[<AutoOpen>]
module TaskAsync =
    open System
    open System.Threading.Tasks

    type Microsoft.FSharp.Control.Async with

        /// Builds an async computation that fails with `e` by calling the
        /// exception continuation directly.
        static member AsyncRaise (e : #exn) =
            Async.FromContinuations(fun (_, econt, _) -> econt e)

        /// Awaits a non-generic Task as an Async<unit>.
        /// A faulted task surfaces the first inner exception of its flattened
        /// AggregateException; a cancelled task cancels the intermediate task.
        static member AwaitTask (t : Task) =
            // First inner exception of the flattened aggregate. Falls back to the
            // aggregate itself when it carries no inner exception, instead of
            // failing with an unrelated "sequence too short" error.
            let flattenExns (e : AggregateException) =
                let inner = e.Flatten().InnerExceptions
                if inner.Count > 0 then inner.[0] else (e :> exn)
            // Re-raise the unwrapped exception if awaiting wraps it in an AggregateException again.
            let rewrapAsyncExn (it : Async<unit>) =
                async {
                    try
                        do! it
                    with :? AggregateException as ae ->
                        do! Async.AsyncRaise(flattenExns ae)
                }
            // Bridge the untyped Task onto a Task<unit> that the built-in AwaitTask accepts.
            let tcs = new TaskCompletionSource<unit>(TaskCreationOptions.None)
            // Explicit Action<Task> keeps overload resolution unambiguous and lets the
            // continuation inspect the task it is handed rather than the captured one.
            let onCompleted =
                Action<Task>(fun finished ->
                    if finished.IsFaulted then tcs.SetException(flattenExns finished.Exception)
                    elif finished.IsCanceled then tcs.SetCanceled()
                    else tcs.SetResult(()))
            t.ContinueWith(onCompleted, TaskContinuationOptions.ExecuteSynchronously) |> ignore
            tcs.Task |> Async.AwaitTask |> rewrapAsyncExn

    type Microsoft.FSharp.Control.AsyncBuilder with
        /// Lets `let!` bind a Task<'a> directly inside async { }.
        member this.Bind(t: Task<'a>, f: 'a -> Async<'b>) : Async<'b> = this.Bind(Async.AwaitTask t, f)
        /// Lets `do!` / `let!` bind a non-generic Task directly inside async { }.
        member this.Bind(t: Task, f: unit -> Async<'b>) : Async<'b> = this.Bind(Async.AwaitTask t, f)
open System
open System.Net
open EventStore.Core
open EventStore.Core.Cluster.Settings
open EventStore.Core.TransactionLog.FileNamingStrategy
open EventStore.Core.TransactionLog.Chunks
open EventStore.Core.TransactionLog.Checkpoint
open EventStore.Core.Services.Gossip
open EventStore.Core.Util
open EventStore.Core.Services.Monitoring
open EventStore.Core.Authentication
// Creates an in-memory cluster node for node index `n`.
// Ports obtained through `lh` are shifted by n * 2, so nodes built with
// different indexes get different ports.
// Needs more configuration for real-world use.
// NOTE(review): this block appears to have lost some text (see the notes below);
// it needs to be repaired before it can run.
let node n =
// NOTE(review): the address literal is empty; presumably it was the loopback
// address (127.0.0.1) - confirm and restore.
let localhost port = IPEndPoint(IPAddress.Parse(""), port)
// Same endpoint helper, with the port offset by the node index.
let lh port = localhost (port + n * 2)
// Chunk database configuration backed entirely by in-memory checkpoints.
let config =
let dbPath = @"not needed in memory"
TFChunkDbConfig(dbPath, new VersionedPatternFileNamingStrategy(dbPath, "chunk-"),
TFConsts.ChunkSize, 0L,
new InMemoryCheckpoint(Checkpoint.Writer),
new InMemoryCheckpoint(Checkpoint.Chaser),
new InMemoryCheckpoint(Checkpoint.Epoch, initValue = -1L),
new InMemoryCheckpoint(Checkpoint.Truncate, initValue= -1L),
true )
let db = new TFChunkDb(config)
// Gossip seeds are the same three fixed ports for every node (built with
// `localhost`, not `lh`, so they are not shifted by the node index).
let gossipSeeds = [| localhost 2112; localhost 2114; localhost 2116 |]
let gossipSeedSource = new KnownEndpointGossipSeedSource(gossipSeeds)
// NOTE(review): the expression below starts with a bare argument list and ends
// with an unmatched ')', so the constructor call these arguments belong to seems
// to be missing (presumably ClusterVNodeSettings, since the value is handed to
// ClusterVNode) - restore it and check the arguments against that constructor.
let nodeSettings =
lh 1112, null, lh 1113, null, lh 2112, lh 2113,
// NOTE(review): the second sprintf has an empty format string yet receives an
// argument; its format text appears to be missing (presumably a second HTTP
// prefix for the same port) - confirm and restore.
[| sprintf "http://localhost:%d/" (2113+2*n);sprintf "" (2113+2*n) |], false, null, Opts.WorkerThreadsDefault, false, "fake.dns", gossipSeeds,
TimeSpan.FromMilliseconds Opts.MinFlushDelayMsDefault,
TimeSpan.FromMilliseconds (float Opts.PrepareTimeoutMsDefault),
TimeSpan.FromMilliseconds (float Opts.CommitTimeoutMsDefault),
false, null, false,
TimeSpan.FromSeconds(120.0), StatsStorage.Csv,
Opts.NodePriorityDefault, InternalAuthenticationProviderFactory(), Opts.DisableScavengeMergeDefault,
Opts.AdminOnExtDefault, Opts.StatsOnExtDefault, true (*Opts.GossipOnExtDefault*),
TimeSpan.FromMilliseconds (float Opts.GossipIntervalMsDefault),
TimeSpan.FromMilliseconds (float Opts.GossipAllowedDifferenceMsDefault),
TimeSpan.FromMilliseconds (float Opts.GossipTimeoutMsDefault))
// Projections subsystem, passed to the node below.
let projections = new EventStore.Projections.Core.ProjectionsSubsystem(Opts.ProjectionThreadsDefault, EventStore.Common.Options.ProjectionType.All)
let node = new ClusterVNode(db, nodeSettings, gossipSeedSource, false, Opts.MaxMemtableSizeDefault, projections);
// Register the cluster web UI controller on both the internal and external HTTP services.
node.InternalHttpService.SetupController(Services.Transport.Http.Controllers.ClusterWebUIController(node.MainQueue, [| NodeSubsystems.Projections|]))
node.ExternalHttpService.SetupController(Services.Transport.Http.Controllers.ClusterWebUIController(node.MainQueue, [| NodeSubsystems.Projections|]))
// NOTE(review): as shown, this Subscribe call is the last expression of the
// function; no start call or returned node value is visible - confirm what
// `node n` is meant to return and where the node gets started.
node.MainBus.Subscribe(new EventStore.Web.Users.UserManagementProjectionsRegistration())
// create the 3 cluster nodes (indexes 0, 1 and 2, evaluated in that order)
let node0, node1, node2 = node 0, node 1, node 2
// simple client for the cluster
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.ClientAPI.dll"
open EventStore.ClientAPI
// Bring the Task-aware async extensions into scope: the workflow below binds
// Tasks returned by the client API with do!/let!.
open TaskAsync

// Client workflow: connect to the cluster through its gossip seeds, then append
// each line read from the console as an event to the "Messages" stream.
async {
    // NOTE(review): the builder expressions that feed the two op_Implicit
    // conversions were missing from this script. The default builders are restored
    // here; confirm whether the original set additional options.
    let cnxSettings =
        ConnectionSettings.Create()
        |> ConnectionSettingsBuilder.op_Implicit
    let settings =
        // The address literal was empty (IPAddress.Parse "" cannot succeed);
        // the nodes of this script all live on the local machine.
        let lh port = IPEndPoint(IPAddress.Loopback, port)
        ClusterSettings.Create()
            .DiscoverClusterViaGossipSeeds()
            .SetGossipSeedEndPoints(lh 2113, lh 2115, lh 2117)
        |> GossipSeedClusterSettingsBuilder.op_Implicit
    use store = EventStoreConnection.Create(cnxSettings, settings, "")
    do! store.ConnectAsync()
    printfn "Connected"
    // Read a line, write it as an event, report the next expected version, repeat.
    let rec loop() = async {
        let msg = Console.ReadLine()
        // NOTE(review): the payload is flagged as JSON but is built by plain string
        // formatting (unquoted key, no escaping of msg) - confirm this is acceptable.
        let event = EventData(Guid.NewGuid(), "SomethingHappened", true, Text.Encoding.UTF8.GetBytes(sprintf "{Text:\"%s\"}" msg), null)
        let! (result: WriteResult) = store.AppendToStreamAsync("Messages", ExpectedVersion.Any, event)
        printfn "Written. Next: %d" result.NextExpectedVersion
        return! loop() }
    return! loop()
} |> Async.Start

// Ctrl+C cancels the workflow started above (it runs on the default cancellation token).
Console.CancelKeyPress.Add <| fun _ -> Async.CancelDefaultToken()
// NOTE(review): as written this runs immediately as a top-level statement, not
// inside the Ctrl+C handler - confirm which was intended.
printf "Exit"
