- upsert / merge (data) is called (see the sketch after this list)
  - validate data with schema
  - create transaction
  - set new value
  - add key to 'sync-queue' list: [{key, value}]
  - publish 'trigger-sync' event
  - execute transaction
- worker event listener receives an 'entity-update' event
  - if current < limit, spawn a 'sync-worker' instance
  - worker executes lua script (STEP A)
    - temp = sync-queue.lpop
    - sync-processing.rpush(temp)
    - return data
  - worker updates elasticsearch
  - if elasticsearch update fails
    - execute script to remove item from sync-processing, then push it back to sync-queue
  - worker executes script
    - sync-processing.lrem item
  - worker publishes event 'sync-complete' for affected keys / transaction
  - worker goes back to STEP A
  - if no data was fetched in STEP A, we kill this worker instance and reduce current (worker loop sketch below)
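a sketch of that worker loop, assuming node_redis with promisified commands; updateElasticsearch() is a hypothetical stand-in for the real indexing call, and STEP A is inlined here so the script can also hand back the key (the script further down only returns the value):

// sketch only: node_redis assumed; updateElasticsearch() is hypothetical
const { promisify } = require('util');
const redis = require('redis').createClient();
const evalAsync = promisify(redis.eval).bind(redis);
const lremAsync = promisify(redis.lrem).bind(redis);
const publishAsync = promisify(redis.publish).bind(redis);

// STEP A: same idea as scriptMoveFromQueueToProcessing below, but list-based
// (per the mandatory changes) and it also returns the key alongside the value
const stepA = `
  local key = redis.call("lpop", "sync-queue")
  if key == false then
    return nil
  end
  redis.call("rpush", "sync-processing", key)
  redis.call("publish", "sync-events-processing", key)
  return { key, redis.call("hget", "sync-data", key) }
`;

// failure path: drop the item from sync-processing and requeue it
const requeue = `
  redis.call("lrem", "sync-processing", 1, ARGV[1])
  redis.call("rpush", "sync-queue", ARGV[1])
  redis.call("publish", "sync-events-queue", ARGV[1])
`;

// hypothetical stand-in, e.g. esClient.index({ index: 'entities', id: key, body: value })
async function updateElasticsearch(key, value) {}

let current = 0;   // live sync-worker instances
const limit = 4;   // assumed concurrency cap

async function syncWorker() {
  current += 1;
  try {
    while (true) {
      const fetched = await evalAsync(stepA, 0);       // STEP A
      if (fetched === null) break;                     // nothing queued: kill this instance
      const [key, value] = fetched;
      try {
        await updateElasticsearch(key, value);
      } catch (err) {
        await evalAsync(requeue, 0, key);              // elasticsearch update failed
        continue;
      }
      await lremAsync('sync-processing', 1, key);      // done: drop from sync-processing
      await publishAsync('sync-complete', key);        // notify listeners
    }
  } finally {
    current -= 1;                                      // reduce current
  }
}

// a second connection is needed for pub/sub: a subscribed client cannot issue regular commands
const subscriber = require('redis').createClient();
subscriber.subscribe('entity-update');
subscriber.on('message', () => {
  if (current < limit) syncWorker();
});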
- lua script examples: https://medium.com/@stockholmux/save-node-js-headaches-with-lua-and-redis-165df4a620b5
- we should replace transactions with lua scripts for performance, I assume? (comparison sketch below)
  - reduces function calls / round trips
  - works with javascript template literals: ${varGoesHere}
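roughly, the same upsert either way, as an illustration only (whether eval actually beats multi here would need measuring; names are the ones from the notes):

const redis = require('redis').createClient();
const key = 'QWERTY';
const value = '123';

// with a transaction: three commands queued client-side, executed atomically on exec
redis
  .multi()
  .hset('sync-data', key, value)
  .rpush('sync-queue', key)
  .publish('trigger-sync', key)
  .exec((err) => { if (err) throw err; });

// with a lua script: one eval call runs the same three operations server-side
const upsertScript = `
  redis.call("hset", "sync-data", ARGV[1], ARGV[2])
  redis.call("rpush", "sync-queue", ARGV[1])
  redis.call("publish", "trigger-sync", ARGV[1])
`;
redis.eval(upsertScript, 0, key, value, (err) => { if (err) throw err; });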
redis-elasticsearch sync
// assuming the node_redis client
const redis = require('redis').createClient();

const key = 'QWERTY';
const value = 123;
redis.rpush("sync-queue", key);       // queue the key for a sync worker
redis.hset("sync-data", key, value);  // store the value itself in the sync-data hash
// inside a lua script, commands go through redis.call(); a nil reply from lpop arrives as false
const scriptMoveFromQueueToProcessing = `
  local fetched = redis.call("lpop", "sync-queue")
  if fetched == false then
    return nil
  end
  redis.call("sadd", "sync-processing", fetched)
  redis.call("publish", "sync-events-processing", fetched)
  return redis.call("hget", "sync-data", fetched)
`;
// built per key via a template literal, so the value is baked into the script text
const scriptMoveFromProcessingToQueue = (val) => `
  redis.call("srem", "sync-processing", "${val}")
  redis.call("rpush", "sync-queue", "${val}")
  redis.call("publish", "sync-events-queue", "${val}")
`;
use UUIDs as identifying keys? with redis.smembers
we can also check every 5 seconds whether data got stuck in sync-processing (sketch below)
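a minimal sketch of that periodic check, assuming sync-processing is a list (per the mandatory changes below); anything still present on two consecutive checks is treated as stuck and requeued:

const { promisify } = require('util');
const redis = require('redis').createClient();
const lrangeAsync = promisify(redis.lrange).bind(redis);
const evalAsync = promisify(redis.eval).bind(redis);

const requeueStuck = `
  redis.call("lrem", "sync-processing", 1, ARGV[1])
  redis.call("rpush", "sync-queue", ARGV[1])
`;

let seenLastTime = new Set();

setInterval(async () => {
  const processing = await lrangeAsync('sync-processing', 0, -1);
  for (const key of processing) {
    // present on the previous check too: assume its worker died, push it back
    if (seenLastTime.has(key)) await evalAsync(requeueStuck, 0, key);
  }
  seenLastTime = new Set(processing);
}, 5000);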
mandatory changes (UUID sketch below):
- we need to use lists instead of sets, since we have to tolerate changes to the same entity at almost the same time
- we have to use UUIDs, since updates to the same entity within the span of 1 second could reflect different transactions
- the queue then actually serves as a chronological snapshot
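one way to read that, as a sketch (the uuid package is assumed): every update gets its own queue entry, so two updates to the same entity stay distinct and the list keeps their order:

const uuidv4 = require('uuid/v4');  // uuid package assumed
const redis = require('redis').createClient();

function enqueueUpdate(key, value) {
  const id = uuidv4();  // one entry per update, not per entity
  redis
    .multi()
    .hset('sync-data', id, JSON.stringify({ key, value }))  // snapshot of this particular update
    .rpush('sync-queue', id)                                // chronological order kept by the list
    .publish('trigger-sync', id)
    .exec((err) => { if (err) throw err; });
}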
in node, it's redis.eval(script, numberOfKeys, ...args) — the 0 in the examples is the number of KEYS. inside the lua script, it's redis.call(command, ...args). (example below)
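for example, calling the two scripts above with that convention (client and scripts from above assumed to be in scope; 0 keys, values passed via ARGV or baked in by the template literal):

redis.eval(scriptMoveFromQueueToProcessing, 0, (err, value) => {
  // value is the hget result, or null when sync-queue was empty
});

// this one is built per key, so nothing extra is passed
redis.eval(scriptMoveFromProcessingToQueue('QWERTY'), 0, (err) => { if (err) throw err; });

// the ARGV style instead passes the value after the key count:
// redis.eval('redis.call("rpush", "sync-queue", ARGV[1])', 0, 'QWERTY', callback);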
worker can be turned into an all-around task queue, not limited to elasticsearch.