Skip to content

Instantly share code, notes, and snippets.

@vshapenko
Last active March 28, 2019 13:10
Show Gist options
  • Save vshapenko/4b0b912d521ba7ac008f0d013432f807 to your computer and use it in GitHub Desktop.
Save vshapenko/4b0b912d521ba7ac008f0d013432f807 to your computer and use it in GitHub Desktop.
module MessageProcessing=
    open System.IO

    /// Messages accepted by the reassembly agent.
    type private DataMsg=
        /// A raw chunk of bytes; it may hold part of a message, exactly one
        /// message, or several messages back to back.
        |Raw of byte[]

    /// Allocates the buffer for the message that starts at the beginning of `msg`.
    /// The capacity is the first component of `readInt msg.Offset msg.Array` plus 4.
    /// NOTE(review): presumably `readInt` decodes a length prefix and the extra
    /// 4 bytes cover that prefix itself - confirm against `readInt`.
    let empty (msg:ArraySegment<byte>)=
        let length=readInt msg.Offset msg.Array|>fst
        new MemoryStream(length+4)

    /// Appends every byte of `seg` to `stream` at its current position.
    let write (seg:ArraySegment<byte>) (stream:MemoryStream)=
        stream.Write(seg.Array,seg.Offset,seg.Count)

    /// Agent that reassembles messages from incoming raw chunks and hands every
    /// completed message to `parse`, which is started with `Async.Start`.
    /// The buffer of the message currently being assembled (if any) is the
    /// state carried from one received chunk to the next.
    let private messageStore=
        MailboxProcessor.Start (fun inbox->
            /// Feeds `segment` into the buffer in `state`, emitting every message
            /// that becomes complete, and returns the buffer of the message still
            /// in progress (or None when the chunk ended on a message boundary).
            let rec consume (state:MemoryStream option) (segment:ArraySegment<byte>)=
                if segment.Count=0 then
                    // Nothing (left) to consume: keep whatever is in progress.
                    state
                else
                    // NOTE(review): when a new message starts here, `empty` reads
                    // its size from this segment; a chunk boundary falling inside
                    // the size prefix is not handled - confirm whether the
                    // transport can split it.
                    let data=state|>Option.defaultWith (fun ()->empty segment)
                    trace <| sprintf "current data position=%i capacity=%i" data.Position data.Capacity
                    trace <| sprintf "new chunk size %i" segment.Count
                    let availSpace=data.Capacity-(int data.Position)
                    let count=Math.Min(segment.Count,availSpace)
                    if count=0 then
                        // A buffer without free space cannot consume input, so
                        // recursing would never terminate; drop the rest of the
                        // chunk instead of spinning.
                        trace <| sprintf "message buffer has no free space, dropping %i bytes" segment.Count
                        data.Dispose()
                        None
                    else
                        // Never write past the capacity: MemoryStream would grow
                        // and Capacity would stop marking the end of the message.
                        write (segment.Slice(0,count)) data
                        let rest=segment.Slice(count)
                        if data.Position=int64 data.Capacity then
                            // Message complete: parse a copy in the background,
                            // release the buffer, then continue with the bytes
                            // that belong to the following message(s).
                            parse (data.ToArray())|>Async.Start
                            data.Dispose()
                            consume None rest
                        else
                            // Message still incomplete: keep the buffer so the
                            // next chunk continues filling it.
                            Some data

            let rec loop (state:MemoryStream option)=async{
                let! (Raw m)=inbox.Receive()
                return! loop (consume state (ArraySegment<byte>(m)))
            }
            loop None
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment