Skip to content

Instantly share code, notes, and snippets.

@thinkbeforecoding
Created May 20, 2014 12:14
Show Gist options
  • Save thinkbeforecoding/2279b1a5cb873254587e to your computer and use it in GitHub Desktop.
Save thinkbeforecoding/2279b1a5cb873254587e to your computer and use it in GitHub Desktop.
EventStore cluster in a F# script !
(*
This gist runs a 3-node in-memory EventStore cluster
and a client that sends messages to the cluster.
To use the web interface, copy EventStore web folders.
You can find them in the server distribution at http://geteventstore.com/downloads/.
Then connect to http://localhost:2113/
You can view the cluster status at:
http://localhost:2113/web/gossip.htm
If you don't run it as an admin, enable HTTP listening for the user by typing in an admin console:
netsh http add urlacl url=http://localhost:2113/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2113/ user=domain\username
netsh http add urlacl url=http://localhost:2112/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2112/ user=domain\username
netsh http add urlacl url=http://localhost:2115/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2115/ user=domain\username
netsh http add urlacl url=http://localhost:2114/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2114/ user=domain\username
netsh http add urlacl url=http://localhost:2117/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2117/ user=domain\username
netsh http add urlacl url=http://localhost:2116/ user=domain\username
netsh http add urlacl url=http://127.0.0.1:2116/ user=domain\username
Have fun !
*)
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Common.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Core.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Web.dll"
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.Projections.Core.dll"
// Lets async { } workflows bind Task and Task<'T> values directly with let! / do!
[<AutoOpen>]
module TaskAsync =
    open System
    open System.Threading.Tasks

    type Microsoft.FSharp.Control.Async with

        /// Builds an async computation that fails with `e` by handing it
        /// straight to the exception continuation.
        static member AsyncRaise (e : #exn) =
            Async.FromContinuations(fun (_, onError, _) -> onError e)

        /// Awaits a non-generic Task as an Async<unit>.
        /// A faulted task surfaces the first inner exception of its flattened
        /// AggregateException; a canceled task cancels the completion source.
        static member AwaitTask (t : Task) =
            // First exception remaining once nested AggregateExceptions are flattened.
            let firstInner (agg : AggregateException) =
                agg.Flatten().InnerExceptions |> Seq.head

            // If awaiting still surfaces an AggregateException, re-raise its first inner exception.
            let unwrapAggregate (work : Async<unit>) =
                async {
                    try
                        do! work
                    with :? AggregateException as agg ->
                        do! Async.AsyncRaise(firstInner agg)
                }

            // Mirror the outcome of `t` onto a Task<unit> that the stock Async.AwaitTask can consume.
            let completion = new TaskCompletionSource<unit>(TaskCreationOptions.None)
            t.ContinueWith((fun _ ->
                              if t.IsFaulted then completion.SetException(firstInner t.Exception)
                              elif t.IsCanceled then completion.SetCanceled ()
                              else completion.SetResult(())),
                           TaskContinuationOptions.ExecuteSynchronously)
            |> ignore

            completion.Task |> Async.AwaitTask |> unwrapAggregate

    type Microsoft.FSharp.Control.AsyncBuilder with

        /// let! / do! on a Task<'a>: await it, then continue with its result.
        member builder.Bind(t: Task<'a>, f: 'a -> Async<'b>) : Async<'b> =
            builder.Bind(Async.AwaitTask t, f)

        /// let! / do! on a non-generic Task: await it, then continue with unit.
        member builder.Bind(t: Task, f: unit -> Async<'b>) : Async<'b> =
            builder.Bind(Async.AwaitTask t, f)
open System
open System.Net
open EventStore.Core
open EventStore.Core.Cluster.Settings
open EventStore.Core.TransactionLog.FileNamingStrategy
open EventStore.Core.TransactionLog.Chunks
open EventStore.Core.TransactionLog.Checkpoint
open EventStore.Core.Services.Gossip
open EventStore.Core.Util
open EventStore.Core.Services.Monitoring
open EventStore.Core.Authentication
// Builds and starts one in-memory cluster node; `n` is the node index (0, 1 and 2 are used below).
// A real deployment needs considerably more configuration than this.
let node n =
    // Every endpoint is on the loopback address; node n shifts each base port by 2*n.
    let loopback port = IPEndPoint(IPAddress.Parse("127.0.0.1"), port)
    let shifted basePort = loopback (basePort + n * 2)

    // Chunk database whose checkpoints are all held in memory.
    let db =
        let dbPath = @"not needed in memory"
        let naming = new VersionedPatternFileNamingStrategy(dbPath, "chunk-")
        let writerChk = new InMemoryCheckpoint(Checkpoint.Writer)
        let chaserChk = new InMemoryCheckpoint(Checkpoint.Chaser)
        let epochChk = new InMemoryCheckpoint(Checkpoint.Epoch, initValue = -1L)
        let truncateChk = new InMemoryCheckpoint(Checkpoint.Truncate, initValue = -1L)
        // NOTE(review): the trailing `true` is presumably the "in-memory db" flag - confirm against TFChunkDbConfig.
        let dbConfig =
            TFChunkDbConfig(dbPath, naming, TFConsts.ChunkSize, 0L,
                            writerChk, chaserChk, epochChk, truncateChk, true)
        new TFChunkDb(dbConfig)

    // The seeds are the unshifted 2112/2114/2116 endpoints, i.e. `shifted 2112` for n = 0, 1, 2.
    let gossipSeeds = [| loopback 2112; loopback 2114; loopback 2116 |]
    let gossipSeedSource = new KnownEndpointGossipSeedSource(gossipSeeds)

    // URLs for this node's 2113-based HTTP port (the same port as `shifted 2113`).
    let httpPort = 2113 + 2 * n
    let httpUrls =
        [| sprintf "http://localhost:%d/" httpPort
           sprintf "http://127.0.0.1:%d/" httpPort |]

    // Positional settings; the argument order must match the ClusterVNodeSettings constructor exactly.
    let nodeSettings =
        ClusterVNodeSettings(
            Guid.NewGuid(),
            0,
            shifted 1112, null, shifted 1113, null, shifted 2112, shifted 2113,
            httpUrls, false, null, Opts.WorkerThreadsDefault, false, "fake.dns", gossipSeeds,
            TimeSpan.FromMilliseconds Opts.MinFlushDelayMsDefault,
            Opts.ClusterSizeDefault,
            Opts.PrepareCountDefault,
            Opts.CommitCountDefault,
            TimeSpan.FromMilliseconds (float Opts.PrepareTimeoutMsDefault),
            TimeSpan.FromMilliseconds (float Opts.CommitTimeoutMsDefault),
            false, null, false,
            TimeSpan.FromSeconds(120.0), StatsStorage.Csv,
            Opts.NodePriorityDefault, InternalAuthenticationProviderFactory(), Opts.DisableScavengeMergeDefault,
            Opts.AdminOnExtDefault, Opts.StatsOnExtDefault, true (*Opts.GossipOnExtDefault*),
            TimeSpan.FromMilliseconds (float Opts.GossipIntervalMsDefault),
            TimeSpan.FromMilliseconds (float Opts.GossipAllowedDifferenceMsDefault),
            TimeSpan.FromMilliseconds (float Opts.GossipTimeoutMsDefault))

    let projections =
        new EventStore.Projections.Core.ProjectionsSubsystem(
            Opts.ProjectionThreadsDefault,
            EventStore.Common.Options.ProjectionType.All)

    let vnode = new ClusterVNode(db, nodeSettings, gossipSeedSource, false, Opts.MaxMemtableSizeDefault, projections)

    // Register the web UI and users controllers on the internal HTTP service first, then the external one.
    for http in [ vnode.InternalHttpService; vnode.ExternalHttpService ] do
        http.SetupController(Services.Transport.Http.Controllers.ClusterWebUIController(vnode.MainQueue, [| NodeSubsystems.Projections |]))
        http.SetupController(EventStore.Web.Users.UsersWebController(vnode.MainQueue))

    vnode.MainBus.Subscribe(new EventStore.Web.Users.UserManagementProjectionsRegistration())
    vnode.Start()
    vnode
// Bring up the three cluster members (indices 0, 1, 2); tuple elements are evaluated left to right.
let node0, node1, node2 = node 0, node 1, node 2
// simple client for the cluster
#r @"C:\Development\GitHub\EventStore\bin\eventstore\release\anycpu\EventStore.ClientAPI.dll"
open EventStore.ClientAPI
// Console client: connects to the cluster through its gossip seeds, then appends one
// "SomethingHappened" event to the "Messages" stream for every line typed on stdin.
// The loop ends when stdin reaches end of input.
async {
    // Connection-level settings: default credentials plus console logging.
    let cnxSettings =
        ConnectionSettings.Create()
            .SetDefaultUserCredentials(SystemData.UserCredentials("admin","changeit"))
            .UseConsoleLogger()
        |> ConnectionSettingsBuilder.op_Implicit

    // Cluster discovery through the three nodes' 2113/2115/2117 HTTP endpoints.
    let settings =
        let lh port = IPEndPoint( IPAddress.Parse("127.0.0.1"), port)
        ClusterSettings.Create()
            .DiscoverClusterViaGossipSeeds().SetGossipSeedEndPoints(lh 2113, lh 2115, lh 2117)
        |> GossipSeedClusterSettingsBuilder.op_Implicit

    // Escapes a string for use inside a JSON string literal, so that quotes,
    // backslashes and control characters in user input cannot break the payload.
    let jsonEscape (s : string) =
        let sb = Text.StringBuilder(s.Length + 8)
        for c in s do
            match c with
            | '"' -> sb.Append("\\\"") |> ignore
            | '\\' -> sb.Append("\\\\") |> ignore
            | c when c < ' ' -> sb.AppendFormat("\\u{0:x4}", int c) |> ignore
            | c -> sb.Append(c) |> ignore
        sb.ToString()

    use store = EventStoreConnection.Create( cnxSettings, settings, "")
    do! store.ConnectAsync()
    printfn "Connected"

    let rec loop() =
        async {
            match Console.ReadLine() with
            | null ->
                // End of input (stdin closed or redirected): stop instead of
                // spinning forever and appending empty events.
                return ()
            | msg ->
                // The event is flagged as JSON, so the payload must be well-formed JSON.
                let payload = sprintf "{\"Text\":\"%s\"}" (jsonEscape msg)
                let evt = EventData(Guid.NewGuid(), "SomethingHappened", true, Text.Encoding.UTF8.GetBytes(payload), null)
                let! (result: WriteResult) = store.AppendToStreamAsync("Messages", ExpectedVersion.Any, evt)
                printfn "Written. Next: %d" result.NextExpectedVersion
                return! loop() }
    return! loop()
} |> Async.Start
// Block the script until Ctrl+C, then stop the three nodes in order.
// Capture the token before the handler can run: Async.CancelDefaultToken() installs a
// fresh default token, and waiting on that fresh token would never return.
let shutdownToken = Async.DefaultCancellationToken
Console.CancelKeyPress.Add <| fun e ->
    // Without Cancel = true the runtime terminates the process as soon as this handler
    // returns, racing with (and usually pre-empting) the Stop() calls below.
    e.Cancel <- true
    Async.CancelDefaultToken()
// WaitOne returns a bool that is irrelevant here; discard it explicitly.
shutdownToken.WaitHandle.WaitOne() |> ignore
node0.Stop()
node1.Stop()
node2.Stop()
printf "Exit"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment