Created
March 10, 2018 13:12
-
-
Save RashadSaleh/c7405f530f90bd9a96b96c7cfeeabd2a to your computer and use it in GitHub Desktop.
deepstream with GraphQL subscriptions and permissioning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SYNTACTIC SUGAR ON TOP OF deepstream CLIENT API | |
var deepstream = require('deepstream.io-client-js'); | |
/**
 * Adds a small non-cryptographic hash to every string.
 *
 * The value is the 32-bit signed integer produced by the recurrence
 * `h = 31 * h + charCode` over the UTF-16 code units of the string
 * (an empty string hashes to 0). It is used elsewhere in this gist to
 * derive event names from query strings; it must not be used for
 * anything security related.
 *
 * The method is installed with `Object.defineProperty` so that it is NOT
 * enumerable: a plain assignment would make `hash` show up in every
 * `for...in` loop over a String object.
 *
 * @returns {number} 32-bit signed integer hash of the string.
 */
Object.defineProperty(String.prototype, 'hash', {
  value: function hash() {
    let result = 0;
    for (let index = 0; index < this.length; index++) {
      // (result << 5) - result === result * 31; `| 0` truncates to int32.
      result = (((result << 5) - result) + this.charCodeAt(index)) | 0;
    }
    return result;
  },
  // Same mutability a plain assignment would have given the property.
  writable: true,
  configurable: true,
  enumerable: false
});
var _ = require('lodash'); | |
module.exports = class Client { | |
constructor({host = 'localhost', port = 6020}) { | |
this.instance = deepstream(host+':'+port); | |
} | |
authenticate({username = null, password = null}) { | |
return new Promise((resolve, reject) => { | |
this.instance.login({username, password}, (success, data) => { | |
if (success) { | |
this.id = username; | |
resolve(); | |
} | |
else reject(); | |
}) | |
}); | |
} | |
subscribe_once({type, id, path, on_change}) { | |
return new Promise((resolve, reject) => { | |
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> { | |
r.subscribe(path, async () => { | |
await on_change(); | |
r.discard(); | |
}, false); | |
resolve({ | |
unsubscribe: ()=>{ | |
r.discard(); | |
}, | |
data: r.get(path) | |
}); | |
}); | |
}); | |
} | |
subscribe({type, id, path, on_change}) { | |
return new Promise((resolve, reject) => { | |
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> { | |
r.subscribe(path, async()=> {await on_change();}, false); | |
resolve({ | |
unsubscribe: ()=>{ | |
r.discard(); | |
}, | |
data: r.get(path) | |
}); | |
}); | |
}); | |
} | |
exists({type, id}) { | |
return new Promise((resolve) => { | |
this.instance.record.has(type+'/'+id, (error, result) => { | |
resolve(result); | |
}) | |
}) | |
} | |
request({service, payload = {}}) { | |
return new Promise((resolve, reject) => { | |
if (!payload.origin) delete payload.origin; // (so that "null" and "undefined" values of origin do not override the default value). | |
this.instance.rpc.make(service, { origin: {id: this.id}, ...payload}, (error, response) => { | |
if (error) { | |
reject(error); | |
} else { | |
resolve(response); | |
} | |
}) | |
}) | |
} | |
delete({type, id}) { | |
return this.request({service: 'record/delete', payload: {record: {type, id}, origin}}); | |
} | |
set({type, id}, {path, value}, origin) { | |
return this.request({service: 'record/set', payload: {record: {type, id}, path, value, origin}}); | |
} | |
create({type, id}, origin) { | |
return this.request({service: 'record/create', payload: {record: {type, id}, origin}}); | |
} | |
get_record({type, id}) { | |
return new Promise((resolve, reject) => { | |
this.instance.record.getRecord(type+'/'+id).whenReady((r) => { | |
resolve(r); | |
}); | |
}); | |
} | |
get({type, id, path}) { | |
return new Promise((resolve, reject)=>{ | |
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> { | |
resolve(r.get(path)); | |
}); | |
}); | |
} | |
generate_new_unique_id() { | |
return this.instance.getUid(); | |
} | |
get Signal() { | |
let client = this; | |
return class { | |
constructor({name, to, data}) { | |
this.name = name; | |
this.to = to; | |
this.data = data; | |
client.instance.event.emit(this.to + '/' + this.name, {data: this.data, origin: client.id}); | |
} | |
} | |
} | |
get when() { | |
let registrar = (signal) => { | |
return (handler) => { | |
this.instance.event.subscribe('/'+ signal, handler); | |
return () => { | |
this.instance.event.unsubscribe('/'+ signal, handler); | |
} | |
} | |
} | |
registrar.private = (signal) => { | |
return (handler) => { | |
this.instance.event.subscribe(this.id + '/' + signal, handler); | |
return () => { | |
this.instance.event.unsubscribe(this.id + '/' + signal, handler); | |
} | |
} | |
} | |
return registrar; | |
} | |
async query(string, on_change) { | |
// THIS IS A VERY WEIRD WAY OF SUBSCRIBING TO QUERY RESULT UPDATES. | |
// BUT IT IS NOT NECESSARY; | |
// REPLACE subscriber WITH ANYTHING YOU LIKE INSTEAD. | |
let x = {}; | |
let data = await this.request({service: 'query', payload: {string}}); | |
for (key in x) delete x[key]; | |
_.assign(x, data); | |
let subscriber = (data) => { | |
for (let key in x) delete x[key]; | |
_.assign(x, data); | |
if (on_change) { | |
console.log('change happened'); | |
on_change(x, () => { | |
this.instance.event.unsubscribe(this.id+'/service/query/'+string.hash(), subscriber); | |
}); | |
} | |
} | |
this.instance.event.subscribe(this.id+'/service/query/'+string.hash(), subscriber); | |
return x; | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// GRAPHQL SCHEMA. DEPENDS ON types.js | |
// MOST INTERESTING PART IS AT THE BOTTOM, WHERE requery IS DEFINED AND USED TO KEEP THINGS UP TO DATE. | |
var { | |
GraphQLSchema, | |
GraphQLObjectType, | |
GraphQLString, | |
GraphQLBoolean, | |
graphql, | |
GraphQLInputObjectType | |
} = require('graphql'); | |
var { User, json, Part, Permission, Rule, Reference, Notification } = require('./types.js'); | |
// GraphQL input type identifying a record by its `type` and `id`,
// both plain strings.
let Record = new GraphQLInputObjectType({
  name: 'record',
  fields: ['type', 'id'].reduce((fields, field_name) => {
    fields[field_name] = { name: field_name, type: GraphQLString };
    return fields;
  }, {})
});
/**
 * Converts a list of GraphQL selection AST nodes into a nested plain
 * object: leaf fields map to `true`, fields with a sub-selection map to
 * the object built from that sub-selection.
 *
 * Inline fragments (`... on T { ... }`) have no `name`; their fields are
 * merged into the enclosing level instead of crashing on `name.value`.
 *
 * @param {Array<Object>} selections `selectionSet.selections` of a node.
 * @returns {Object} Nested map of the selected field names.
 */
function map_selections(selections) {
  let map = {};
  for (let selection of selections) {
    let nested = selection.selectionSet
      ? map_selections(selection.selectionSet.selections)
      : null;
    if (!selection.name) {
      // Unnamed node (inline fragment): hoist whatever it selects.
      if (nested) Object.assign(map, nested);
      continue;
    }
    map[selection.name.value] = nested || true;
  }
  return map;
}
/**
 * Asks the `permission` RPC service whether `origin` may read the fields
 * selected on `record`.
 *
 * @param {{type: string, id: string}} record Record being read.
 * @param {{client: Object, origin: Object}} context Resolver context.
 * @param {Object} ast Resolver info object; its first field node's
 *   selection set is sent along as the map of requested fields.
 * @returns {Promise<boolean>} Truthiness of the first element of the
 *   service's `[permission, details]` response.
 */
async function can_read(record, { client, origin }, ast) {
  let map = map_selections(ast.fieldNodes[0].selectionSet.selections);
  let [permission] = await client.request({
    service: 'permission',
    payload: {
      record: record,
      origin: origin,
      map,
      rule: {
        name: 'read'
      }
    }
  });
  return Boolean(permission);
}

/**
 * Resolves a record for a subscription field: returns `{}` when reading is
 * not permitted, otherwise the record data, re-running the whole query
 * (`requery`) on the record's next change.
 */
async function subscribe_if_readable(record, context, ast) {
  if (!(await can_read(record, context, ast))) {
    return {};
  }
  let { data } = await context.client.subscribe_once({
    type: record.type,
    id: record.id,
    on_change: context.requery
  });
  return data;
}

// The mutation root is not implemented yet; only subscriptions and a
// single query field are exposed.
var schema = new GraphQLSchema({
  subscription: new GraphQLObjectType({
    name: 'subscription',
    fields: {
      notification: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: Notification,
        // Reads the same `notification/<id>` record the permission check
        // is made against (the `user` record was read here before).
        resolve: async function (U, { id }, context, ast) {
          return subscribe_if_readable({ type: 'notification', id: id }, context, ast);
        }
      },
      permission: {
        args: {
          record: {
            name: 'record',
            type: Record
          }
        },
        type: Permission,
        // Permission documents are stored per record TYPE: `permission/<type>`.
        resolve: async function (U, { record }, { requery, client }) {
          let { data } = await client.subscribe_once({type: 'permission', id: record.type, on_change: requery});
          return data;
        }
      },
      part: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: Part,
        resolve: async function (user, { id }, { requery, client }) {
          let { data } = await client.subscribe_once({type: 'car.part', id: id, on_change: requery});
          return data;
        }
      },
      user: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: User,
        resolve: async function (user, { id }, context, ast) {
          return subscribe_if_readable({ type: 'user', id: id }, context, ast);
        }
      }
    }
  }),
  query: new GraphQLObjectType({
    name: 'query',
    fields: {
      user: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: User,
        // `client.get` resolves with the record data itself (there is no
        // `{data}` wrapper to destructure). Because the record is now
        // really returned, the same read check as the subscription field
        // is applied first.
        resolve: async function (user, { id }, context, ast) {
          if (!(await can_read({ type: 'user', id: id }, context, ast))) {
            return {};
          }
          return context.client.get({type: 'user', id: id});
        }
      }
    }
  })
});
/**
 * Runs the GraphQL document `string` against `schema` and hands the value
 * of the first top-level field to `callback`.
 *
 * Resolvers receive `{requery, client, origin}` as context; `requery()`
 * re-runs this same execution (and therefore calls `callback` again), which
 * is how record changes are pushed to the caller.
 *
 * @param {string} string GraphQL document.
 * @param {Object} client Client used by the resolvers.
 * @param {Object} origin Identity the request is made on behalf of.
 * @param {Function} callback Called with the result of every (re-)execution.
 * @returns {Promise<*>} Resolves with the first execution's result; rejects
 *   when GraphQL produced no data or `callback` throws (previously the
 *   promise stayed pending forever in those cases).
 */
async function execute(string, client, origin, callback) {
  const context = {
    requery: function () { return execute(string, client, origin, callback); },
    client,
    origin
  };
  const response = await graphql(schema, string, null, context);
  if (!response.data) {
    const reasons = (response.errors || []).map((error) => error.message).join('; ');
    const failure = new Error('GraphQL execution failed' + (reasons ? ': ' + reasons : ''));
    failure.errors = response.errors;
    throw failure;
  }
  const result = response.data[Object.keys(response.data)[0]];
  callback(result);
  return result;
}
module.exports = { schema, execute }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SYNTACTIC SUGAR ON TOP OF deepstream SERVER API | |
var Client = require('./client.js'); | |
var { execute } = require('./schema.js'); | |
module.exports = class Server { | |
static get Client() { | |
return Client; | |
} | |
get Service() { | |
let server = this; | |
return class { | |
constructor({name, handler}) { | |
this.name = name; | |
this.handler = handler; | |
} | |
start() { | |
server.client.instance.rpc.provide(this.name, this.handler) | |
} | |
stop() { | |
server.client.instance.rpc.unprovide(this.name); | |
} | |
} | |
} | |
static get config() { | |
return { | |
path: './node_modules/deepstream.io/conf/config.yml', | |
helper_path: './node_modules/deepstream.io/conf/helper.config.yml' | |
} | |
} | |
constructor(helper) { | |
this.instance = new Deepstream(helper ? Server.config.helper_path : Server.config.path); | |
} | |
get host() { | |
return this.instance._options.connectionEndpoints[0]._options.host; | |
} | |
get port() { | |
return this.instance._options.connectionEndpoints[0]._options.port; | |
} | |
set(configuration) { | |
for (let key in configuration) { | |
this.instance.set(key, configuration[key]); | |
} | |
} | |
async query(string, origin, callback) { | |
await execute(string, this.client, origin, callback); | |
} | |
connect_to_database() { | |
new Promise((resolve, reject) => { | |
MongoClient.connect("mongodb://localhost:27017", (err, client) => { | |
this.db = client.db('main'); | |
resolve(); | |
}); | |
}); | |
} | |
get services() { | |
return { | |
query: new this.Service({ | |
name: 'query', | |
handler: async ({string, origin}, response) => { | |
var sent = false; | |
this.query(string, origin, ((data) => { | |
if (!sent) { | |
response.send(data); | |
sent = true; | |
} else { | |
this.client.instance.event.emit(origin.id+'/service/query/'+string.hash(), data) | |
} | |
})) | |
} | |
}), | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// DEFINES TYPES FOR schema.js | |
// MOST INTERESTING PART IS THE permission TYPE | |
var { | |
GraphQLObjectType, | |
GraphQLString, | |
GraphQLBoolean, | |
GraphQLUnionType, | |
GraphQLList | |
} = require('graphql'); | |
var json = require('graphql-type-json'); | |
// GraphQL object type `ref`: a name/value pair of strings.
let Reference = new GraphQLObjectType({
  name: 'ref',
  fields: ['name', 'value'].reduce((fields, field_name) => {
    fields[field_name] = { type: GraphQLString };
    return fields;
  }, {})
});
// GraphQL object type `notification`: an id, an HTML body and a list of
// action names.
let Notification = new GraphQLObjectType({
  name: 'notification',
  fields: {
    id: { type: GraphQLString },
    html: { type: GraphQLString },
    /*
    Not modelled yet:
    icon { small, big } // for android and mobile phones in general.
    picture
    */
    actions: { type: new GraphQLList(GraphQLString) }
  }
});
let Permission = new GraphQLObjectType({ | |
name: 'permission', | |
fields: () => ({ | |
name: { | |
type: GraphQLString, | |
}, | |
field: { | |
args: { | |
name: { | |
name: 'field_name', | |
type: GraphQLString | |
}, | |
path: { | |
name: 'field_path', | |
type: GraphQLString | |
} | |
}, | |
type: Permission, | |
resolve({fields}, {name, path}) { | |
if (path) { | |
path = path.split('.'); | |
} else if (name) { | |
path = [name]; | |
} else { | |
path = []; | |
} | |
let found = {}; | |
for (let key of path) { | |
found = fields.find((field) => { | |
return field.name === key; | |
}); | |
fields = found.fields; | |
} | |
return found; | |
} | |
}, | |
rule: { | |
args: { | |
name: { | |
name: 'rule_name', | |
type: GraphQLString | |
}, | |
input: { | |
name: 'input', | |
type: json | |
} | |
}, | |
type: GraphQLBoolean, | |
async resolve({rules}, { name, input }, {requery, origin, client}, x, w) { | |
var rule = rules.find((rule) => { | |
return rule.name === name; | |
}); | |
let record = (x.operation.selectionSet.selections[0].arguments[0].value.fields.map(x => { | |
return { | |
name: x.name.value, | |
value: x.value.value | |
} | |
})); | |
let path = (x.operation.selectionSet.selections[0].selectionSet.selections[0].arguments[0].value.value); | |
record = { | |
type: record[0].value, | |
id: record[1].value | |
} | |
let { data } = await client.subscribe_once({type: record.type, id: record.id, path, on_change: requery}); | |
let context = {}; | |
if (rule.refs) { | |
for (let ref of rule.refs) { | |
context[ref.name] = await client.query(eval(ref.query), (x, unsub) => { | |
unsub(); | |
requery(); | |
}) | |
} | |
} | |
with (context) { | |
return eval(eval(rule.verdict)); | |
} | |
} | |
} | |
}) | |
}); | |
module.exports = { | |
Notification: Notification, | |
json: json, | |
Reference: Reference, | |
Permission: Permission, | |
Part: new GraphQLObjectType({ | |
name: 'part', | |
fields: { | |
location: { | |
type: GraphQLString | |
}, | |
listing: { | |
type: new GraphQLObjectType({ | |
name: 'listing', | |
fields: { | |
name: { | |
type: GraphQLString | |
} | |
} | |
}) | |
} | |
} | |
}), | |
User: new GraphQLObjectType({ | |
name: 'user', | |
fields: { | |
ids: { | |
type: new GraphQLObjectType({ | |
name: 'ids', | |
fields: { | |
native: { | |
type: GraphQLString | |
} | |
} | |
}), | |
}, | |
profile: { | |
type: new GraphQLObjectType({ | |
name: 'profile', | |
fields: { | |
contacts: { | |
type: new GraphQLObjectType({ | |
name: 'contacts', | |
fields: { | |
phone: { | |
type: new GraphQLObjectType({ | |
name: 'phone', | |
fields: { | |
number: { | |
type: GraphQLString | |
}, | |
confirmed: { | |
type: GraphQLBoolean | |
} | |
} | |
}) | |
}, | |
email: { | |
type: new GraphQLObjectType({ | |
name: 'email', | |
fields: { | |
address: { | |
type: GraphQLString | |
}, | |
confirmed: { | |
type: GraphQLBoolean | |
} | |
} | |
}) | |
} | |
} | |
}) | |
}, | |
name: { | |
type: new GraphQLObjectType({ | |
name: 'name', | |
fields: { | |
first: { | |
type: GraphQLString | |
}, | |
last: { | |
type: GraphQLString | |
} | |
} | |
}) | |
} | |
} | |
}), | |
}, | |
auth: { | |
type: new GraphQLObjectType({ | |
name: 'auth', | |
fields: { | |
password: { | |
type: GraphQLString | |
} | |
} | |
}) | |
} | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment