Skip to content

Instantly share code, notes, and snippets.

@farism
Created February 21, 2017 20:05
Show Gist options
  • Save farism/465a2344c7e1ab97f68bddf8fb63f676 to your computer and use it in GitHub Desktop.
Save farism/465a2344c7e1ab97f68bddf8fb63f676 to your computer and use it in GitHub Desktop.
phoenix driver
import {Channel, Presence, Socket} from 'phoenix'
import xs, {MemoryStream, Stream} from 'xstream'
/**
 * Configuration accepted by the driver factory. The object is handed
 * unchanged to `new Socket(url, opts)` when the source is constructed.
 */
interface Options {
  /** Optional bag of parameters carried along with the socket options. */
  params?: object
}
/**
 * Shape of a message as delivered by the socket's `onMessage` callback.
 * Streams in this file route on `topic` and `event`.
 */
interface SocketMessage {
  /** Channel topic the message belongs to; used to filter per-channel streams. */
  topic: string
  /** Event name, e.g. `'message'` or `'presence_diff'`. */
  event: string
  /** Event-specific data. */
  payload: object
  /** Optional reference string attached to the message. */
  ref?: string
}
/**
 * A single metadata entry recorded for a present user.
 */
interface PresenceUserMeta {
  /** Display name carried in the presence metadata. */
  name: string
  /** Numeric "online since" value — presumably a timestamp; unit not shown here, confirm with the server. */
  online_at: number
  /** Reference string attached to this metadata entry (presumably assigned by Phoenix — confirm). */
  phx_ref: string
}
/**
 * Presence record for one user: the list of metadata entries tracked for
 * that user. Consumers in this file read the first entry of `metas`.
 */
interface PresenceUser {
  /** Metadata entries for the user. */
  metas: PresenceUserMeta[]
}
/**
 * Presence state for a topic: a dictionary from a string key (used as the
 * user's `id` when the state is listed) to that user's presence record.
 */
interface PresenceMap {
  [key: string]: PresenceUser
}
/**
 * Read side of the Phoenix driver. Owns one `Socket`, connects it on
 * construction, and exposes the incoming traffic as xstream streams:
 * every message (`socket`), messages for one topic (`channels`), the
 * accumulated `'message'` events of a topic (`messages`) and the presence
 * state of a topic (`presences`).
 */
class PhoenixSource {
  /**
   * Channels that have already been joined, keyed by topic. A `Map` is used
   * so that topics named like `Object.prototype` members (e.g.
   * `"constructor"`) are not mistaken for already-joined channels.
   */
  private readonly chans = new Map<string, Channel>()

  /** Forwarders of the `socket()` streams that are currently started. */
  private readonly subscribers = new Set<(message: SocketMessage) => void>()

  private readonly sock: Socket

  /**
   * @param url  Socket endpoint, passed to `new Socket`.
   * @param opts Socket options, passed to `new Socket` unchanged.
   */
  constructor(url: string, opts: Options) {
    this.sock = new Socket(url, opts)
    // Register exactly one handler on the socket for its whole lifetime and
    // fan messages out to the active streams. Registering a handler per
    // stream start (with no way to unregister it) leaked one callback per
    // subscription and kept feeding listeners that had already stopped.
    this.sock.onMessage(message => {
      this.subscribers.forEach(forward => forward(message))
    })
    this.sock.connect()
  }

  /**
   * Stream of every message received on the socket.
   *
   * Each call returns a fresh stream. The stream attaches itself to the
   * shared socket handler when it starts and detaches when it stops, so a
   * stopped stream receives nothing and holds no reference on the socket.
   *
   * @returns A stream emitting each incoming socket message.
   */
  public socket = (): Stream<SocketMessage> => {
    let forward: ((message: SocketMessage) => void) | undefined
    return xs.create<SocketMessage>({
      start: listener => {
        const next = (message: SocketMessage): void => listener.next(message)
        forward = next
        this.subscribers.add(next)
      },
      stop: () => {
        if (forward !== undefined) {
          this.subscribers.delete(forward)
          forward = undefined
        }
      },
    })
  }

  /**
   * Stream of the messages whose `topic` equals the given topic. The
   * channel for the topic is created and joined the first time the topic is
   * requested; later calls reuse it and only build a new filtered stream.
   *
   * @param topic Channel topic to join and listen to.
   * @returns A stream of that topic's messages.
   */
  public channels = (topic: string): Stream<SocketMessage> => {
    if (!this.chans.has(topic)) {
      const channel = this.sock.channel(topic)
      this.chans.set(topic, channel)
      channel.join()
    }
    return this.socket().filter(message => message.topic === topic)
  }

  /**
   * Growing list of the topic's `'message'` events. Starts as an empty
   * array and emits a new array (previous entries plus the new message)
   * for every matching event.
   *
   * @param topic Channel topic to collect messages from.
   * @returns A memory stream of the messages seen so far.
   */
  public messages = (topic: string): MemoryStream<SocketMessage[]> => {
    return this.channels(topic)
      .filter(message => message.event === 'message')
      .fold((acc, message): SocketMessage[] => [...acc, message], [])
  }

  /**
   * Presence state of the topic, rebuilt from its `'presence_diff'` events.
   * Each diff is folded into the accumulated state with
   * `Presence.syncDiff`, and the state is then passed through
   * `Presence.list`, choosing for each id the first `metas` entry extended
   * with that `id`.
   *
   * NOTE(review): only `'presence_diff'` is handled; if the server also
   * pushes a full `'presence_state'` snapshot it is ignored here — confirm
   * whether that is intended.
   * NOTE(review): `Presence.list` appears to produce a list of the chosen
   * entries rather than a keyed `PresenceMap`; confirm the declared element
   * type against the phoenix typings in use.
   *
   * @param topic Channel topic to track presence for.
   * @returns A memory stream of the listed presence entries.
   */
  public presences = (topic: string): MemoryStream<PresenceMap> => {
    return this.channels(topic)
      .filter(message => message.event === 'presence_diff')
      .fold((acc, message): PresenceMap => Presence.syncDiff(acc, message.payload), {})
      .map(presences => Presence.list(presences, (id, {metas: [first]}) => ({...first, id})))
  }
}
/**
 * Creates a Phoenix driver bound to the given socket endpoint.
 *
 * The returned driver builds a new `PhoenixSource` (which opens and
 * connects a socket) each time it is invoked, subscribes to the outgoing
 * stream, and returns the source. Handling of outgoing values is still a
 * placeholder: values, errors and completion are all ignored.
 *
 * @param url  Socket endpoint handed to `PhoenixSource`.
 * @param opts Socket options handed to `PhoenixSource`.
 * @returns The driver function; it takes the outgoing stream and returns the source.
 */
export default function makePhoenixDriver(url: string, opts: Options) {
  return function phoenixDriver(outgoing$) {
    const phoenix = new PhoenixSource(url, opts)

    // Intentionally empty handlers: nothing is sent to the socket yet.
    const noop = () => {}
    const sinkListener = {
      next: (_action) => {
        // do something with source
      },
      error: noop,
      complete: noop,
    }
    outgoing$.addListener(sinkListener)

    return phoenix
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment