
Automattic/kue

Kue is a priority job queue backed by redis, built for node.js.

Top Related Projects

  • bee-queue: A simple, fast, robust job/task queue for Node.js, backed by Redis.
  • Bull: Premium Queue package for handling distributed jobs and messages in NodeJS.
  • Agenda: Lightweight job scheduling for Node.js
  • node-resque: Node.js Background jobs backed by redis.

Quick Overview

Kue is a priority job queue for Node.js backed by Redis. It provides a clean, feature-rich API for creating, processing, and managing background jobs. Kue is designed to be scalable and robust, making it suitable for both small and large-scale applications.

Pros

  • Easy to use and intuitive API
  • Supports job priority and job types
  • Built-in web interface for monitoring and managing jobs
  • Scalable and can handle high-volume job processing

Cons

  • Requires Redis as a dependency
  • Limited built-in support for distributed processing
  • Not actively maintained (last commit was in 2019)
  • Some reported issues with job reliability in certain edge cases

Code Examples

  1. Creating a job:
const kue = require('kue');
const queue = kue.createQueue();

const job = queue.create('email', {
  title: 'Welcome Email',
  to: 'user@example.com',
  template: 'welcome-email'
}).priority('high').attempts(5).save();

job.on('complete', function() {
  console.log('Job completed');
});
  2. Processing jobs:
queue.process('email', function(job, done) {
  sendEmail(job.data.to, job.data.template, function(err) {
    if (err) return done(err);
    done();
  });
});
  3. Handling failed jobs:
queue.on('job failed', function(id, err) {
  console.log('Job', id, 'failed:', err);
});

Getting Started

To get started with Kue, follow these steps:

  1. Install Kue and the Node.js Redis client (the Redis server itself is installed separately):

    npm install kue redis
    
  2. Create a basic job queue:

    const kue = require('kue');
    const queue = kue.createQueue();
    
    // Create a job
    const job = queue.create('myJob', {
      title: 'Process something',
      data: { /* job data */ }
    }).save();
    
    // Process jobs
    queue.process('myJob', function(job, done) {
      // Process job here
      console.log('Processing job:', job.id);
      done();
    });
    
  3. Start the web UI (optional):

    kue.app.listen(3000);
    console.log('Kue UI is running on port 3000');
    

Remember to have Redis running before using Kue.

Competitor Comparisons

A simple, fast, robust job/task queue for Node.js, backed by Redis.

Pros of bee-queue

  • Significantly faster performance, especially for high-volume queues
  • Lower memory usage, making it more efficient for resource-constrained environments
  • Simpler API and codebase, easier to understand and maintain

Cons of bee-queue

  • Fewer features compared to Kue, such as job priority
  • Less mature ecosystem and community support
  • Limited to Redis as the only backend storage option

Code Comparison

bee-queue:

const Queue = require('bee-queue');
const queue = new Queue('example');

queue.createJob({x: 2, y: 3}).save();
queue.process(job => {
  return job.data.x + job.data.y;
});

Kue:

const kue = require('kue');
const queue = kue.createQueue();

queue.create('example', {x: 2, y: 3}).save();
queue.process('example', (job, done) => {
  done(null, job.data.x + job.data.y);
});

Both bee-queue and Kue are Node.js job queue libraries, but they have different focuses. bee-queue prioritizes performance and simplicity, making it ideal for high-volume, straightforward job processing. Kue offers more features and flexibility, suitable for complex job management scenarios. The choice between them depends on specific project requirements, such as performance needs, feature set, and ecosystem support.

Premium Queue package for handling distributed jobs and messages in NodeJS.

Pros of Bull

  • More active development and maintenance
  • Better performance and scalability
  • Supports Redis Cluster for high availability

Cons of Bull

  • Steeper learning curve for beginners
  • Requires Redis as a dependency (as does Kue)

Code Comparison

Bull:

const Queue = require('bull');
const myQueue = new Queue('my-queue');

myQueue.add({ foo: 'bar' });

myQueue.process(async (job) => {
  console.log(job.data);
});

Kue:

const kue = require('kue');
const queue = kue.createQueue();

queue.create('my-job', { foo: 'bar' }).save();

queue.process('my-job', (job, done) => {
  console.log(job.data);
  done();
});

Both Bull and Kue are popular Redis-backed job queue libraries for Node.js, but they have some key differences. Bull offers better performance and scalability, making it more suitable for high-throughput applications. It also supports Redis Cluster, which is beneficial for high availability setups. However, Bull has a steeper learning curve compared to Kue.

Kue, on the other hand, is simpler to set up and use, making it a good choice for smaller projects or developers new to job queues. However, it has less active development and may not perform as well in high-load scenarios.

The code examples demonstrate the basic usage of both libraries. Bull uses a more modern, promise-based API, while Kue relies on callbacks. Bull's API is generally considered more intuitive and easier to work with in modern Node.js applications.

Lightweight job scheduling for Node.js

Pros of Agenda

  • Supports MongoDB for job persistence, allowing for better scalability and data management
  • Offers more advanced scheduling options, including repeatable jobs and time zones
  • Provides a cleaner API and more intuitive job management

Cons of Agenda

  • Requires MongoDB, which may be an additional dependency for some projects
  • Less mature ecosystem compared to Kue, with fewer plugins and integrations
  • May have a steeper learning curve for developers new to MongoDB

Code Comparison

Agenda:

const Agenda = require('agenda');
const agenda = new Agenda({db: {address: 'mongodb://127.0.0.1/agenda-example'}});

agenda.define('send email', async job => {
  await sendEmail(job.attrs.data.to, job.attrs.data.subject);
});

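// await is not available at the top level of a CommonJS module, so wrap the calls
(async () => {
  await agenda.start();
  await agenda.schedule('in 5 minutes', 'send email', {to: 'user@example.com', subject: 'Hello'});
})();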

Kue:

const kue = require('kue');
const queue = kue.createQueue();

queue.create('send email', {
  to: 'user@example.com',
  subject: 'Hello'
}).delay(5 * 60 * 1000).save();

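queue.process('send email', (job, done) => {
  // Call done() on success and done(err) on failure, so a rejected
  // promise does not leave the job stuck in the active state
  sendEmail(job.data.to, job.data.subject).then(() => done(), done);
});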

Both libraries offer job scheduling and processing capabilities, but Agenda provides more advanced features and MongoDB integration, while Kue offers a simpler setup with Redis as its backend.

Node.js Background jobs backed by redis.

Pros of node-resque

  • Built on top of Redis, like Kue
  • Supports multiple queues with priorities
  • Offers plugins for additional functionality like unique jobs and recurring jobs

Cons of node-resque

  • Less mature and potentially less stable compared to Kue
  • Smaller community and fewer resources available
  • May require more setup and configuration

Code Comparison

node-resque:

const { Queue } = require('node-resque');
const queue = new Queue({ connection: { host: '127.0.0.1', port: 6379 } });
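
// await is not available at the top level of a CommonJS module, so wrap the calls
(async () => {
  await queue.connect();
  await queue.enqueue('default', 'myJob', [1, 2, 3]);
})();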

Kue:

const kue = require('kue');
const queue = kue.createQueue();
const job = queue.create('myJob', { data: [1, 2, 3] })
  .save(err => {
    if (!err) console.log('Job saved');
  });

Both libraries offer similar functionality for creating and managing job queues, and both use Redis as their backend. node-resque supports multiple queues with priorities and offers plugins for additional features.

However, Kue has been around longer and has a larger community, which may result in better stability and more resources. It also has a built-in web interface for job management, which node-resque lacks.

The choice between the two depends on specific project requirements, such as scalability needs, desired features, and familiarity with Redis.

README

Kue

Kue is no longer maintained

Please see e.g. Bull as an alternative. Thank you!

Kue is a priority job queue backed by redis, built for node.js.

PROTIP This is the latest Kue documentation, make sure to also read the changelist.

Upgrade Notes (Please Read)

Installation

  • Latest release:

    $ npm install kue
    
  • Master branch:

    $ npm install http://github.com/Automattic/kue/tarball/master
    

Features

  • Delayed jobs
  • Distribution of parallel work load
  • Job event and progress pubsub
  • Job TTL
  • Optional retries with backoff
  • Graceful workers shutdown
  • Full-text search capabilities
  • RESTful JSON API
  • Rich integrated UI
  • Infinite scrolling
  • UI progress indication
  • Job specific logging
  • Powered by Redis

Overview

Creating Jobs

First create a job Queue with kue.createQueue():

var kue = require('kue')
  , queue = kue.createQueue();

Calling queue.create() with the type of job ("email"), and arbitrary job data will return a Job, which can then be save()ed, adding it to redis, with a default priority level of "normal". The save() method optionally accepts a callback, responding with an error if something goes wrong. The title key is special-cased, and will display in the job listings within the UI, making it easier to find a specific job.

var job = queue.create('email', {
    title: 'welcome email for tj'
  , to: 'tj@learnboost.com'
  , template: 'welcome-email'
}).save( function(err){
   if( !err ) console.log( job.id );
});

Job Priority

To specify the priority of a job, simply invoke the priority() method with a number, or priority name, which is mapped to a number.

queue.create('email', {
    title: 'welcome email for tj'
  , to: 'tj@learnboost.com'
  , template: 'welcome-email'
}).priority('high').save();

The default priority map is as follows:

{
    low: 10
  , normal: 0
  , medium: -5
  , high: -10
  , critical: -15
};
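
As a rough illustration of how the map above can be read, the sketch below resolves a priority name to its number and orders jobs so that lower numbers come first. resolvePriority and byPriority are hypothetical helpers written for this example and are not part of Kue's API; the "lower number first" ordering is inferred from the map, where critical has the smallest value.

```javascript
// Default priority map, copied from the documentation above.
const PRIORITIES = { low: 10, normal: 0, medium: -5, high: -10, critical: -15 };

// Hypothetical helper: accept either a number or a priority name.
function resolvePriority(level) {
  if (typeof level === 'number') return level;
  if (Object.prototype.hasOwnProperty.call(PRIORITIES, level)) return PRIORITIES[level];
  throw new Error('unknown priority: ' + level);
}

// Hypothetical comparator: lower numbers sort first, so "critical" precedes "low".
function byPriority(a, b) {
  return resolvePriority(a.priority) - resolvePriority(b.priority);
}

const ordered = [
  { id: 1, priority: 'low' },
  { id: 2, priority: 'critical' },
  { id: 3, priority: 0 }
].sort(byPriority).map(function (job) { return job.id; });

console.log(ordered); // [ 2, 3, 1 ]
```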

Failure Attempts

By default a job has only one attempt: when it fails, it is marked as failed and stays that way until you intervene. Kue lets you change this, which matters for jobs such as sending an email, where a retry after a failure usually succeeds. To do so, invoke the .attempts() method with a number.

 queue.create('email', {
     title: 'welcome email for tj'
   , to: 'tj@learnboost.com'
   , template: 'welcome-email'
 }).priority('high').attempts(5).save();

Failure Backoff

Job retry attempts are done as soon as they fail, with no delay, even if your job had a delay set via Job#delay. If you want to delay job re-attempts upon failures (known as backoff) you can use Job#backoff method in different ways:

    // Honor job's original delay (if set) at each attempt, defaults to fixed backoff
    job.attempts(3).backoff( true )

    // Override delay value, fixed backoff
    job.attempts(3).backoff( {delay: 60*1000, type:'fixed'} )

    // Enable exponential backoff using original delay (if set)
    job.attempts(3).backoff( {type:'exponential'} )

    // Use a function to get a customized next attempt delay value
    job.attempts(3).backoff( function( attempts, delay ){
      //attempts will correspond to the nth attempt failure so it will start with 0
      //delay will be the amount of the last delay, not the initial delay unless attempts === 0
      return my_customized_calculated_delay;
    })

In the last scenario, the provided function is executed (via eval) on each re-attempt to compute the next attempt's delay, which means it cannot reference external or context variables.
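
The restriction on external variables can be reproduced without Kue. The sketch below turns a function into source text and rebuilds it, which is assumed here to approximate what happens to a stored backoff function; roundTrip and buildBackoffs are hypothetical names, and Kue's actual mechanism may differ in detail.

```javascript
// Hypothetical round trip: turn a function into source text and rebuild it,
// roughly what happens when a function is stored and later evaluated.
function roundTrip(fn) {
  return new Function('return (' + fn.toString() + ')')();
}

function buildBackoffs() {
  const baseDelayMs = 1000; // local variable, lost once the function becomes text

  return {
    // Refers to baseDelayMs from the enclosing scope: breaks after the round trip.
    closing: function (attempts, delay) { return baseDelayMs * (attempts + 1); },
    // Self-contained: every value it needs is inside the function body.
    selfContained: function (attempts, delay) { return 1000 * (attempts + 1); }
  };
}

const backoffs = buildBackoffs();
const revivedOk = roundTrip(backoffs.selfContained);
const revivedBroken = roundTrip(backoffs.closing);

console.log(revivedOk(2, 0)); // 3000

let failure = null;
try {
  revivedBroken(2, 0);
} catch (err) {
  failure = err; // baseDelayMs is no longer in scope
}
console.log(failure instanceof ReferenceError); // true
```

Keeping every constant inside the function body, as selfContained does, avoids the problem.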

Job TTL

Job producers can set an expiry value for how long their job may stay in the active state. If a worker does not respond in time, Kue fails the job with a TTL exceeded error message, which prevents the job from being stuck in the active state and using up concurrency.

queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save();

Job Logs

Job-specific logs enable you to expose information to the UI at any point in the job's life-time. To do so simply invoke job.log(), which accepts a message string as well as variable-arguments for sprintf-like support:

job.log('$%d sent to %s', amount, user.name);

or anything else (uses util.inspect() internally):

job.log({key: 'some key', value: 10});
job.log([1,2,3,5,8]);
job.log(10.1);
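
To get a feel for the two forms above, the following sketch formats log arguments with Node's util module. formatLogEntry is a hypothetical helper; using util.format for the sprintf-like case is an assumption made for illustration, not a statement about Kue's internals.

```javascript
const util = require('util');

// Hypothetical formatter mirroring the two cases described above:
// a format string with arguments, or any other value rendered for display.
function formatLogEntry(first) {
  if (typeof first === 'string') {
    return util.format.apply(util, arguments);
  }
  return util.inspect(first);
}

console.log(formatLogEntry('$%d sent to %s', 5, 'tobi')); // $5 sent to tobi
console.log(formatLogEntry(10.1));                        // 10.1
console.log(formatLogEntry([1, 2, 3, 5, 8]));
```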

Job Progress

Job progress is extremely useful for long-running jobs such as video conversion. To update the job's progress simply invoke job.progress(completed, total [, data]):

job.progress(frames, totalFrames);

data can be used to pass extra information about the job. For example a message or an object with some extra contextual data to the current status.
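
As a small worked example, a (completed, total) pair can be turned into a whole percentage like this. toPercent is a hypothetical helper, and the rounding and clamping choices are assumptions; Kue may compute the value differently.

```javascript
// Hypothetical helper: convert (completed, total) into a whole percentage,
// clamped to the 0-100 range.
function toPercent(completed, total) {
  if (!(total > 0)) return 0; // avoid dividing by zero
  const percent = Math.floor((completed / total) * 100);
  return Math.max(0, Math.min(100, percent));
}

console.log(toPercent(50, 200));  // 25
console.log(toPercent(200, 200)); // 100
console.log(toPercent(250, 200)); // 100 (clamped)
```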

Job Events

Job-specific events are fired on the Job instances via Redis pubsub. The following events are currently supported:

  • enqueue the job is now queued
  • start the job is now running
  • promotion the job is promoted from delayed state to queued
  • progress the job's progress ranging from 0-100
  • failed attempt the job has failed, but has remaining attempts yet
  • failed the job has failed and has no remaining attempts
  • complete the job has completed
  • remove the job has been removed

For example this may look something like the following:

var job = queue.create('video conversion', {
    title: 'converting loki\'s to avi'
  , user: 1
  , frames: 200
});

job.on('complete', function(result){
  console.log('Job completed with data ', result);

}).on('failed attempt', function(errorMessage, doneAttempts){
  console.log('Job failed');

}).on('failed', function(errorMessage){
  console.log('Job failed');

}).on('progress', function(progress, data){
  console.log('\r  job #' + job.id + ' ' + progress + '% complete with data ', data );

});

Note that job-level events are not guaranteed to be received after a process restart, since the restarted node.js process loses its reference to the specific Job object. If you want a more reliable event handler, see Queue Events.

Note Kue stores job objects in memory until they are complete/failed so that it can emit events on them. If you have a very large number of uncompleted jobs, turn this feature off and use queue-level events for better memory scaling.

kue.createQueue({jobEvents: false})

Alternatively, you can use the job-level events() function to control whether events are fired for a specific job.

var job = queue.create('test').events(false).save();

Queue Events

Queue-level events provide access to the job-level events previously mentioned, however scoped to the Queue instance to apply logic at a "global" level. An example of this is removing completed jobs:

queue.on('job enqueue', function(id, type){
  console.log( 'Job %s got queued of type %s', id, type );

}).on('job complete', function(id, result){
  kue.Job.get(id, function(err, job){
    if (err) return;
    job.remove(function(err){
      if (err) throw err;
      console.log('removed completed job #%d', job.id);
    });
  });
});

The events available are the same as mentioned in "Job Events", however prefixed with "job ".
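
The naming convention can be pictured with a plain EventEmitter. In this sketch queueLike stands in for a queue and forwardJobEvent is a hypothetical forwarder; neither is a real Kue object, and the argument order (job id first) simply follows the handlers shown above.

```javascript
const EventEmitter = require('events');

// Stand-in for a queue; not a real Kue object.
const queueLike = new EventEmitter();

// Hypothetical forwarder: re-emit a job-level event on the queue,
// prefixed with "job " and carrying the job id as the first argument.
function forwardJobEvent(queue, jobId, event) {
  const args = Array.prototype.slice.call(arguments, 3);
  queue.emit.apply(queue, ['job ' + event, jobId].concat(args));
}

const seen = [];
queueLike.on('job complete', function (id, result) {
  seen.push({ id: id, result: result });
});
queueLike.on('job failed attempt', function (id, errorMessage, doneAttempts) {
  seen.push({ id: id, error: errorMessage, attempts: doneAttempts });
});

forwardJobEvent(queueLike, 42, 'complete', { sent: true });
forwardJobEvent(queueLike, 43, 'failed attempt', 'timeout', 1);

console.log(seen.length); // 2
```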

Delayed Jobs

Delayed jobs may be scheduled to be queued for an arbitrary distance in time by invoking the .delay(ms) method, passing the number of milliseconds relative to now. Alternatively, you can pass a JavaScript Date object with a specific time in the future. This automatically flags the Job as "delayed".

var email = queue.create('email', {
    title: 'Account renewal required'
  , to: 'tj@learnboost.com'
  , template: 'renewal-email'
}).delay(milliseconds)
  .priority('high')
  .save();

Kue will check the delayed jobs with a timer, promoting them if the scheduled delay has been exceeded, defaulting to a check of top 1000 jobs every second.
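
The two ideas in this section, a delay given as milliseconds or as a Date and a periodic check that promotes due jobs, can be sketched in plain JavaScript. toDelayMs, selectDue, and the promoteAt field are hypothetical names chosen for this example; they do not describe how Kue stores delayed jobs.

```javascript
// Hypothetical helper: accept either a millisecond offset or a Date
// and return the delay in milliseconds relative to `now`.
function toDelayMs(value, now) {
  if (value instanceof Date) return Math.max(0, value.getTime() - now);
  return Math.max(0, Number(value));
}

// Hypothetical promotion check: pick at most `limit` delayed jobs whose
// scheduled time has passed, earliest first.
function selectDue(delayedJobs, now, limit) {
  return delayedJobs
    .filter(function (job) { return job.promoteAt <= now; })
    .sort(function (a, b) { return a.promoteAt - b.promoteAt; })
    .slice(0, limit);
}

const now = 1000000;
const jobs = [
  { id: 'a', promoteAt: now + toDelayMs(5000, now) },
  { id: 'b', promoteAt: now + toDelayMs(new Date(now + 100), now) },
  { id: 'c', promoteAt: now + toDelayMs(0, now) }
];

// One second later, "c" and "b" are due; "a" still has four seconds to go.
const due = selectDue(jobs, now + 1000, 1000).map(function (job) { return job.id; });
console.log(due); // [ 'c', 'b' ]
```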

Processing Jobs

Processing jobs is simple with Kue. First create a Queue instance much like we do for creating jobs, providing us access to redis etc, then invoke queue.process() with the associated type. Note that, unlike what the name createQueue suggests, it currently returns a singleton Queue instance, so you can configure and use only a single Queue object within your node.js process.

In the following example we pass the callback done to email. When an error occurs we invoke done(err) to tell Kue something happened; otherwise we invoke done() only when the job is complete. If this function responds with an error, it will be displayed in the UI and the job will be marked as a failure. The error object passed to done should be a standard Error.

var kue = require('kue')
 , queue = kue.createQueue();

queue.process('email', function(job, done){
  email(job.data.to, done);
});

function email(address, done) {
  if(!isValidEmail(address)) {
    //done('invalid to address') is possible but discouraged
    return done(new Error('invalid to address'));
  }
  // email send stuff...
  done();
}

Workers can also pass the job result as the second parameter to done, as in done(null, result), to store it under the Job.result key. The result is also passed to complete event handlers so that job producers can receive it if they want to.

Processing Concurrency

By default a call to queue.process() will only accept one job at a time for processing. For small tasks like sending emails this is not ideal, so we may specify the maximum active jobs for this type by passing a number:

queue.process('email', 20, function(job, done){
  // ...
});
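
What "maximum active jobs" means can be shown with a small stand-alone limiter. createLimiter is a hypothetical helper written for this illustration; it is not how Kue schedules workers, only a sketch of the same contract: at most `limit` handlers are running until one of them calls done.

```javascript
// Hypothetical limiter: run at most `limit` jobs through `handler` at once.
function createLimiter(limit, handler) {
  let active = 0;
  const waiting = [];

  function pump() {
    while (active < limit && waiting.length > 0) {
      const job = waiting.shift();
      active += 1;
      let finished = false;
      handler(job, function done() {
        if (finished) return; // ignore repeated done() calls
        finished = true;
        active -= 1;
        pump();
      });
    }
  }

  return {
    push: function (job) { waiting.push(job); pump(); },
    counts: function () { return { active: active, waiting: waiting.length }; }
  };
}

// Usage: the handler parks each `done` so the example can finish jobs by hand.
const parked = [];
const limiter = createLimiter(2, function (job, done) { parked.push(done); });

['a', 'b', 'c', 'd', 'e'].forEach(limiter.push);
console.log(limiter.counts()); // { active: 2, waiting: 3 }

parked[0](); // finishing one job lets the next one start
console.log(limiter.counts()); // { active: 2, waiting: 2 }
```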

Pause Processing

Workers can temporarily pause and resume their activity. That is, after calling pause they will receive no jobs in their process callback until resume is called. The pause function gracefully shuts down this worker, and uses the same internal functionality as the shutdown method in Graceful Shutdown.

queue.process('email', function(job, ctx, done){
  ctx.pause( 5000, function(err){
    console.log("Worker is paused... ");
    setTimeout( function(){ ctx.resume(); }, 10000 );
  });
});

Note From Kue >=0.9.0, the ctx parameter is the second argument of the process callback function, and done is idiomatically always the last.

Note The pause method signature changed in Kue >=0.9.0 to move the callback function to the last position.

Updating Progress

For a "real" example, let's say we need to compile a PDF from numerous slides with node-canvas. Our job may consist of the following data. Note that in general you should not store large data in the job itself; it's better to store references such as ids and pull the data in while processing.

queue.create('slideshow pdf', {
    title: user.name + "'s slideshow"
  , slides: [...] // keys to data stored in redis, mongodb, or some other store
});

We can access this same arbitrary data within a separate process while processing, via the job.data property. In the example we render each slide one-by-one, updating the job's log and progress.

queue.process('slideshow pdf', 5, function(job, done){
  var slides = job.data.slides
    , len = slides.length;

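  function next(i) {
    var slide = slides[i]; // pretend we did a query on this slide id ;)
    job.log('rendering %dx%d slide', slide.width, slide.height);
    renderSlide(slide, function(err){
      if (err) return done(err);
      var rendered = i + 1; // number of slides finished so far
      job.progress(rendered, len, {nextSlide : rendered == len ? 'itsdone' : rendered});
      if (rendered == len) done();
      else next(rendered); // never index past the last slide
    });
  }

  if (len === 0) return done(); // nothing to render
  next(0);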
});

Graceful Shutdown

Queue#shutdown([timeout,] fn) signals all workers to stop processing after their current active job is done. Workers will wait timeout milliseconds for their active job's done to be called; otherwise the active job is marked failed with a shutdown error reason. When all workers have told Kue they are stopped, fn is called.

var queue = require('kue').createQueue();

process.once( 'SIGTERM', function ( sig ) {
  queue.shutdown( 5000, function(err) {
    console.log( 'Kue shutdown: ', err||'' );
    process.exit( 0 );
  });
});

Note that the shutdown method signature changed in Kue >=0.9.0 to move the callback function to the last position.

Error Handling

All errors either in Redis client library or Queue are emitted to the Queue object. You should bind to error events to prevent uncaught exceptions or debug kue errors.

var queue = require('kue').createQueue();

queue.on( 'error', function( err ) {
  console.log( 'Oops... ', err );
});

Prevent from Stuck Active Jobs

Kue marks a job complete/failed when done is called by your worker, so you should use proper error handling to prevent uncaught exceptions in your worker's code and to keep the node.js process from exiting before in-flight jobs are done. This can be achieved in two ways:

  1. Wrapping your worker's process function in Domains
queue.process('my-error-prone-task', function(job, done){
  var domain = require('domain').create();
  domain.on('error', function(err){
    done(err);
  });
  domain.run(function(){ // your process function
    throw new Error( 'bad things happen' );
    done();
  });
});

Notice - Domains are deprecated in Node.js (stability 0) and their use is not recommended.

This is the softest and best solution; however, it is not built into Kue. Please refer to this discussion. You can comment on this feature in the related open Kue issue.

You can also use promises (Promise.method and nodeify come from the Bluebird library, not from native promises) to do something like

queue.process('my-error-prone-task', function(job, done){
  Promise.method( function(){ // your process function
    throw new Error( 'bad things happen' );
  })().nodeify(done)
});

but this won't catch exceptions in your async call stack as domains do.

  2. Binding to uncaughtException and gracefully shutting down Kue. However, this is not a recommended error handling idiom in JavaScript, since you lose the error context.
process.once( 'uncaughtException', function(err){
  console.error( 'Something bad happened: ', err );
  queue.shutdown( 1000, function(err2){
    console.error( 'Kue shutdown result: ', err2 || 'OK' );
    process.exit( 0 );
  });
});

Unstable Redis connections

Kue currently uses client-side job state management, and when Redis crashes in the middle of those operations, stuck jobs or index inconsistencies can result. The consequence is that a certain number of jobs will be stuck and will be pulled out by a worker only when new jobs are created; if no new jobs are created, they stay stuck forever. So we strongly suggest that you run a watchdog to fix this issue by calling:

queue.watchStuckJobs(interval)

interval is in milliseconds and defaults to 1000ms

Kue will be refactored to fully atomic job state management from version 1.0 and this will happen by lua scripts and/or BRPOPLPUSH combination. You can read more here and here.

Queue Maintenance

The Queue object has two types of methods to tell you about the number of jobs in each state:

queue.inactiveCount( function( err, total ) { // others are activeCount, completeCount, failedCount, delayedCount
  if( total > 100000 ) {
    console.log( 'We need some back pressure here' );
  }
});

You can also query a specific job type:

queue.failedCount( 'my-critical-job', function( err, total ) {
  if( total > 10000 ) {
    console.log( 'This is tOoOo bad' );
  }
});

and iterating over job ids

queue.inactive( function( err, ids ) { // others are active, complete, failed, delayed
  // you may want to fetch each id to get the Job object out of it...
});

However, the second one doesn't scale to large deployments; there you can use the more specific Job static methods:

kue.Job.rangeByState( 'failed', 0, n, 'asc', function( err, jobs ) {
  // you have an array of maximum n Job objects here
});

or

kue.Job.rangeByType( 'my-job-type', 'failed', 0, n, 'asc', function( err, jobs ) {
  // you have an array of maximum n Job objects here
});

Note that the last two methods are subject to change in later Kue versions.

Programmatic Job Management

If you did none of the above in the Error Handling section, or your process lost active jobs in any way, you can recover them when your process is restarted. A naive approach would be to re-queue all stuck jobs:

queue.active( function( err, ids ) {
  ids.forEach( function( id ) {
    kue.Job.get( id, function( err, job ) {
      // Your application should check if job is a stuck one
      job.inactive();
    });
  });
});

Note that in a clustered deployment your application should take care not to touch a job that is valid and currently being processed by another worker.
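
One possible way to decide whether an active job is stuck is to compare its last update time with a threshold. The sketch below is only an illustration: isStuck is a hypothetical helper, the ten-minute threshold is arbitrary, and it assumes that updated_at (as reported by the JSON API) reflects the job's most recent activity.

```javascript
// Hypothetical check: treat an active job as stuck when it has not been
// updated for longer than `maxActiveMs`. The threshold is application-specific.
function isStuck(job, now, maxActiveMs) {
  if (job.state !== 'active') return false;
  const updatedAt = Number(job.updated_at); // the JSON API reports this as a string
  return now - updatedAt > maxActiveMs;
}

const snapshot = [
  { id: '12', state: 'active', updated_at: '1309973299293' },
  { id: '130', state: 'active', updated_at: '1309975157291' },
  { id: '3', state: 'complete', updated_at: '1309973155248' }
];

const checkedAt = 1309975200000;
const stuckIds = snapshot
  .filter(function (job) { return isStuck(job, checkedAt, 10 * 60 * 1000); })
  .map(function (job) { return job.id; });

console.log(stuckIds); // [ '12' ]
```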

Job Cleanup

Job data and search indexes eat up Redis memory, so you will need some job-keeping process in real-world deployments. Your first option is automatic job removal on completion.

queue.create( ... ).removeOnComplete( true ).save()

But if you need completed job data temporarily, you can set up an on-demand job removal script like the one below to remove the top n completed jobs:

kue.Job.rangeByState( 'complete', 0, n, 'asc', function( err, jobs ) {
  jobs.forEach( function( job ) {
    job.remove( function(){
      console.log( 'removed ', job.id );
    });
  });
});

Note that you should allow enough time for the .remove calls on each job object to complete before your process exits, or job indexes will leak.

Redis Connection Settings

By default, Kue will connect to Redis using the client default settings (port defaults to 6379, host defaults to 127.0.0.1, prefix defaults to q). Queue#createQueue(options) accepts redis connection options in options.redis key.

var kue = require('kue');
var q = kue.createQueue({
  prefix: 'q',
  redis: {
    port: 1234,
    host: '10.0.50.20',
    auth: 'password',
    db: 3, // if provided select a non-default redis db
    options: {
      // see https://github.com/mranney/node_redis#rediscreateclient
    }
  }
});

prefix controls the key names used in Redis. By default, this is simply q. Prefix generally shouldn't be changed unless you need to use one Redis instance for multiple apps. It can also be useful for providing an isolated testbed across your main application.

You can also specify the connection information as a URL string.

var q = kue.createQueue({
  redis: 'redis://example.com:1234?redis_option=value&redis_option=value'
});
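
To make the URL form concrete, the sketch below splits such a string into host, port, and options using Node's built-in URL class. parseRedisUrl is a hypothetical helper and the option names are placeholders; how Kue itself interprets the URL may differ.

```javascript
// Hypothetical parser built on Node's WHATWG URL class.
function parseRedisUrl(redisUrl) {
  const parsed = new URL(redisUrl);
  const options = {};
  parsed.searchParams.forEach(function (value, key) {
    options[key] = value;
  });
  return {
    host: parsed.hostname,
    port: parsed.port ? Number(parsed.port) : 6379, // 6379 is the documented default
    options: options
  };
}

const settings = parseRedisUrl('redis://example.com:1234?first_option=a&second_option=b');
console.log(settings.host, settings.port); // example.com 1234
console.log(settings.options);             // { first_option: 'a', second_option: 'b' }
```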

Connecting using Unix Domain Sockets

Since node_redis supports Unix Domain Sockets, you can also tell Kue to do so. See unix-domain-socket for your redis server configuration.

var kue = require('kue');
var q = kue.createQueue({
  prefix: 'q',
  redis: {
    socket: '/data/sockets/redis.sock',
    auth: 'password',
    options: {
      // see https://github.com/mranney/node_redis#rediscreateclient
    }
  }
});

Replacing Redis Client Module

Any node.js redis client library that conforms to (or is adapted to) the node_redis API can be injected into Kue. You only need to provide a createClientFactory function as a redis connection factory instead of providing node_redis connection options.

Below is sample code that uses redis-sentinel to connect to Redis Sentinel for automatic master/slave failover.

var kue = require('kue');
var Sentinel = require('redis-sentinel');
var endpoints = [
  {host: '192.168.1.10', port: 6379},
  {host: '192.168.1.11', port: 6379}
];
var opts = {}; // Standard node_redis client options
var masterName = 'mymaster';
var sentinel = Sentinel.Sentinel(endpoints);

var q = kue.createQueue({
   redis: {
      createClientFactory: function(){
         return sentinel.createClient(masterName, opts);
      }
   }
});

Note that all <0.8.x client code should be refactored to pass redis options to Queue#createQueue instead of monkey-patching redis#createClient, or it will break from Kue 0.8.x.

Using ioredis client with cluster support


var Redis = require('ioredis');
var kue = require('kue');

// using https://github.com/72squared/vagrant-redis-cluster

var queue = kue.createQueue({
    redis: {
      createClientFactory: function () {
        return new Redis.Cluster([{
          port: 7000
        }, {
          port: 7001
        }]);
      }
    }
  });

User-Interface

The UI is a small Express application. A script is provided in bin/ for running the interface as a standalone application with default settings. You may pass in options for the port, redis-url, and prefix. For example:

node_modules/kue/bin/kue-dashboard -p 3050 -r redis://127.0.0.1:3000 -q prefix

You can fire it up from within another application too:

var kue = require('kue');
kue.createQueue(...);
kue.app.listen(3000);

The title defaults to "Kue", to alter this invoke:

kue.app.set('title', 'My Application');

Note that if you are using non-default Kue options, kue.createQueue(...) must be called before accessing kue.app.

Third-party interfaces

You can also use Kue-UI web interface contributed by Arnaud Bénard

JSON API

Along with the UI Kue also exposes a JSON API, which is utilized by the UI.

GET /job/search?q=

Query jobs, for example "GET /job/search?q=avi video":

["5", "7", "10"]

By default kue indexes the whole Job data object for searching, but this can be customized via calling Job#searchKeys to tell kue which keys on Job data to create index for:

var kue = require('kue');
var queue = kue.createQueue();
queue.create('email', {
    title: 'welcome email for tj'
  , to: 'tj@learnboost.com'
  , template: 'welcome-email'
}).searchKeys( ['to', 'title'] ).save();

The search feature is turned off by default from Kue >=0.9.0. Read more about this here. If you need it, enable search indexes and add reds to your dependencies:

var kue = require('kue');
var q = kue.createQueue({
    disableSearch: false
});

$ npm install reds --save

GET /stats

Currently responds with state counts, and worker activity time in milliseconds:

{"inactiveCount":4,"completeCount":69,"activeCount":2,"failedCount":0,"workTime":20892}

GET /job/:id

Get a job by :id:

{"id":"3","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":"100","state":"complete","attempts":null,"created_at":"1309973155248","updated_at":"1309973155248","duration":"15002"}

GET /job/:id/log

Get job :id's log:

['foo', 'bar', 'baz']

GET /jobs/:from..:to/:order?

Get jobs with the specified range :from to :to, for example "/jobs/0..2", where :order may be "asc" or "desc":

[{"id":"12","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":0,"state":"active","attempts":null,"created_at":"1309973299293","updated_at":"1309973299293"},{"id":"130","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":0,"state":"active","attempts":null,"created_at":"1309975157291","updated_at":"1309975157291"}]

GET /jobs/:state/:from..:to/:order?

Same as above, restricting by :state which is one of:

- active
- inactive
- failed
- complete

GET /jobs/:type/:state/:from..:to/:order?

Same as above, however restricted to :type and :state.
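
The three listing routes differ only in their leading segments, which the following sketch makes explicit. jobsPath is a hypothetical client-side helper for building request paths; it is not part of Kue.

```javascript
// Hypothetical helper: build a job-listing path from its parts.
// `type` and `state` are optional, but `type` is only used together with `state`.
function jobsPath(query) {
  const segments = ['jobs'];
  if (query.type && query.state) segments.push(encodeURIComponent(query.type));
  if (query.state) segments.push(query.state);
  segments.push(query.from + '..' + query.to);
  if (query.order) segments.push(query.order);
  return '/' + segments.join('/');
}

console.log(jobsPath({ from: 0, to: 2 }));
// /jobs/0..2
console.log(jobsPath({ state: 'failed', from: 0, to: 9, order: 'desc' }));
// /jobs/failed/0..9/desc
console.log(jobsPath({ type: 'email', state: 'active', from: 0, to: 4, order: 'asc' }));
// /jobs/email/active/0..4/asc
```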

DELETE /job/:id

Delete job :id:

$ curl -X DELETE http://localhost:3000/job/2
{"message":"job 2 removed"}

POST /job

Create a job:

$ curl -H "Content-Type: application/json" -X POST -d \
    '{
       "type": "email",
       "data": {
         "title": "welcome email for tj",
         "to": "tj@learnboost.com",
         "template": "welcome-email"
       },
       "options" : {
         "attempts": 5,
         "priority": "high"
       }
     }' http://localhost:3000/job
{"message": "job created", "id": 3}

You can create multiple jobs at once by passing an array. In this case, the response will be an array too, preserving the order:

$ curl -H "Content-Type: application/json" -X POST -d \
    '[{
       "type": "email",
       "data": {
         "title": "welcome email for tj",
         "to": "tj@learnboost.com",
         "template": "welcome-email"
       },
       "options" : {
         "attempts": 5,
         "priority": "high"
       }
     },
     {
       "type": "email",
       "data": {
         "title": "followup email for tj",
         "to": "tj@learnboost.com",
         "template": "followup-email"
       },
       "options" : {
         "delay": 86400,
         "attempts": 5,
         "priority": "high"
       }
     }]' http://localhost:3000/job
[
  {"message": "job created", "id": 4},
  {"message": "job created", "id": 5}
]

Note: when inserting multiple jobs in bulk, if one insertion fails Kue will keep processing the remaining jobs in order. The response array will contain the ids of the jobs added successfully, and any failed element will be an object describing the error: {"error": "error reason"}.
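Because successes and failures share one response array, a client usually needs to separate them while keeping track of which input each entry belongs to. A minimal sketch, assuming the response shapes shown above; splitBulkResponse is a hypothetical helper, not part of Kue.

```javascript
// Hypothetical helper (not part of Kue): splits a bulk POST /job response into
// created ids and errors, keeping each entry's position in the request array.
function splitBulkResponse(results) {
  const created = [];
  const failed = [];
  results.forEach(function (result, index) {
    if (result && typeof result.id !== 'undefined') {
      created.push({ index: index, id: result.id });
    } else {
      // Failed entries look like {"error": "error reason"}
      const reason = result && result.error ? result.error : 'unknown error';
      failed.push({ index: index, error: reason });
    }
  });
  return { created: created, failed: failed };
}

const bulkResponse = [
  { message: 'job created', id: 4 },
  { error: 'error reason' },
  { message: 'job created', id: 5 }
];
console.log(splitBulkResponse(bulkResponse));
// created: indexes 0 and 2 (ids 4 and 5); failed: index 1
```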

Parallel Processing With Cluster

The example below shows how you may use Cluster to spread the job processing load across CPUs. Please see Cluster module's documentation for more detailed examples on using it.

When cluster.isMaster is true, the file is being executed in the context of the master process, in which case you may perform tasks that should run only once, such as starting the web app bundled with Kue. The logic in the else block is executed once per worker.

var kue = require('kue')
  , cluster = require('cluster')
  , queue = kue.createQueue();

var clusterWorkerSize = require('os').cpus().length;

if (cluster.isMaster) {
  kue.app.listen(3000);
  for (var i = 0; i < clusterWorkerSize; i++) {
    cluster.fork();
  }
} else {
  queue.process('email', 10, function(job, done){
    var pending = 5
      , total = pending;

    var interval = setInterval(function(){
      job.log('sending!');
      job.progress(total - pending, total);
      --pending || done();
      pending || clearInterval(interval);
    }, 1000);
  });
}

This creates one email job processor (worker) per CPU core. Each worker handles up to 10 concurrent email jobs, so an N-core machine processes up to 10 * N email jobs concurrently.

Now when you visit Kue's UI in the browser, you'll see that jobs are being processed roughly N times faster (if you have N cores).

Securing Kue

By mounting Kue's app inside your own application you can customize it, for example by enabling TLS or by adding middleware such as basic-auth-connect.

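$ npm install --save basic-auth-connect

var https = require('https');
var express = require('express');
var basicAuth = require('basic-auth-connect');
var kue = require('kue');
var app = express();
app.use(basicAuth('foo', 'bar'));
app.use(kue.app);
// express.createServer() was removed in Express 3+; serve the app over TLS with https instead
https.createServer({ ... tls options ... }, app).listen(3000);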

Testing

Enable test mode to push all jobs into a jobs array. Make assertions against the jobs in that array to ensure code under test is correctly enqueuing jobs.

var queue = require('kue').createQueue();

before(function() {
  queue.testMode.enter();
});

afterEach(function() {
  queue.testMode.clear();
});

after(function() {
  queue.testMode.exit();
});

it('does something cool', function() {
  queue.createJob('myJob', { foo: 'bar' }).save();
  queue.createJob('anotherJob', { baz: 'bip' }).save();
  expect(queue.testMode.jobs.length).to.equal(2);
  expect(queue.testMode.jobs[0].type).to.equal('myJob');
  expect(queue.testMode.jobs[0].data).to.eql({ foo: 'bar' });
});

IMPORTANT: By default, jobs created during test mode are not processed. You can enable job processing by passing true to testMode.enter:

before(function() {
  queue.testMode.enter(true);
});

Contributing

We love contributions!

When contributing, follow these simple rules:

  • Don't violate DRY principles.
  • Apply the Boy Scout Rule: leave the code cleaner than you found it.
  • Your code should look like all the other code: this project should look like it was written by one person, always.
  • If you want to propose something, create an issue and describe it in as much detail as you can.
  • If you have a general improvement, consider creating a pull request with it.
  • If you add new code, it should be covered by tests. No tests, no code.
  • If you add a new feature, don't forget to update the documentation for it.
  • If you find a bug (or at least think it is a bug), create an issue with the library version and a test case that we can run to see what you are talking about, or at least the full steps to reproduce it.

License

(The MIT License)

Copyright (c) 2011 LearnBoost <tj@learnboost.com>

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
