Skip to content

Instantly share code, notes, and snippets.

@siteslave
Created January 13, 2023 07:38
Show Gist options
  • Save siteslave/c1d74878feb190ba40bc53ed37297dd6 to your computer and use it in GitHub Desktop.
Save siteslave/c1d74878feb190ba40bc53ed37297dd6 to your computer and use it in GitHub Desktop.
const { Queue } = require('bullmq')
// Add your own configuration here
const redisConfiguration = {
connection: {
host: "localhost",
port: 6379,
enableOfflineQueue: false,
// username: "default",
// password: "redispw"
}
}
const myQueue = new Queue('R7Platform', {
connection: redisConfiguration.connection,
defaultJobOptions: {
delay: 1000,
attempts: 1,
backoff: {
type: 'exponential',
delay: 3000,
},
removeOnComplete: {
age: 3600, // keep up to 1 hour
count: 1000, // keep up to 1000 jobs
},
removeOnFail: {
age: 24 * 3600, // keep up to 24 hours
},
},
});
async function personQueue(hn, cid, fname, lname) {
await myQueue.add('person',
{ hn, cid, fname, lname }
);
}
async function opdQueue(hn, vn) {
await myQueue.add('opd',
{ hn, vn }
);
}
for (let index = 0; index < 20; index++) {
personQueue("0041228", "3440000000232", "สถิตย์", "เรียนพิศ");
opdQueue("0041228", "66011025665005");
}
@siteslave
Copy link
Author

#Bull UI

const { ExpressAdapter, createBullBoard, BullMQAdapter } = require('@bull-board/express');
const { Queue: QueueMQ, Worker, QueueScheduler } = require('bullmq');
const express = require('express');

const sleep = (t) => new Promise((resolve) => setTimeout(resolve, t * 1000));

const redisOptions = {
  port: 6379,
  host: 'localhost',
  password: '',
  tls: false,
};

const createQueueMQ = (name) => new QueueMQ(name, { connection: redisOptions });

async function setupBullMQProcessor(queueName) {
  // const queueScheduler = new QueueScheduler(queueName, {
  //   connection: redisOptions,
  // });
  // await queueScheduler.waitUntilReady();

  new Worker(queueName, async (job) => {
    for (let i = 0; i <= 100; i++) {
      await sleep(Math.random());
      await job.updateProgress(i);
      await job.log(`Processing job at interval ${i}`);

      if (Math.random() * 200 < 1) throw new Error(`Random error ${i}`);
    }

    return { jobId: `This is the return value of job (${job.id})` };
  });
}

const run = async () => {
  const stepsQueue = createQueueMQ('steps');
  const r7Queue = createQueueMQ('R7Queue');

  await setupBullMQProcessor(stepsQueue.name);
  await setupBullMQProcessor(r7Queue.name);

  const app = express();

  const serverAdapter = new ExpressAdapter();
  serverAdapter.setBasePath('/ui');

  createBullBoard({
    queues: [
      new BullMQAdapter(stepsQueue),
      new BullMQAdapter(r7Queue),
    ],
    serverAdapter,
    "options": {
      uiConfig: {
        boardTitle: 'R7 Queue',
        // boardLogo: {
        //   path: 'https://r7.moph.go.th/home/img/logo_101.png',
        //   width: '100px', height: '40px'
        // },
      },
    }
  });

  app.use('/ui', serverAdapter.getRouter());

  app.use('/add', (req, res) => {
    const opts = req.query.opts || {};

    if (opts.delay) {
      opts.delay = +opts.delay * 1000; // delay must be a number
    }

    exampleBullMq.add('Add', { title: req.query.title }, opts);

    res.json({
      ok: true,
    });
  });

  app.listen(3000, () => {
    console.log('Running on 3000...');
    console.log('For the UI, open http://localhost:3000/ui');
    console.log('Make sure Redis is running on port 6379 by default');
    console.log('To populate the queue, run:');
    console.log('  curl http://localhost:3000/add?title=Example');
    console.log('To populate the queue with custom options (opts), run:');
    console.log('  curl http://localhost:3000/add?title=Test&opts[delay]=9');
  });
};

// eslint-disable-next-line no-console
run().catch((e) => console.error(e));

@siteslave
Copy link
Author

Consumer

const { Worker } = require('bullmq')

const redisConfiguration = {
  connection: {
    host: "localhost",
    port: 6379,
    enableOfflineQueue: false,
    // username: "default",
    // password: "redispw"
  }
}

async function importPerson(job) {
  const { fname, lname } = job.data;
  console.log(`Patient name: ${fname} ${lname}.`)
  return job.id;
}

async function importOpd(job) {
  const { vn, hn } = job.data;
  console.log(`vn: ${vn}, hn: ${hn}.`)
  return job.id;
}

const worker = new Worker('R7Queue', async job => {
  switch (job.name) {
    case 'PERSON': {
      await importPerson(job);
      break;
    }
    case 'opd': {
      await importOpd(job);
      break;
    }
  }
}, {
  limiter: {
    max: 10,
    duration: 1000,
  },
  concurrency: 4, connection: redisConfiguration.connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 3000,
    },
  }
});

worker.on('completed', job => {
  console.info(`${job.id} has completed!`);
});

worker.on('failed', (job, err) => {
  console.error(`${job.id} has failed with ${err.message}`);
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment