Created
August 20, 2021 13:58
-
-
Save rtang03/7c5015c4d588e6bb0425e29c9289a361 to your computer and use it in GitHub Desktop.
troubleshooting issue223
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import util from 'util'; | |
import Debug from 'debug'; | |
import filter from 'lodash/filter'; | |
import groupBy from 'lodash/groupBy'; | |
import isEqual from 'lodash/isEqual'; | |
import { FTSearchParameters, Redisearch } from 'redis-modules-sdk'; | |
import { Commit, computeEntity, HandlerResponse } from '../types'; | |
import { getLogger, isCommit } from '../utils'; | |
import { | |
INVALID_ARG, | |
NO_RECORDS, | |
QUERY_ERR, | |
REDIS_ERR, | |
REDUCE_ERR, | |
REPO_NOT_FOUND, | |
} from './constants'; | |
import { commitSearchDefinition, postSelector, preSelector } from './model'; | |
import type { CommitInRedis, OutputCommit, QueryDatabase, RedisRepository } from './types'; | |
import { createNotificationCenter, createRedisRepository } from '.'; | |
/** | |
* @about create query database | |
* @example [subscribe.unit-test.ts](https://github.com/rtang03/fabric-es/blob/master/packages/fabric-cqrs/src/queryHandler/__tests__/subscribe.unit-test.ts) | |
* @params client redisearch client | |
* @params repos RedisRepositories | |
* @returns [[QueryDatabase]] | |
*/ | |
export const createQueryDatabase: ( | |
client: Redisearch, | |
repos: Record<string, RedisRepository<any>>, | |
option?: { notifyExpiryBySec?: number } | |
) => QueryDatabase = (client, repos, { notifyExpiryBySec } = { notifyExpiryBySec: 86400 }) => { | |
const debug = Debug('queryHandler:createQueryBase'); | |
const logger = getLogger({ name: '[query-handler] createQueryDatabase.js', target: 'console' }); | |
const commitRepo = createRedisRepository<Commit, CommitInRedis, OutputCommit>('commit', { | |
client, | |
kind: 'commit', | |
fields: commitSearchDefinition, | |
preSelector, | |
postSelector, | |
}); | |
const notificationCenter = createNotificationCenter(client); | |
// add built-in commit repo | |
const allRepos = Object.assign(repos, { commit: commitRepo }); | |
const getHistory = (commits: (Commit | OutputCommit)[]): any[] => { | |
const history = []; | |
commits.forEach(({ events }) => events.forEach((event) => history.push(event))); | |
return history; | |
}; | |
const deleteItems = async <TItem>(repo: RedisRepository<TItem>, pattern: string) => { | |
const [errors, count] = await repo.deleteItemsByPattern(pattern); | |
const isError = errors?.reduce((pre, cur) => pre || !!cur, false); | |
return isError | |
? { | |
status: 'ERROR' as any, | |
message: `${count} record(s) deleted`, | |
errors, | |
} | |
: { | |
status: 'OK' as any, | |
message: `${count} record(s) deleted`, | |
data: count, | |
}; | |
}; | |
const queryCommit = async (pattern, args) => { | |
const [errors, data] = await commitRepo.queryCommitsByPattern( | |
commitRepo.getPattern(pattern, args) | |
); | |
const isError = errors?.reduce((pre, cur) => pre || !!cur, false); | |
return isError | |
? { status: 'ERROR' as any, message: QUERY_ERR, errors } | |
: { status: 'OK' as any, message: `${data.length} record(s) returned`, data }; | |
}; | |
const doSearch: <T>(option: { | |
repo: RedisRepository<T>; | |
kind: 'commit' | 'entity'; | |
query: string; | |
param: FTSearchParameters; | |
countTotalOnly: boolean; | |
}) => Promise<HandlerResponse> = async ({ repo, kind, query, param, countTotalOnly }) => { | |
const { search, getIndexName } = repo; | |
const index = getIndexName(); | |
const [errors, count, data] = await search({ | |
countTotalOnly, | |
kind, | |
index, | |
query, | |
param, | |
restoreFn: kind === 'entity' && repo.getPostSelector(), | |
}); | |
const isError = errors?.reduce((pre, cur) => pre || !!cur, false); | |
return isError | |
? { status: 'ERROR', message: 'search error', errors } | |
: { | |
status: 'OK', | |
message: `${count} record(s) returned`, | |
data: countTotalOnly ? count : data, | |
}; | |
}; | |
return { | |
clearNotification: async (option) => notificationCenter.clearNotification(option), | |
clearNotifications: async (option) => notificationCenter.clearNotifications(option), | |
deleteCommitByEntityId: async ({ entityName, id }) => { | |
if (!entityName || !id) throw new Error(INVALID_ARG); | |
return deleteItems( | |
commitRepo, | |
commitRepo.getPattern('COMMITS_BY_ENTITYNAME_ENTITYID', [entityName, id]) | |
); | |
}, | |
deleteCommitByEntityName: async ({ entityName }) => { | |
if (!entityName) throw new Error(INVALID_ARG); | |
return deleteItems(commitRepo, commitRepo.getPattern('COMMITS_BY_ENTITYNAME', [entityName])); | |
}, | |
deleteEntityByEntityName: async <TEntity>({ entityName }) => { | |
if (!entityName) throw new Error(INVALID_ARG); | |
const entityRepo = allRepos[entityName]; | |
if (!entityRepo) throw new Error(`deleteEntityByEntityName: ${entityName} ${REPO_NOT_FOUND}`); | |
return deleteItems<TEntity>( | |
entityRepo, | |
entityRepo.getPattern('ENTITIES_BY_ENTITYNAME', [entityName]) | |
); | |
}, | |
fullTextSearchCommit: async ({ query, param, countTotalOnly }) => { | |
if (!query) throw new Error(INVALID_ARG); | |
return doSearch<OutputCommit>({ | |
repo: commitRepo, | |
countTotalOnly, | |
kind: 'commit', | |
query, | |
param, | |
}); | |
}, | |
fullTextSearchEntity: async <TEntity>({ entityName, query, param, countTotalOnly }) => { | |
if (!query || !entityName) throw new Error(INVALID_ARG); | |
const repo = allRepos[entityName]; | |
if (!repo) | |
throw new Error( | |
`fullTextSearchEntity: ${entityName} ${REPO_NOT_FOUND} -- ${Object.keys(allRepos)}` | |
); | |
return doSearch<TEntity>({ repo, countTotalOnly, kind: 'entity', query, param }); | |
}, | |
getNotification: async ({ creator, entityName, id, commitId }) => | |
notificationCenter.getNotification({ creator, entityName, id, commitId }), | |
getNotificationsByFields: async ({ creator, entityName, id }) => | |
notificationCenter.getNotificationsByFields({ creator, entityName, id }), | |
getRedisCommitRepo: () => commitRepo, | |
queryCommitByEntityId: async ({ entityName, id }) => { | |
if (!entityName || !id) throw new Error(INVALID_ARG); | |
return queryCommit('COMMITS_BY_ENTITYNAME_ENTITYID', [entityName, id]); | |
}, | |
queryCommitByEntityName: async ({ entityName }) => { | |
if (!entityName) throw new Error(INVALID_ARG); | |
return queryCommit('COMMITS_BY_ENTITYNAME', [entityName]); | |
}, | |
mergeCommit: async ({ commit }) => { | |
if (!isCommit(commit)) throw new Error(INVALID_ARG); | |
try { | |
const key = allRepos['commit'].getKey(commit); | |
const status = await allRepos['commit'].hmset(commit); | |
return { | |
status, | |
message: `${key} merged successfully`, | |
data: [key], | |
}; | |
} catch (e) { | |
logger.error(util.format('mergeCommit - %s, %j', REDIS_ERR, e)); | |
throw e; | |
} | |
}, | |
mergeCommitBatch: async ({ entityName, commits }) => { | |
if (!entityName || !commits) throw new Error(INVALID_ARG); | |
if (isEqual(commits, {})) | |
return { | |
status: 'OK', | |
message: NO_RECORDS, | |
data: [], | |
}; | |
const data = []; | |
const error = []; | |
try { | |
for await (const commit of Object.values(commits)) { | |
const status = await allRepos['commit'].hmset(commit); | |
const key = allRepos['commit'].getKey(commit); | |
if (status === 'OK') data.push(key); | |
else error.push(key); | |
} | |
} catch (e) { | |
logger.error(util.format('mergeCommitBatch - %s, %j', REDIS_ERR, e)); | |
throw e; | |
} | |
return { | |
status: error.length === 0 ? 'OK' : 'ERROR', | |
message: `${data.length} record(s) merged successfully`, | |
data, | |
error, | |
errors: error, | |
}; | |
}, | |
mergeEntity: async ({ commit, reducer }) => { | |
const { entityName, entityId, commitId } = commit; | |
const entityRepo = allRepos[entityName]; | |
const entityKeyInRedis = allRepos[entityName].getKey(commit); | |
const commitKeyInRedis = allRepos['commit'].getKey(commit); | |
// DEBUG ISSUE 223 | |
debug('commit: %O', commit); | |
debug('entityKeyInRedis: %s', entityKeyInRedis); | |
debug('commitKeyInRedis: %s', commitKeyInRedis); | |
if (!entityRepo) throw new Error(`mergeEntity: ${entityName} ${REPO_NOT_FOUND}`); | |
if (!isCommit(commit) || !reducer) throw new Error(INVALID_ARG); | |
// step 1: retrieve existing commit | |
const pattern = commitRepo.getPattern('COMMITS_BY_ENTITYNAME_ENTITYID', [ | |
entityName, | |
entityId, | |
]); | |
// DEBUG ISSUE 223 | |
debug('pattern: %s', pattern); | |
const [errors, restoredCommits] = await commitRepo.queryCommitsByPattern(pattern); | |
const isError = errors?.reduce((pre, cur) => pre || !!cur, false); | |
// DEBUG ISSUE 223 | |
debug('restoredCommits: %s', restoredCommits); | |
if (isError) | |
return { | |
status: 'ERROR', | |
message: 'fail to retrieve existing commit', | |
errors, | |
}; | |
// step 2: merge existing record with newly retrieved commit | |
const history: (Commit | OutputCommit)[] = [...restoredCommits, commit]; | |
// step 3: compute the timeline of event history, and add tracking info | |
/* e.g. newly computed state = | |
{ | |
id: 'qh_proj_test_001', | |
desc: 'query handler #2 proj', | |
tag: 'projection', | |
value: 2, | |
_ts: 1590739000, | |
_created: 1590738792, | |
_creator: 'org1-admin' | |
} | |
*/ | |
const { state, reduced } = computeEntity(history, reducer); | |
debug('entity being merged, %O', state); | |
// step 5: compute events history, returning comma separator | |
if (reduced && !state?.id) { | |
return { | |
status: 'ERROR', | |
message: REDUCE_ERR, | |
errors: [new Error(`fail to reduce, ${entityName}:${entityId}:${commitId}`)], | |
}; | |
} | |
const data = []; | |
try { | |
let status; | |
// step 6: add entity | |
if (reduced) { | |
status = await allRepos[entityName].hmset(state, history); | |
data.push({ key: entityKeyInRedis, status }); | |
} | |
// DEBUG ISSUE 223 | |
debug('add-entity status: %O', status); | |
// step 7: add commit | |
status = await allRepos['commit'].hmset(commit); | |
data.push({ key: commitKeyInRedis, status }); | |
// DEBUG ISSUE 223 | |
debug('add-commit status: %O', status); | |
// step 8: add notification flag | |
if (reduced) { | |
await notificationCenter.notify({ | |
creator: state._creator, | |
entityName, | |
id: entityId, | |
commitId, | |
}); | |
} | |
} catch (e) { | |
// DEBUG ISSUE 223 | |
debug('try-catch error: %O', e); | |
if (!e.message.startsWith('[lifecycle]')) | |
logger.error(util.format('mergeEntity - %s, %j', REDIS_ERR, e)); | |
throw e; | |
} | |
debug('data returns: %O', data); | |
return { status: 'OK', message: `${entityKeyInRedis} merged successfully`, data }; | |
}, | |
mergeEntityBatch: async ({ entityName, commits, reducer }) => { | |
if (!entityName || !commits || !reducer) throw new Error(INVALID_ARG); | |
const entityRepo = allRepos[entityName]; | |
if (!entityRepo) throw new Error(`mergeEntityBatch: ${entityName} ${REPO_NOT_FOUND}`); | |
if (isEqual(commits, {})) | |
return { | |
status: 'OK', | |
message: NO_RECORDS, | |
data: [], | |
}; | |
// safety filter: ensure only relevant entityName is processed | |
const filteredCommits = filter(commits, { entityName }); | |
const groupByEntityId: Record<string, Commit[]> = groupBy(filteredCommits, ({ id }) => id); | |
const errors = []; | |
const entities = Object.entries(groupByEntityId) | |
.map(([entityId, commits]) => { | |
const { state, reduced } = computeEntity(commits, reducer); | |
const keyOfEntityInRedis = allRepos[entityName].getKey(commits[0]); | |
// if reducer fails | |
if (reduced && !state) errors.push(entityId); | |
return { state, commits, key: keyOfEntityInRedis }; | |
}) | |
.filter(({ state }) => !!state); // ensure no null; if error happens when reducing | |
debug('errors found, %O', errors); | |
// add entity. Notice that the original orginal commit is not saved. | |
const data = []; | |
for await (const { state, commits, key } of entities) { | |
try { | |
const status = await allRepos[entityName].hmset(state, commits); | |
data.push({ key, status }); | |
} catch (e) { | |
logger.error(util.format('mergeEntityBatch - %s, %j', REDIS_ERR, e)); | |
throw e; | |
} | |
} | |
debug('data returns: %O', data); | |
return { | |
status: errors.length === 0 ? 'OK' : 'ERROR', | |
message: `${data.length} record(s) merged`, | |
data, | |
errors: errors.length === 0 ? null : errors, | |
}; | |
}, | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment