Skip to content

Instantly share code, notes, and snippets.

@RashadSaleh
Created March 10, 2018 13:12
Show Gist options
  • Save RashadSaleh/c7405f530f90bd9a96b96c7cfeeabd2a to your computer and use it in GitHub Desktop.
Save RashadSaleh/c7405f530f90bd9a96b96c7cfeeabd2a to your computer and use it in GitHub Desktop.
deepstream with GraphQL subscriptions and permissioning
// SYNTACTIC SUGAR ON TOP OF deepstream CLIENT API
var deepstream = require('deepstream.io-client-js');
/**
 * Adds a `hash()` method to every string.
 *
 * Computes a 32-bit signed integer from the string's UTF-16 code units using
 * the recurrence `hash = hash * 31 + codeUnit`, truncated to 32 bits after
 * every step. The empty string hashes to 0.
 *
 * The method is installed with `Object.defineProperty` so that it is
 * non-enumerable: a plain assignment would make `hash` show up in every
 * `for...in` loop over a string. It stays writable and configurable, exactly
 * as a plain assignment would leave it.
 *
 * @returns {number} a 32-bit signed integer.
 */
Object.defineProperty(String.prototype, 'hash', {
  value: function () {
    let result = 0;
    for (let i = 0; i < this.length; i++) {
      // (result << 5) - result === result * 31; `| 0` truncates to 32 bits.
      result = (((result << 5) - result) + this.charCodeAt(i)) | 0;
    }
    return result;
  },
  writable: true,
  configurable: true,
  enumerable: false
});
var _ = require('lodash');
module.exports = class Client {
constructor({host = 'localhost', port = 6020}) {
this.instance = deepstream(host+':'+port);
}
authenticate({username = null, password = null}) {
return new Promise((resolve, reject) => {
this.instance.login({username, password}, (success, data) => {
if (success) {
this.id = username;
resolve();
}
else reject();
})
});
}
subscribe_once({type, id, path, on_change}) {
return new Promise((resolve, reject) => {
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> {
r.subscribe(path, async () => {
await on_change();
r.discard();
}, false);
resolve({
unsubscribe: ()=>{
r.discard();
},
data: r.get(path)
});
});
});
}
subscribe({type, id, path, on_change}) {
return new Promise((resolve, reject) => {
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> {
r.subscribe(path, async()=> {await on_change();}, false);
resolve({
unsubscribe: ()=>{
r.discard();
},
data: r.get(path)
});
});
});
}
exists({type, id}) {
return new Promise((resolve) => {
this.instance.record.has(type+'/'+id, (error, result) => {
resolve(result);
})
})
}
request({service, payload = {}}) {
return new Promise((resolve, reject) => {
if (!payload.origin) delete payload.origin; // (so that "null" and "undefined" values of origin do not override the default value).
this.instance.rpc.make(service, { origin: {id: this.id}, ...payload}, (error, response) => {
if (error) {
reject(error);
} else {
resolve(response);
}
})
})
}
delete({type, id}) {
return this.request({service: 'record/delete', payload: {record: {type, id}, origin}});
}
set({type, id}, {path, value}, origin) {
return this.request({service: 'record/set', payload: {record: {type, id}, path, value, origin}});
}
create({type, id}, origin) {
return this.request({service: 'record/create', payload: {record: {type, id}, origin}});
}
get_record({type, id}) {
return new Promise((resolve, reject) => {
this.instance.record.getRecord(type+'/'+id).whenReady((r) => {
resolve(r);
});
});
}
get({type, id, path}) {
return new Promise((resolve, reject)=>{
this.instance.record.getRecord(type+'/'+id).whenReady((r)=> {
resolve(r.get(path));
});
});
}
generate_new_unique_id() {
return this.instance.getUid();
}
get Signal() {
let client = this;
return class {
constructor({name, to, data}) {
this.name = name;
this.to = to;
this.data = data;
client.instance.event.emit(this.to + '/' + this.name, {data: this.data, origin: client.id});
}
}
}
get when() {
let registrar = (signal) => {
return (handler) => {
this.instance.event.subscribe('/'+ signal, handler);
return () => {
this.instance.event.unsubscribe('/'+ signal, handler);
}
}
}
registrar.private = (signal) => {
return (handler) => {
this.instance.event.subscribe(this.id + '/' + signal, handler);
return () => {
this.instance.event.unsubscribe(this.id + '/' + signal, handler);
}
}
}
return registrar;
}
async query(string, on_change) {
// THIS IS A VERY WEIRD WAY OF SUBSCRIBING TO QUERY RESULT UPDATES.
// BUT IT IS NOT NECESSARY;
// REPLACE subscriber WITH ANYTHING YOU LIKE INSTEAD.
let x = {};
let data = await this.request({service: 'query', payload: {string}});
for (key in x) delete x[key];
_.assign(x, data);
let subscriber = (data) => {
for (let key in x) delete x[key];
_.assign(x, data);
if (on_change) {
console.log('change happened');
on_change(x, () => {
this.instance.event.unsubscribe(this.id+'/service/query/'+string.hash(), subscriber);
});
}
}
this.instance.event.subscribe(this.id+'/service/query/'+string.hash(), subscriber);
return x;
}
}
// GRAPHQL SCHEMA. DEPENDS ON types.js
// MOST INTERESTING PART IS AT THE BOTTOM, WHERE requery IS DEFINED AND USED TO KEEP THINGS UP TO DATE.
var {
GraphQLSchema,
GraphQLObjectType,
GraphQLString,
GraphQLBoolean,
graphql,
GraphQLInputObjectType
} = require('graphql');
var { User, json, Part, Permission, Rule, Reference, Notification } = require('./types.js');
// GraphQL input type named 'record' with two string fields, `type` and `id`.
let Record = (() => {
  // Each field config repeats its own key as `name`.
  const string_input = (name) => ({name, type: GraphQLString});
  return new GraphQLInputObjectType({
    name: 'record',
    fields: {
      type: string_input('type'),
      id: string_input('id')
    }
  });
})();
/**
 * Converts a list of GraphQL AST selection nodes into a nested plain object
 * keyed by field name.
 *
 * A selection without sub-selections maps to `true`; a selection with a
 * `selectionSet` maps to the (recursively built) map of its sub-selections.
 * Selections that carry a `selectionSet` but no `name` (inline fragments)
 * have their fields merged into the enclosing map instead of raising a
 * TypeError. When the same key occurs more than once the last occurrence wins.
 *
 * @param {Array<object>} selections AST selection nodes.
 * @returns {object} nested map of selected field names.
 */
function map_selections(selections) {
  const map = {};
  for (const selection of selections) {
    const nested = selection.selectionSet
      ? map_selections(selection.selectionSet.selections)
      : null;
    if (!selection.name) {
      // Nameless node (inline fragment): lift its fields into this level.
      if (nested) Object.assign(map, nested);
      continue;
    }
    map[selection.name.value] = nested || true;
  }
  return map;
}
/**
 * Shared resolver body for record-backed subscription fields that require a
 * 'read' permission.
 *
 * Sends the selected-field map to the 'permission' service for record
 * `type/id`. When the first element of the response is truthy, the record is
 * read through `client.subscribe_once` (with `requery` as change handler) and
 * its data is returned; otherwise an empty object is returned.
 *
 * @param {string} type record type, used for both the permission check and the read.
 * @param {string} id record id.
 * @param {{requery: Function, client: object, origin: *}} context resolver context.
 * @param {object} ast resolver info object (its `fieldNodes[0]` is inspected).
 */
async function resolve_readable_record(type, id, {requery, client, origin}, ast) {
  const map = map_selections(ast.fieldNodes[0].selectionSet.selections);
  const [permission] = await client.request({
    service: 'permission',
    payload: {
      record: {type, id},
      origin: origin,
      map,
      rule: {
        name: 'read'
      }
    }
  });
  if (!permission) {
    return {};
  }
  const {data} = await client.subscribe_once({type, id, on_change: requery});
  return data;
}

// Schema with a `subscription` root (notification, permission, part, user)
// and a `query` root (user). Subscription resolvers read records through
// `client.subscribe_once`, passing the context's `requery` as change handler.
var schema = new GraphQLSchema({
  /*
  mutation: new GraphQLObjectType({
    name: 'mutation',
    fields: {
      user: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          },
          sets: {
            name: 'sets',
            type: new GraphQLList()
          }
        },
        type: GraphQLBoolean,
        resolve: async function(U, {id}) {
          return "1";
        }
      },
      create: {
        args: {
          record: {
            name: 'record',
            type: Record
          }
        },
        type: GraphQLString,
        resolve: async function(U, {record}) {
          return "1";
        }
      }
    }
  }),
  */
  subscription: new GraphQLObjectType({
    name: 'subscription',
    fields: {
      notification: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: Notification,
        // Reads the 'notification' record that the permission check was made
        // for (previously the 'user' record was read by mistake).
        resolve: async function (U, {id}, context, ast) {
          return resolve_readable_record('notification', id, context, ast);
        }
      },
      permission: {
        args: {
          record: {
            name: 'record',
            type: Record
          }
        },
        type: Permission,
        // The permission record is looked up by the TYPE of the given record.
        resolve: async function (U, {record}, {requery, client}) {
          const {data} = await client.subscribe_once({type: 'permission', id: record.type, on_change: requery});
          return data;
        }
      },
      part: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: Part,
        resolve: async function (user, {id}, {requery, client}) {
          const {data} = await client.subscribe_once({type: 'car.part', id: id, on_change: requery});
          return data;
        }
      },
      user: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: User,
        resolve: async function (user, {id}, context, ast) {
          return resolve_readable_record('user', id, context, ast);
        }
      }
    }
  }),
  query: new GraphQLObjectType({
    name: 'query',
    fields: {
      user: {
        args: {
          id: {
            name: 'id',
            type: GraphQLString
          }
        },
        type: User,
        // `client.get` resolves with the record data itself (not wrapped in
        // `{data}`), so it is returned directly.
        resolve: async function (user, {id}, {client}) {
          return client.get({type: 'user', id: id});
        }
      }
    }
  })
})
/**
 * Executes a GraphQL document against `schema` and reports the value of the
 * first root field.
 *
 * The resolver context receives `client`, `origin` and a `requery` function
 * that re-runs this exact execution (and therefore calls `callback` again
 * with the fresh result).
 *
 * @param {string} string GraphQL document text.
 * @param {object} client passed to resolvers through the context.
 * @param {*} origin passed to resolvers through the context.
 * @param {Function} callback invoked with the result of every successful run.
 * @returns {Promise<*>} resolves with the value of the first key of the
 *   response's `data`. Rejects when `graphql()` rejects or when the response
 *   contains no `data`; in the latter case the Error's `errors` property
 *   holds the response's `errors` array.
 */
function execute(string, client, origin, callback) {
  const context = {
    requery: function () {
      return execute(string, client, origin, callback);
    },
    client,
    origin
  };
  // Return the chain itself so failures propagate instead of leaving the
  // returned promise pending forever.
  return graphql(schema, string, null, context).then((response) => {
    if (!response.data) {
      const messages = (response.errors || []).map((e) => e.message).join('; ');
      const error = new Error(messages || 'GraphQL execution returned no data');
      error.errors = response.errors;
      throw error;
    }
    const result = response.data[Object.keys(response.data)[0]];
    callback(result);
    return result;
  });
}
module.exports = { schema, execute };
// SYNTACTIC SUGAR ON TOP OF deepstream SERVER API
var Client = require('./client.js');
var { execute } = require('./schema.js');
module.exports = class Server {
static get Client() {
return Client;
}
get Service() {
let server = this;
return class {
constructor({name, handler}) {
this.name = name;
this.handler = handler;
}
start() {
server.client.instance.rpc.provide(this.name, this.handler)
}
stop() {
server.client.instance.rpc.unprovide(this.name);
}
}
}
static get config() {
return {
path: './node_modules/deepstream.io/conf/config.yml',
helper_path: './node_modules/deepstream.io/conf/helper.config.yml'
}
}
constructor(helper) {
this.instance = new Deepstream(helper ? Server.config.helper_path : Server.config.path);
}
get host() {
return this.instance._options.connectionEndpoints[0]._options.host;
}
get port() {
return this.instance._options.connectionEndpoints[0]._options.port;
}
set(configuration) {
for (let key in configuration) {
this.instance.set(key, configuration[key]);
}
}
async query(string, origin, callback) {
await execute(string, this.client, origin, callback);
}
connect_to_database() {
new Promise((resolve, reject) => {
MongoClient.connect("mongodb://localhost:27017", (err, client) => {
this.db = client.db('main');
resolve();
});
});
}
get services() {
return {
query: new this.Service({
name: 'query',
handler: async ({string, origin}, response) => {
var sent = false;
this.query(string, origin, ((data) => {
if (!sent) {
response.send(data);
sent = true;
} else {
this.client.instance.event.emit(origin.id+'/service/query/'+string.hash(), data)
}
}))
}
}),
}
}
}
// DEFINES TYPES FOR schema.js
// MOST INTERESTING PART IS THE permission TYPE
var {
GraphQLObjectType,
GraphQLString,
GraphQLBoolean,
GraphQLUnionType,
GraphQLList
} = require('graphql');
var json = require('graphql-type-json');
// GraphQL object type named 'ref' with two string fields, `name` and `value`.
let Reference = new GraphQLObjectType({
  name: 'ref',
  fields: ['name', 'value'].reduce((fields, key) => {
    fields[key] = {type: GraphQLString};
    return fields;
  }, {})
});
// GraphQL object type named 'notification': string `id`, string `html`
// and a list of strings `actions`.
let Notification = (() => {
  const fields = {};
  fields.id = {type: GraphQLString};
  fields.html = {type: GraphQLString};
  /*
  icon { small, big } //for android and mobile phones in general.
  picture
  */
  fields.actions = {type: new GraphQLList(GraphQLString)};
  return new GraphQLObjectType({name: 'notification', fields});
})();
// GraphQL object type 'permission'.
// Exposes a `name`, a self-referencing `field` lookup (by name or dotted path)
// and a boolean `rule` that is computed by evaluating code stored on the rule.
// `fields` is a function so that `type: Permission` can refer to the type
// while it is still being constructed.
let Permission = new GraphQLObjectType({
name: 'permission',
fields: () => ({
name: {
type: GraphQLString,
},
field: {
args: {
name: {
name: 'field_name',
type: GraphQLString
},
path: {
name: 'field_path',
type: GraphQLString
}
},
type: Permission,
// Walks the parent's `fields` array along a path of field names and returns
// the entry reached. `path` (split on '.') takes precedence over `name`;
// with neither argument the path is empty and `{}` is returned.
// NOTE(review): when a path segment is not found, `found` becomes undefined
// and `found.fields` throws a TypeError; the same happens when the parent has
// no `fields` array.
resolve({fields}, {name, path}) {
if (path) {
path = path.split('.');
} else if (name) {
path = [name];
} else {
path = [];
}
let found = {};
for (let key of path) {
found = fields.find((field) => {
return field.name === key;
});
fields = found.fields;
}
return found;
}
},
rule: {
args: {
name: {
name: 'rule_name',
type: GraphQLString
},
input: {
name: 'input',
type: json
}
},
type: GraphQLBoolean,
// Evaluates the rule called `name` from the parent's `rules` array.
// `x` is the resolver info object; `w` is unused.
// SECURITY NOTE(review): `ref.query` and `rule.verdict` are passed to `eval`,
// so whoever can write rule definitions can run arbitrary code in this
// process. Confirm that rules only ever come from a trusted source.
// The evaluated strings run in this function's scope, so they can see its
// locals (`data`, `input`, `origin`, `record`, `path`, `client`, ...):
// renaming any local here may break stored rules.
// NOTE(review): `with` is only legal in non-strict code, so this file cannot
// be loaded as an ES module or under 'use strict'.
async resolve({rules}, { name, input }, {requery, origin, client}, x, w) {
// NOTE(review): `rule` is undefined when no rule has this name, in which case
// `rule.refs` below throws a TypeError.
var rule = rules.find((rule) => {
return rule.name === name;
});
// Recover the target record from the operation's AST: the object-valued FIRST
// argument of the FIRST root selection, as a list of {name, value} pairs.
let record = (x.operation.selectionSet.selections[0].arguments[0].value.fields.map(x => {
return {
name: x.name.value,
value: x.value.value
}
}));
// The value of the first argument of the first sub-selection of that root selection.
let path = (x.operation.selectionSet.selections[0].selectionSet.selections[0].arguments[0].value.value);
// Positional: the first pair's value is taken as the type, the second as the id
// (this assumes the query writes the arguments in that order).
record = {
type: record[0].value,
id: record[1].value
}
// `data` is not referenced by name below; presumably it is read by the
// evaluated verdict — TODO confirm.
let { data } = await client.subscribe_once({type: record.type, id: record.id, path, on_change: requery});
// Each ref contributes one variable, named `ref.name`, holding the result of the
// query obtained by evaluating `ref.query`. On the first change of that result
// the subscription is dropped and the whole GraphQL query is re-run.
let context = {};
if (rule.refs) {
for (let ref of rule.refs) {
context[ref.name] = await client.query(eval(ref.query), (x, unsub) => {
unsub();
requery();
})
}
}
// Double eval: the inner eval of `rule.verdict` produces a value that is
// evaluated again, with the ref results in scope through `with`.
with (context) {
return eval(eval(rule.verdict));
}
}
}
})
});
module.exports = {
Notification: Notification,
json: json,
Reference: Reference,
Permission: Permission,
Part: new GraphQLObjectType({
name: 'part',
fields: {
location: {
type: GraphQLString
},
listing: {
type: new GraphQLObjectType({
name: 'listing',
fields: {
name: {
type: GraphQLString
}
}
})
}
}
}),
User: new GraphQLObjectType({
name: 'user',
fields: {
ids: {
type: new GraphQLObjectType({
name: 'ids',
fields: {
native: {
type: GraphQLString
}
}
}),
},
profile: {
type: new GraphQLObjectType({
name: 'profile',
fields: {
contacts: {
type: new GraphQLObjectType({
name: 'contacts',
fields: {
phone: {
type: new GraphQLObjectType({
name: 'phone',
fields: {
number: {
type: GraphQLString
},
confirmed: {
type: GraphQLBoolean
}
}
})
},
email: {
type: new GraphQLObjectType({
name: 'email',
fields: {
address: {
type: GraphQLString
},
confirmed: {
type: GraphQLBoolean
}
}
})
}
}
})
},
name: {
type: new GraphQLObjectType({
name: 'name',
fields: {
first: {
type: GraphQLString
},
last: {
type: GraphQLString
}
}
})
}
}
}),
},
auth: {
type: new GraphQLObjectType({
name: 'auth',
fields: {
password: {
type: GraphQLString
}
}
})
}
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment