Created
August 27, 2020 16:59
-
-
Save GavinRay97/4d3c4540cf45db38b39a6c8e55777cb7 to your computer and use it in GitHub Desktop.
Hasura GraphQL Tx over Websockets client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Docker Compose stack for the demo: a Postgres 12 database plus a Hasura
# GraphQL engine container that connects to it and is published on port 8080.
# NOTE(review): the nesting indentation of this file was lost when the page was
# extracted; restore it from the original gist before running `docker-compose up`.
version: '3.6' | |
services: | |
# Database service; data is persisted in the named volume `db_data`.
postgres: | |
image: postgres:12 | |
restart: always | |
volumes: | |
- db_data:/var/lib/postgresql/data | |
environment: | |
POSTGRES_PASSWORD: postgrespassword | |
# GraphQL engine service.
# NOTE(review): the image tag looks like a pull-request preview build
# (pull3557) — presumably the build that provides the transaction-over-WebSocket
# protocol used by the client; confirm before changing the tag.
graphql-engine: | |
image: hasura/graphql-engine:pull3557-2e391cf9 | |
ports: | |
- '8080:8080' | |
depends_on: | |
- 'postgres' | |
restart: always | |
environment: | |
# Uses the password set in POSTGRES_PASSWORD of the postgres service.
HASURA_GRAPHQL_DATABASE_URL: postgres://postgres:postgrespassword@postgres:5432/postgres | |
## enable the console served by server | |
HASURA_GRAPHQL_ENABLE_CONSOLE: 'true' # set to "false" to disable console | |
## enable debugging mode. It is recommended to disable this in production | |
HASURA_GRAPHQL_DEV_MODE: 'true' | |
HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log | |
## uncomment next line to set an admin secret | |
# HASURA_GRAPHQL_ADMIN_SECRET: myadminsecretkey | |
# Named volume referenced by the postgres service.
volumes: | |
db_data: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import WebSocket from 'ws' | |
import { | |
InitTxPayload, | |
TxMessageType, | |
ExecuteTxPayload, | |
CommitTxPayload, | |
AbortTxPayload, | |
INIT_TX_SUCCESS_RESPONSE, | |
INIT_TX_RESPONSE, | |
GraphQLOperation, | |
EXECUTE_TX_SUCCESS_RESPONSE, | |
EXECUTE_TX_RESPONSE, | |
COMMIT_OR_ABORT_TX_RESPONSE, | |
} from './types' | |
/**
 * GraphQL query sent as the first operation of the demo transaction: selects
 * `id` and `name` of the `users` row whose primary key is 1.
 */
const getUserQuery = ` | |
query q { | |
users_by_pk(id: 1) { | |
id | |
name | |
} | |
} | |
` | |
/**
 * Builds a mutation that renames the `users` row with the given primary key.
 *
 * `newName` is serialised with `JSON.stringify`; every escape it emits (`\"`,
 * `\\`, `\n`, `\uXXXX`, ...) is also a valid GraphQL string escape, so a quote
 * or backslash inside the name can no longer terminate the literal and change
 * the shape of the mutation.
 *
 * @param id - Primary key of the row to update; inserted as a bare numeric literal.
 * @param newName - New value for the `name` column.
 * @returns The mutation document as a string.
 */
const changeUserNameQuery = (id: number, newName: string): string => `
mutation MyMutation {
  __typename
  update_users(where: {id: {_eq: ${id}}}, _set: {name: ${JSON.stringify(newName)}}) {
    affected_rows
    returning {
      name
      id
    }
  }
}`
/**
 * Builds a mutation that sets `numeric_column` on the `users` row with the
 * given primary key.
 *
 * `val` is inserted verbatim as a bare GraphQL literal (no quoting or
 * escaping). The demo in this file passes a non-numeric value on purpose so
 * that the server rejects the operation; never pass untrusted input here.
 *
 * @param id - Primary key of the row to update; inserted as a bare numeric literal.
 * @param val - Literal text placed after `numeric_column:`.
 * @returns The mutation document as a string.
 */
const changeUserNumericMutation = (id: number, val: number | string): string => `
mutation MyMutation {
  __typename
  update_users(where: {id: {_eq: ${id}} }, _set: { numeric_column: ${val} }) {
    affected_rows
    returning {
      name
      id
    }
  }
}`
/**
 * Returns a promise that resolves (with no value) once a `setTimeout` of `ms`
 * milliseconds has fired.
 *
 * @param ms - Delay in milliseconds.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
}
/**
 * Client for the `graphql-tx` WebSocket sub-protocol: one transaction per
 * socket, driven by `init`, any number of `execute` calls, then `commit` or
 * `abort`.
 *
 * The socket is closed when a transaction ends, so the client opens a
 * replacement connection on every close and tracks whether the current socket
 * is open in `ready`.
 *
 * NOTE: no permanent `'error'` listener is installed. A connection error that
 * occurs while no request is pending is therefore raised by the socket's
 * event emitter as an uncaught exception.
 */
class TxClient {
  /** Close reason that signals a transaction was ended successfully. */
  private static readonly TX_END_REASON =
    "Closing connection after 'commit' and 'abort'"
  /** Delay (ms) used for readiness polling and for throttling commit/abort. */
  private static readonly DELAY_MS = 100

  socket: WebSocket
  ready = false
  /** Epoch milliseconds of the last commit/abort; 0 until one has been issued. */
  lastQueryTimestamp = 0

  /**
   * @param url - WebSocket endpoint to connect to.
   * @param headers - Optional headers sent with the WebSocket handshake.
   */
  constructor(public url: string, public headers?: Record<string, any>) {
    this.socket = this.connect()
  }

  /** Opens a socket that speaks the `graphql-tx` sub-protocol. */
  static makeSocket(url: string, headers?: Record<string, any>): WebSocket {
    return new WebSocket(url, 'graphql-tx', { headers })
  }

  /**
   * Creates a socket wired with the lifecycle listeners. Every replacement
   * socket gets the same listeners, so reconnection keeps working after the
   * first close and `ready` reflects the state of the current socket.
   */
  private connect(): WebSocket {
    this.ready = false
    const socket = TxClient.makeSocket(this.url, this.headers)
    socket.on('open', () => {
      this.ready = true
    })
    socket.on('close', () => {
      this.socket = this.connect()
    })
    return socket
  }

  /** Milliseconds elapsed since the last commit/abort was issued. */
  timeSinceLastQuery() {
    return Date.now() - this.lastQueryTimestamp
  }

  /** Low-level senders: each serialises one protocol message onto the current socket. */
  get txns() {
    const send = (
      msg: InitTxPayload | ExecuteTxPayload | CommitTxPayload | AbortTxPayload
    ) => this.socket.send(JSON.stringify(msg))
    return {
      init: (payload?: InitTxPayload['payload']) =>
        send({ type: TxMessageType.INIT, payload: payload ?? {} }),
      execute: (operation: ExecuteTxPayload['payload']['query']) =>
        send({ type: TxMessageType.EXECUTE, payload: { query: operation } }),
      commit: () => send({ type: TxMessageType.COMMIT }),
      abort: () => send({ type: TxMessageType.ABORT }),
    }
  }

  /**
   * Sends one request and settles with the first message `onMessage` accepts.
   *
   * Listeners are attached before sending and are all removed once the promise
   * settles, so repeated calls do not accumulate listeners. The promise also
   * rejects if the socket errors, closes before a reply arrives, or the reply
   * cannot be parsed.
   *
   * @param send - Puts the request on the wire.
   * @param onMessage - Inspects a raw message and calls `resolve`/`reject`;
   *   calling neither keeps waiting for the next message.
   */
  private exchange<T>(
    send: () => void,
    onMessage: (
      raw: string,
      resolve: (value: T) => void,
      reject: (reason: unknown) => void
    ) => void
  ): Promise<T> {
    const socket = this.socket
    return new Promise<T>((resolve, reject) => {
      const detach = () => {
        socket.removeListener('message', handleMessage)
        socket.removeListener('error', fail)
        socket.removeListener('close', handleClose)
      }
      const succeed = (value: T) => {
        detach()
        resolve(value)
      }
      const fail = (reason: unknown) => {
        detach()
        reject(reason)
      }
      const handleMessage = (msg: WebSocket.Data) => {
        try {
          onMessage(String(msg), succeed, fail)
        } catch (err) {
          // JSON.parse failures must reject instead of escaping the emitter.
          fail(err)
        }
      }
      const handleClose = (code: number, reason: unknown) =>
        fail({ code, reason: String(reason) })

      socket.on('message', handleMessage)
      socket.once('error', fail)
      socket.once('close', handleClose)
      try {
        send()
      } catch (err) {
        fail(err)
      }
    })
  }

  /**
   * Starts a transaction, waiting first until the current socket is open.
   *
   * @param payload - Optional init payload; defaults to `{}`.
   * @returns The `initialised` response.
   * @throws Rejects with the `init_error` response, a socket error, or a
   *   `{ code, reason }` object if the socket closes first.
   */
  async init(
    payload?: InitTxPayload['payload']
  ): Promise<INIT_TX_SUCCESS_RESPONSE> {
    while (!this.ready) await sleep(TxClient.DELAY_MS)
    return this.exchange<INIT_TX_SUCCESS_RESPONSE>(
      () => this.txns.init(payload),
      (raw, resolve, reject) => {
        const response: INIT_TX_RESPONSE = JSON.parse(raw)
        if (response.type === 'initialised') resolve(response)
        else if (response.type === 'init_error') reject(response)
      }
    )
  }

  /**
   * Runs one GraphQL operation inside the current transaction.
   *
   * @param query - Operation to execute.
   * @returns The `data` response.
   * @throws Rejects with the `error` response, a socket error, or a
   *   `{ code, reason }` object if the socket closes first.
   */
  async execute(query: GraphQLOperation): Promise<EXECUTE_TX_SUCCESS_RESPONSE> {
    return this.exchange<EXECUTE_TX_SUCCESS_RESPONSE>(
      () => this.txns.execute(query),
      (raw, resolve, reject) => {
        const response: EXECUTE_TX_RESPONSE = JSON.parse(raw)
        if (response.type === 'data') resolve(response)
        else if (response.type === 'error') reject(response)
      }
    )
  }

  /**
   * Commits the transaction; resolves when the socket closes with the
   * expected reason.
   *
   * @throws Rejects with `{ code, reason }` on any other close reason, or with
   *   the socket error.
   */
  async commit(): Promise<COMMIT_OR_ABORT_TX_RESPONSE> {
    return this.endTransaction(() => this.txns.commit())
  }

  /**
   * Aborts the transaction; resolves when the socket closes with the expected
   * reason.
   *
   * @throws Rejects with `{ code, reason }` on any other close reason, or with
   *   the socket error.
   */
  async abort(): Promise<COMMIT_OR_ABORT_TX_RESPONSE> {
    return this.endTransaction(() => this.txns.abort())
  }

  /**
   * Shared tail of `commit` and `abort`: applies the same throttle to both,
   * sends the message and waits for the resulting close.
   */
  private async endTransaction(
    send: () => void
  ): Promise<COMMIT_OR_ABORT_TX_RESPONSE> {
    // Throttle back-to-back transaction endings.
    if (this.timeSinceLastQuery() < TxClient.DELAY_MS) {
      await sleep(TxClient.DELAY_MS)
    }
    this.lastQueryTimestamp = Date.now()

    const socket = this.socket
    return new Promise<COMMIT_OR_ABORT_TX_RESPONSE>((resolve, reject) => {
      const onClose = (code: number, reason: unknown) => {
        socket.removeListener('error', onError)
        // `reason` is a string in older ws releases and a Buffer in newer
        // ones; normalise before comparing.
        const text = String(reason)
        if (text === TxClient.TX_END_REASON) {
          resolve({ code, reason: text } as COMMIT_OR_ABORT_TX_RESPONSE)
        } else {
          reject({ code, reason: text })
        }
      }
      const onError = (err: unknown) => {
        socket.removeListener('close', onClose)
        reject(err)
      }

      socket.once('close', onClose)
      socket.once('error', onError)
      try {
        send()
      } catch (err) {
        socket.removeListener('close', onClose)
        socket.removeListener('error', onError)
        reject(err)
      }
    })
  }
}
const tx = new TxClient('ws://localhost:8080/v1/graphql')

/**
 * Demo: runs three operations in one transaction. The third passes a
 * non-numeric value for `numeric_column`, so it is expected to be rejected.
 *
 * The transaction is committed only when every operation succeeded; on any
 * failure it is aborted instead. Commit is never attempted after an abort,
 * because the abort has already ended the transaction.
 */
async function main(): Promise<void> {
  try {
    await tx.init()
    const op1 = await tx.execute({ query: getUserQuery })
    console.log('op 1', op1)
    const op2 = await tx.execute({
      query: changeUserNameQuery(1, 'new name'),
    })
    console.log('op 2', op2)
    const op3 = await tx.execute({ query: changeUserNumericMutation(2, 'asd') })
    console.log('op 3', op3)
  } catch (err) {
    // Log first so the original failure is reported even if the abort fails.
    console.log('GOT ERROR IN MAIN:', err)
    try {
      await tx.abort()
    } catch (abortErr) {
      console.log('ABORT FAILED:', abortErr)
    }
    return
  }
  await tx.commit()
}

// Report a failed commit instead of leaving an unhandled rejection.
main().catch((err) => console.log('GOT ERROR IN MAIN:', err))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** `type` discriminator of every client-to-server transaction message. */
export enum TxMessageType {
  INIT = 'init',
  EXECUTE = 'execute',
  COMMIT = 'commit',
  ABORT = 'abort',
}

/** Base shape shared by all client-to-server transaction messages. */
export interface GQLWebsocketTxPayload {
  type: TxMessageType
}

/** Message that starts a transaction. */
export interface InitTxPayload extends GQLWebsocketTxPayload {
  type: TxMessageType.INIT
  payload: {
    /** @default 'read-committed' */
    isolation?: 'read-committed' | 'serializable' | 'repeatable-read'
    headers?: Record<string, string>
  }
}

/** Server reply to a successful init. */
export type INIT_TX_SUCCESS_RESPONSE = {
  type: 'initialised'
}

/**
 * Server reply to a rejected init.
 * NOTE(review): `payload` is pinned to the single message listed here; widen
 * it if the server can report other init failures.
 */
export type INIT_TX_ERROR_RESPONSE = {
  type: 'init_error'
  payload: 'transaction cannot be initialised more than once in a single WebSocket session'
}

export type INIT_TX_RESPONSE = INIT_TX_SUCCESS_RESPONSE | INIT_TX_ERROR_RESPONSE

/** A GraphQL request: the document plus optional operation name and variables. */
export interface GraphQLOperation {
  operationName?: string
  query: string
  variables?: Record<string, unknown>
}

/** Message that runs one operation inside the open transaction. */
export interface ExecuteTxPayload extends GQLWebsocketTxPayload {
  type: TxMessageType.EXECUTE
  payload: {
    request_id?: string
    query: GraphQLOperation
  }
}

/** Server reply carrying the result of an executed operation. */
export type EXECUTE_TX_SUCCESS_RESPONSE = {
  type: 'data'
  id: string
  request_id: string
  payload: {
    data: {
      // A root field can resolve to null, e.g. a by-primary-key lookup that
      // matches no row.
      [key: string]: object | object[] | null
    }
  }
}

/** Server reply for an operation that failed. */
export type EXECUTE_TX_ERROR_RESPONSE = {
  type: 'error'
  id: string
  request_id: string
  errors: Array<{
    message: string
    // GraphQL error entries carry their extra fields under `extensions`.
    extensions: {
      code: string
      path: string
    }
  }>
}

export type EXECUTE_TX_RESPONSE =
  | EXECUTE_TX_SUCCESS_RESPONSE
  | EXECUTE_TX_ERROR_RESPONSE

/** Message that commits the open transaction. */
export interface CommitTxPayload extends GQLWebsocketTxPayload {
  type: TxMessageType.COMMIT
}

/** Message that aborts the open transaction. */
export interface AbortTxPayload extends GQLWebsocketTxPayload {
  type: TxMessageType.ABORT
}

/** Close code and reason observed once a transaction has been ended. */
export type COMMIT_OR_ABORT_TX_RESPONSE = {
  code: 1000
  reason: "Closing connection after 'commit' and 'abort'"
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment