Reinventing the Wheel Series: A Scheduled Task Service

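Our scheduled tasks were originally managed with django + celery, using RabbitMQ as the broker. We recently switched to Kafka, so the plan is to take RabbitMQ offline. Some scheduled tasks still run on it, though, so before the shutdown I needed to build another scheduled-task wheel. I have been writing a lot of Node lately, so this one is implemented in Node.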

I. Tech stack

  • nodejs
  • redis
  • cron-parser
  • bull
  • moment

II. How it works

Redis Keyspace Notifications

IMPORTANT: Keyspace notifications are a feature available since Redis 2.8.0.

Feature overview

Keyspace notifications allow clients to subscribe to Pub/Sub channels in order to receive events affecting the Redis data set in some way.

Examples of the events that can be received are the following:

  • All the commands affecting a given key.
  • All the keys receiving an LPUSH operation.
  • All the keys expiring in the database 0.

Events are delivered using the normal Pub/Sub layer of Redis, so clients implementing Pub/Sub are able to use this feature without modifications.

Because Redis Pub/Sub is fire and forget, currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

In the future there are plans to allow for more reliable delivering of events, but probably this will be addressed at a more general level either bringing reliability to Pub/Sub itself, or allowing Lua scripts to intercept Pub/Sub messages to perform operations like pushing the events into a list.

Redis 2.8 and above lets clients subscribe to Redis events. Which events are published is controlled by the following flags:

    K     Keyspace events, published with __keyspace@<db>__ prefix.
    E     Keyevent events, published with __keyevent@<db>__ prefix.
    g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
    $     String commands
    l     List commands
    s     Set commands
    h     Hash commands
    z     Sorted set commands
    x     Expired events (events generated every time a key expires)
    e     Evicted events (events generated when a key is evicted for maxmemory)
    A     Alias for g$lshzxe, so that the "AKE" string means all the events.
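To make the flag table a bit more concrete, here is a minimal sketch that checks whether a given notify-keyspace-events value would publish "expired" events on the keyevent channels. The function name deliversExpiredKeyevents is made up for this illustration; it only reads the flag characters listed above and does not talk to Redis.

```javascript
// Returns true when a notify-keyspace-events value would publish
// "expired" notifications on the __keyevent@<db>__ channels.
// deliversExpiredKeyevents is a hypothetical helper, not a Redis API.
function deliversExpiredKeyevents(flags) {
  const chars = new Set(flags);                 // one entry per flag character
  const keyeventEnabled = chars.has('E');       // E = keyevent channel
  const expiredEnabled = chars.has('x') || chars.has('A'); // A includes x
  return keyeventEnabled && expiredEnabled;
}

console.log(deliversExpiredKeyevents('KEA')); // true
console.log(deliversExpiredKeyevents('Ex'));  // true
console.log(deliversExpiredKeyevents('Kx'));  // false
```

The last call returns false because K only enables the keyspace channel, so nothing is published on the keyevent channel.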

You can test this with redis-cli:

    $ redis-cli config set notify-keyspace-events KEA
    $ redis-cli --csv psubscribe '__key*__:*'
    Reading messages... (press Ctrl-C to quit)
    "psubscribe","__key*__:*",1

Then run a SET command from another client, and the subscribing window prints the corresponding events.
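The messages printed by the subscriber follow a fixed layout: on a keyspace channel the key is in the channel name and the payload is the event, while on a keyevent channel it is the other way around. A small sketch of how a client could split them apart, assuming the channel and payload arrive as plain strings (parseNotification is a name invented for this example):

```javascript
// Splits a notification (channel + payload) into its parts.
// Channel layout: __keyspace@<db>__:<key>   -> payload is the event name
//                 __keyevent@<db>__:<event> -> payload is the key name
// parseNotification is a hypothetical helper, not part of any library.
function parseNotification(channel, message) {
  const match = /^__(keyspace|keyevent)@(\d+)__:([\s\S]*)$/.exec(channel);
  if (!match) return null; // not a keyspace notification channel
  const [, kind, db, rest] = match;
  return kind === 'keyspace'
    ? { kind, db: Number(db), key: rest, event: message }
    : { kind, db: Number(db), key: message, event: rest };
}

// What a subscriber would see after `SET mykey somevalue` on database 0:
console.log(parseNotification('__keyspace@0__:mykey', 'set'));
console.log(parseNotification('__keyevent@0__:set', 'mykey'));
```

Both calls describe the same operation: key "mykey", event "set", database 0.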

III. Approach

With the basic principle covered, here is what we need to build:

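  1. cron-parser parses a crontab-format string into the corresponding time (a moment object). It has a next method, and each call returns the next execution time.
  2. bull describes itself as a job manager. We take the difference between the time returned by the parser and the current time and use it as bull's delay value (conceptually, this is like setting an expiry on a redis key, with an event firing when the time is up). Once bull receives the task we add, it returns a job.
  3. At that point our taskManager handles the job.
  4. express serves as the web server that exposes the API.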
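The delay calculation in step 2 can be sketched without any dependencies. This is only an illustration: nextMinute is a hypothetical stand-in for what cron-parser's next would return for the schedule "* * * * *", and delayUntil is an invented name for the subtraction described above.

```javascript
// Hypothetical stand-in for the parser's next(): the next whole-minute
// boundary strictly after `now`, i.e. the schedule "* * * * *".
function nextMinute(now) {
  const next = new Date(now.getTime());
  next.setUTCSeconds(0, 0);                      // truncate to the minute
  next.setUTCMinutes(next.getUTCMinutes() + 1);  // move to the next one
  return next;
}

// Milliseconds to wait before the job should fire; never negative.
function delayUntil(next, now) {
  return Math.max(0, next.getTime() - now.getTime());
}

const now = new Date('2024-01-01T12:00:15.000Z');
const next = nextMinute(now);
console.log(next.toISOString());    // 2024-01-01T12:01:00.000Z
console.log(delayUntil(next, now)); // 45000
```

The resulting number is what would be passed as the delay when the task is added to the queue.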

IV. Implementation

Crontab:

    const Queue = require('bull')
    const moment = require('moment')
    const parser = require('cron-parser')
    const conf = require('../conf').redis
    const log4js = require('./log4js')
    const logger = log4js.getLogger('app')

    class Crontab {

      constructor(options, process) {
        const defaultOptions = {
          name: 'crontab',
          redisConfig: {
            host: conf.host || '127.0.0.1',
            port: conf.port || 6379,
            db: conf.db || 0,
            password: conf.password || null
          },
          queue: {
            removeOnComplete: true
          }
        }

        this.options = Object.assign(defaultOptions, options)
        // Fall back to logging the task when no handler is supplied
        this.process = process || function (task) { logger.info(task) }
        this.__init()
      }

      __init() {
        const _this = this
        const queue = Queue(_this.options.name, {
          redis: this.options.redisConfig
        })

        queue.on('ready', function () {
          logger.info('#ready, %s is ready', _this.options.name)
        })

        queue.on('failed', function (job, err) {
          logger.error('#error', job.jobId)
          logger.error('#error', err.stack)
        })

        // Hand every due job to the handler, then mark it as done
        queue.process(function (job, done) {
          let task = job.data
          _this.process(task)
          done()
        })

        this.queue = queue
      }

      // Delay in milliseconds until the next run of task.crontab
      __calcNext(task) {
        let interval = parser.parseExpression(task.crontab)
        let current = moment().unix()
        let delay = interval.next()._date.unix() - current
        // If the next run falls within the current second, use the one after it
        return (delay === 0 ? interval.next()._date.unix() - current : delay) * 1000
      }

      publish(task) {
        return this.queue.add(task, {
          delay: this.__calcNext(task),
          removeOnComplete: this.options.queue.removeOnComplete
        })
      }
    }

    module.exports = Crontab

TaskManager:

    const shortid = require('shortid')
    const redis = require('./redis')
    const log4js = require('./log4js')
    const logger = log4js.getLogger('app')
    const tasks = require('../tasks/')
    const Crontab = require('./crontab')

    class TaskManager {

      constructor(options) {
        const defaultOptions = {
          name: 'tasks'
        }
        this.options = Object.assign({}, defaultOptions, options)
        this.crontab = new Crontab({}, this.__process.bind(this))
        this.tasks = tasks
      }

      // Queue the next run of a task
      __publish(id, task, callback) {
        callback = callback || function () { }
        this.crontab.publish({
          id: id,
          crontab: task.crontab
        }).then(() => {
          callback(false)
        }).catch(err => {
          logger.info(err)
          callback({ code: 500, err: err })
        })
      }

      // Run the task, then re-queue it unless it is a one-off
      __handler(id, task) {
        if (this.tasks[task.method]) {
          // params is stored as a comma-separated string
          const params = task.params ? task.params.split(',') : []
          this.tasks[task.method].apply(this, params)
        }

        // Redis hash values come back as strings, so compare as strings
        if (String(task.once) === 'true' || String(task.once) === '1') {
          logger.info('once')
        } else {
          logger.info('interval')
          this.__publish(id, task)
        }
      }

      // Called by Crontab when a job is due: load the task and handle it
      __process(task) {
        const _this = this
        const id = task.id
        const options = this.options

        redis.hgetall(options.name + ':' + id).then(obj => {
          _this.__handler(id, obj)
        }).catch(err => {
          logger.error('#process err or canceled.', err)
        })
      }

      add(task, callback) {
        const _this = this
        const options = this.options
        const id = shortid.generate()
        callback = callback || function () { }

        redis.hmset(options.name + ':' + id, task)
          .then(res => {
            _this.__publish(id, task, callback)
          })
          .catch(err => {
            callback({ code: 500, err: err })
          })
      }

      del(id, callback) {
        const options = this.options
        callback = callback || function () { }
        redis.del(options.name + ':' + id).then(res => {
          callback(false, res)
        }).catch(err => {
          callback({ code: 500, err: err })
        })
      }
    }

    module.exports = new TaskManager()
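One thing to keep in mind with the TaskManager: Redis hashes store every field as a string, so the object read back with hgetall never contains real booleans or arrays. The sketch below shows one way the stored record could be turned back into a usable task. normalizeTask is a name invented here, and the field names (method, params, crontab, once) are simply the ones used in the code above; treating a missing crontab as "canceled" is an assumption of this example.

```javascript
// Converts the all-strings hash read back from Redis into a task object.
// normalizeTask is a hypothetical helper; field names follow the article.
function normalizeTask(hash) {
  // Assumption: a record without a crontab field means the task was deleted.
  if (!hash || !hash.crontab) return null;
  return {
    method: hash.method,
    params: hash.params ? hash.params.split(',') : [], // "a,b" -> ['a', 'b']
    crontab: hash.crontab,
    once: hash.once === 'true' || hash.once === '1',   // string -> boolean
  };
}

console.log(normalizeTask({
  method: 'sendMail',        // illustrative task name, not from the article
  params: 'ops@example.com,daily',
  crontab: '0 9 * * *',
  once: 'false',
}));
console.log(normalizeTask({})); // null
```

Doing the conversion in one place keeps the string comparisons out of the handler.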

redis:

    const Redis = require('ioredis')
    const conf = require('../conf/').redis
    const log4js = require('./log4js')
    const logger = log4js.getLogger('console')

    const options = {
      host: conf.host || '127.0.0.1',
      port: conf.port || 6379,
      password: conf.password || null,
      family: 4,
      db: conf.db || 0
    }

    const client = new Redis(options)

    client.on('connect', function () {
      logger.info('#redis connected.')
    })

    client.on("error", function (err) {
      logger.error("#redis Error: " + err)
    })

    module.exports = client

V. Miscellaneous

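It is not very polished yet, but it handles simple tasks fine. Later I am considering adding gRPC, which would make it easier to invoke remote tasks.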
