Top Related Projects
CASL is an isomorphic authorization JavaScript library which restricts what resources a given user is allowed to access
Premium Queue package for handling distributed jobs and messages in NodeJS.
A simple, fast, robust job/task queue for Node.js, backed by Redis.
BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
High performance Node.js/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Quick Overview
p-queue is a promise-based queue library for JavaScript that allows you to manage and control the concurrency of asynchronous operations. It provides a flexible and efficient way to limit the number of concurrent promises being executed, making it ideal for rate-limiting API calls, managing resource-intensive tasks, or controlling parallel operations.
Pros
- Offers fine-grained control over concurrency limits
- Supports priority queuing for important tasks
- Provides event emitters for queue state changes
- Lightweight, with a small dependency footprint (the queue subclasses EventEmitter3)
Cons
- May introduce complexity for simple use cases
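- Tasks that await other tasks on the same queue can deadlock once the concurrency limit is reached, and a steady stream of high-priority tasks can starve low-priority ones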
- Learning curve for advanced features and configurations
- Limited built-in support for distributed systems
Code Examples
- Basic usage with concurrency limit:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
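(async () => {
  // Add all tasks before awaiting so that up to two run at once;
  // awaiting each add() in turn would run them one at a time.
  await Promise.all([
    queue.add(() => fetchData('https://api.example.com/1')),
    queue.add(() => fetchData('https://api.example.com/2')),
    queue.add(() => fetchData('https://api.example.com/3')),
  ]);
})();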
- Using priorities:
const queue = new PQueue({concurrency: 2});
// Priority only reorders tasks that are still waiting; a higher number is scheduled first.
queue.add(() => fetchData('https://api.example.com/low-priority'), {priority: 1});
queue.add(() => fetchData('https://api.example.com/high-priority'), {priority: 5});
- Handling queue events:
const queue = new PQueue({concurrency: 2});
queue.on('active', () => {
console.log(`Working on item. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.on('idle', () => {
console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
});
Getting Started
To use p-queue in your project, first install it via npm:
npm install p-queue
Then, import and use it in your JavaScript code:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
(async () => {
await queue.add(async () => {
const result = await someAsyncOperation();
console.log(result);
});
})();
This creates a new queue with a concurrency limit of 2 and adds an async operation to it. The queue will manage the execution of added tasks based on the specified concurrency limit.
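To make the concurrency limit concrete, here is a minimal sketch of the idea in plain JavaScript. It is not p-queue's implementation: `createLimiter`, `makeTask`, and `demo` are hypothetical names used only for illustration, and the sketch assumes every task is a promise-returning function.

```javascript
// Illustrative sketch only: a tiny concurrency limiter, not p-queue's source.
function createLimiter(concurrency) {
  const waiting = []; // tasks that have not started yet
  let pending = 0;    // tasks currently running

  const next = () => {
    if (pending >= concurrency || waiting.length === 0) {
      return;
    }
    const {fn, resolve, reject} = waiting.shift();
    pending++;
    Promise.resolve()
      .then(fn)
      .then(resolve, reject)
      .finally(() => {
        pending--;
        next(); // a slot is free, so start the next waiting task
      });
  };

  return {
    add(fn) {
      return new Promise((resolve, reject) => {
        waiting.push({fn, resolve, reject});
        next();
      });
    },
    get size() { return waiting.length; },
    get pending() { return pending; },
  };
}

// Hypothetical demo: run five 10 ms tasks with a limit of two and record the peak.
async function demo() {
  const limiter = createLimiter(2);
  let running = 0;
  let maxRunning = 0;
  const makeTask = value => async () => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    await new Promise(resolve => setTimeout(resolve, 10));
    running--;
    return value;
  };
  const results = await Promise.all(
    [1, 2, 3, 4, 5].map(value => limiter.add(makeTask(value)))
  );
  return {results, maxRunning};
}

demo().then(({results, maxRunning}) => {
  console.log(results, maxRunning);
});
```

All five tasks are added up front, yet `maxRunning` never exceeds the limit because a new task starts only when a running one settles.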
Competitor Comparisons
CASL is an isomorphic authorization JavaScript library which restricts what resources a given user is allowed to access
Pros of CASL
- Focuses on authorization and access control, providing a more specialized solution
- Offers a flexible and expressive way to define and manage permissions
- Supports various data stores and can be integrated with different frameworks
Cons of CASL
- More complex setup and learning curve compared to p-queue's simplicity
- May be overkill for projects that don't require advanced authorization features
- Less focused on performance optimization for asynchronous operations
Code Comparison
CASL example:
import { AbilityBuilder, Ability } from '@casl/ability';
const { can, cannot, build } = new AbilityBuilder(Ability);
can('read', 'Post');
cannot('delete', 'Post', { published: true });
const ability = build();
p-queue example:
import PQueue from 'p-queue';
const queue = new PQueue({ concurrency: 2 });
await queue.add(() => fetchSomething());
await queue.add(() => fetchSomethingElse());
While both libraries serve different purposes, CASL provides a robust solution for managing permissions and access control, whereas p-queue excels at managing asynchronous operations and controlling concurrency. The choice between them depends on the specific needs of your project, with CASL being more suitable for complex authorization scenarios and p-queue for optimizing asynchronous task execution.
Premium Queue package for handling distributed jobs and messages in NodeJS.
Pros of Bull
- Robust job queue system with Redis backend, offering persistence and scalability
- Supports advanced features like job prioritization, retries, and rate limiting
- Can be monitored and managed through separate dashboard tools such as bull-board
Cons of Bull
- More complex setup and configuration compared to p-queue
- Requires Redis as a dependency, which may not be suitable for all use cases
- Steeper learning curve due to its extensive feature set
Code Comparison
p-queue:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
await queue.add(() => fetchSomething());
await queue.add(() => fetchSomethingElse());
Bull:
import Queue from 'bull';
const queue = new Queue('myQueue', 'redis://127.0.0.1:6379');
queue.add({data: 'someData'});
queue.process(async (job) => {
// Process job
});
Summary
p-queue is a lightweight, in-memory queue solution ideal for simple concurrency control in Node.js applications. It's easy to set up and use but lacks advanced features and persistence.
Bull, on the other hand, is a full-featured job queue system built on Redis. It offers robust job management, persistence, and scalability, making it suitable for more complex distributed systems. However, it comes with a steeper learning curve and additional infrastructure requirements.
Choose p-queue for simple, in-memory queue needs, and Bull for more advanced, distributed job processing requirements.
A simple, fast, robust job/task queue for Node.js, backed by Redis.
Pros of bee-queue
- Distributed job queue system using Redis, allowing for scalability across multiple processes and machines
- Supports delayed jobs and automatic retries out of the box
- Provides built-in job progress tracking and event handling
Cons of bee-queue
- Requires Redis as a dependency, adding complexity to the setup
- Limited to Node.js environments, unlike p-queue which is more versatile
- May have higher latency due to Redis communication overhead
Code Comparison
bee-queue:
const Queue = require('bee-queue');
const queue = new Queue('example');
queue.process(async (job) => {
// Process job
});
p-queue:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 1});
await queue.add(async () => {
// Process task
});
Key Differences
- bee-queue is designed for distributed systems, while p-queue is an in-memory queue for a single process
- bee-queue offers more advanced features like job persistence and retries, whereas p-queue focuses on simplicity and concurrency control
- p-queue is more lightweight and easier to integrate into existing JavaScript/TypeScript projects
- bee-queue is better suited for long-running, complex job processing scenarios, while p-queue excels in managing asynchronous tasks within a single application
BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
Pros of BullMQ
- Robust job queue system with Redis backend, offering persistence and scalability
- Supports advanced features like job prioritization, retries, and rate limiting
- Can be monitored and managed through separate dashboard tools such as bull-board
Cons of BullMQ
- More complex setup and configuration compared to p-queue
- Requires Redis as a dependency, which may not be suitable for all use cases
- Steeper learning curve due to its extensive feature set
Code Comparison
p-queue:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
await queue.add(() => fetchSomething());
await queue.add(() => fetchSomethingElse());
BullMQ:
import { Queue, Worker } from 'bullmq';
const myQueue = new Queue('myQueue');
await myQueue.add('myJob', { foo: 'bar' });
const worker = new Worker('myQueue', async job => {
// Process job
});
Summary
p-queue is a lightweight, in-memory queue solution ideal for simple concurrency control in Node.js applications. BullMQ, on the other hand, is a more feature-rich job queue system built on Redis, suitable for complex, distributed job processing scenarios. While p-queue is easier to set up and use, BullMQ offers more advanced features and scalability at the cost of increased complexity.
High performance Node.js/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)
Pros of Worker
- Built specifically for PostgreSQL, offering robust database integration
- Supports job persistence and distributed processing across multiple servers
- Provides advanced features like job scheduling and retries
Cons of Worker
- More complex setup and configuration compared to p-queue
- Limited to PostgreSQL environments, less versatile for other use cases
- Steeper learning curve due to its more comprehensive feature set
Code Comparison
Worker:
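import { run, quickAddJob } from "graphile-worker";
const connectionString = "postgres://user:pass@host:5432/dbname";
// quickAddJob takes the connection options first, then the task identifier and payload.
await quickAddJob({ connectionString }, "send-email", { to: "user@example.com" });
await run({
  connectionString,
  // Map task identifiers to handlers; this handler body is a placeholder.
  taskList: { "send-email": async (payload) => { /* send the email */ } },
});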
p-queue:
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
await queue.add(() => sendEmail('user@example.com'));
Worker is designed for robust, scalable job processing with PostgreSQL, while p-queue offers a simpler, in-memory queue solution for general-purpose task management. Worker excels in distributed environments with persistent job storage, whereas p-queue is more suitable for lightweight, single-process applications requiring minimal setup.
README
p-queue
Promise queue with concurrency control
Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
For servers, you probably want a Redis-backed job queue instead.
Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.
Install
npm install p-queue
Warning: This package is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to convert to ESM. Please don't open issues for questions regarding CommonJS / ESM.
Usage
Here we run only one promise at a time. For example, set concurrency to 4 to run four promises at the same time.
import PQueue from 'p-queue';
import got from 'got';
const queue = new PQueue({concurrency: 1});
(async () => {
await queue.add(() => got('https://sindresorhus.com'));
console.log('Done: sindresorhus.com');
})();
(async () => {
await queue.add(() => got('https://avajs.dev'));
console.log('Done: avajs.dev');
})();
(async () => {
const task = await getUnicornTask();
await queue.add(task);
console.log('Done: Unicorn task');
})();
API
PQueue(options?)
Returns a new queue instance, which is an EventEmitter3 subclass.
options
Type: object
concurrency
Type: number
Default: Infinity
Minimum: 1
Concurrency limit.
timeout
Type: number
Per-operation timeout in milliseconds. Operations fulfill once timeout elapses if they haven't already.
throwOnTimeout
Type: boolean
Default: false
Whether or not a timeout is considered an exception.
autoStart
Type: boolean
Default: true
Whether queue tasks within the concurrency limit are auto-executed as soon as they're added.
queueClass
Type: Function
Class with an enqueue and dequeue method, and a size getter. See the Custom QueueClass section.
intervalCap
Type: number
Default: Infinity
Minimum: 1
The max number of runs in the given interval of time.
interval
Type: number
Default: 0
Minimum: 0
The length of time in milliseconds before the interval count resets. Must be finite.
carryoverConcurrencyCount
Type: boolean
Default: false
If true, specifies that any pending Promises should be carried over into the next interval and counted against the intervalCap. If false, any of those pending Promises will not count towards the next intervalCap.
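The timeout and throwOnTimeout options above can be pictured as a race between the operation and a timer. The following is a rough sketch of that idea, not p-queue's code: `withTimeout` and `slow` are hypothetical names, and fulfilling with `undefined` on a non-throwing timeout is an assumption made for the illustration.

```javascript
// Illustrative sketch only: race an operation against a timer.
function withTimeout(promise, ms, {throwOnTimeout = false} = {}) {
  let timer;
  const timeout = new Promise((resolve, reject) => {
    timer = setTimeout(() => {
      if (throwOnTimeout) {
        reject(new Error(`Timed out after ${ms} ms`));
      } else {
        resolve(undefined); // assumption: a quiet timeout fulfills with undefined
      }
    }, ms);
  });
  // Whichever settles first wins; the timer is cleared either way.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical slow operation that takes 50 ms.
const slow = () => new Promise(resolve => setTimeout(() => resolve('slow result'), 50));

(async () => {
  console.log(await withTimeout(slow(), 10)); // timer wins, no exception
  try {
    await withTimeout(slow(), 10, {throwOnTimeout: true});
  } catch (error) {
    console.log(error.message); // timer wins, reported as an error
  }
  console.log(await withTimeout(slow(), 100)); // operation wins
})();
```

Note that the race does not stop the underlying operation; it only decides what the caller sees first.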
queue
PQueue instance.
.add(fn, options?)
Adds a sync or async task to the queue. Always returns a promise.
Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.
fn
Type: Function
Promise-returning/async function. When executed, it will receive {signal} as the first argument.
options
Type: object
priority
Type: number
Default: 0
Priority of operation. Operations with greater priority will be scheduled first.
signal
AbortSignal for cancellation of the operation. When aborted, it will be removed from the queue and the queue.add() call will reject with an error. If the operation is already running, the signal will need to be handled by the operation itself.
import PQueue from 'p-queue';
import got, {CancelError} from 'got';
const queue = new PQueue();
const controller = new AbortController();
try {
await queue.add(async ({signal}) => {
const request = got('https://sindresorhus.com');
signal.addEventListener('abort', () => {
request.cancel();
});
try {
return await request;
} catch (error) {
if (!(error instanceof CancelError)) {
throw error;
}
}
}, {signal: controller.signal});
} catch (error) {
if (!(error instanceof DOMException)) {
throw error;
}
}
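As noted above, an operation that is already running has to react to the signal itself. Here is a minimal sketch of such a task that uses a timer instead of a network request. `abortableDelay`, `task`, and `demo` are hypothetical names, and the task is invoked directly with `{signal}` to stand in for the call the queue would make.

```javascript
// Illustrative sketch only: a task that stops its own work when the signal aborts.
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('Task aborted'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer); // stop the pending work
      reject(new Error('Task aborted'));
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve('done');
    }, ms);
    signal.addEventListener('abort', onAbort, {once: true});
  });
}

// The task receives {signal} as its first argument, as described for fn above.
const task = ({signal}) => abortableDelay(1000, signal);

async function demo() {
  const controller = new AbortController();
  const running = task({signal: controller.signal}); // stands in for the queue's call
  controller.abort();
  try {
    await running;
    return 'finished';
  } catch (error) {
    return error.message;
  }
}

demo().then(message => console.log(message)); // prints "Task aborted"
```

Clearing the timer in the abort handler is what actually stops the work; rejecting alone would leave it running in the background.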
.addAll(fns, options?)
Same as .add(), but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.
.pause()
Put queue execution on hold.
.start()
Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false or by .pause() method).
Returns this (the instance).
.onEmpty()
Returns a promise that settles when the queue becomes empty.
Can be called multiple times. Useful if, for example, you add more items at a later time.
.onIdle()
Returns a promise that settles when the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0.
The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
.onSizeLessThan(limit)
Returns a promise that settles when the queue size is less than the given limit: queue.size < limit.
If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.
Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.
.clear()
Clear the queue.
.size
Size of the queue, the number of queued items waiting to run.
.sizeBy(options)
Size of the queue, filtered by the given options.
For example, this can be used to find the number of items remaining in the queue with a specific priority level.
import PQueue from 'p-queue';
// autoStart: false keeps the tasks queued so that sizeBy() can count them.
const queue = new PQueue({autoStart: false});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 0});
queue.add(async () => '🦄', {priority: 1});
console.log(queue.sizeBy({priority: 1}));
//=> 2
console.log(queue.sizeBy({priority: 0}));
//=> 1
.pending
Number of running items (no longer in the queue).
.timeout
.concurrency
.isPaused
Whether the queue is currently paused.
Events
active
Emitted as each item is processed in the queue for the purpose of tracking progress.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
let count = 0;
queue.on('active', () => {
console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.add(() => Promise.resolve());
queue.add(() => delay(2000));
queue.add(() => Promise.resolve());
queue.add(() => Promise.resolve());
queue.add(() => delay(500));
completed
Emitted when an item completes without error.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
queue.on('completed', result => {
console.log(result);
});
queue.add(() => Promise.resolve('hello, world!'));
error
Emitted if an item throws an error.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 2});
queue.on('error', error => {
console.error(error);
});
queue.add(() => Promise.reject(new Error('error')));
empty
Emitted every time the queue becomes empty.
Useful if, for example, you add more items at a later time.
idle
Emitted every time the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0.
The difference with empty is that idle guarantees that all work from the queue has finished. empty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue();
queue.on('idle', () => {
console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`);
});
const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));
await job1;
await job2;
// => 'Queue is idle. Size: 0 Pending: 0'
await queue.add(() => delay(600));
// => 'Queue is idle. Size: 0 Pending: 0'
The idle event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle() function returns resolves once the queue becomes idle instead of every time the queue is idle.
add
Emitted every time the add method is called and the number of pending or queued tasks is increased.
next
Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue();
queue.on('add', () => {
console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`);
});
queue.on('next', () => {
console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`);
});
const job1 = queue.add(() => delay(2000));
const job2 = queue.add(() => delay(500));
await job1;
await job2;
//=> 'Task is added. Size: 0 Pending: 1'
//=> 'Task is added. Size: 0 Pending: 2'
await queue.add(() => delay(600));
//=> 'Task is completed. Size: 0 Pending: 1'
//=> 'Task is completed. Size: 0 Pending: 0'
Advanced example
A more advanced example to help you understand the flow.
import delay from 'delay';
import PQueue from 'p-queue';
const queue = new PQueue({concurrency: 1});
(async () => {
await delay(200);
console.log(`8. Pending promises: ${queue.pending}`);
//=> '8. Pending promises: 0'
(async () => {
await queue.add(async () => '🐙');
console.log('11. Resolved')
})();
console.log('9. Added 🐙');
console.log(`10. Pending promises: ${queue.pending}`);
//=> '10. Pending promises: 1'
await queue.onIdle();
console.log('12. All work is done');
})();
(async () => {
await queue.add(async () => '🦄');
console.log('5. Resolved')
})();
console.log('1. Added 🦄');
(async () => {
await queue.add(async () => '🐴');
console.log('6. Resolved')
})();
console.log('2. Added 🐴');
(async () => {
await queue.onEmpty();
console.log('7. Queue is empty');
})();
console.log(`3. Queue size: ${queue.size}`);
//=> '3. Queue size: 1'
console.log(`4. Pending promises: ${queue.pending}`);
//=> '4. Pending promises: 1'
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved
6. Resolved
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved
12. All work is done
Custom QueueClass
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
import PQueue from 'p-queue';
class QueueClass {
constructor() {
this._queue = [];
}
enqueue(run, options) {
this._queue.push(run);
}
dequeue() {
return this._queue.shift();
}
get size() {
return this._queue.length;
}
filter(options) {
return this._queue;
}
}
const queue = new PQueue({queueClass: QueueClass});
p-queue will call corresponding methods to put and get operations from this queue.
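As an example of a scheduling policy, the sketch below orders items by priority and keeps first-in, first-out order among equal priorities. It follows the enqueue / dequeue / size / filter shape shown above, but `PriorityFifoQueue` is a hypothetical class written for illustration, not the queue class the library ships with.

```javascript
// Illustrative sketch only: priority ordering with FIFO among equal priorities.
class PriorityFifoQueue {
  #items = [];

  enqueue(run, options = {}) {
    const item = {run, priority: options.priority ?? 0};
    // Insert before the first item with a strictly lower priority,
    // so items with the same priority keep their insertion order.
    const index = this.#items.findIndex(existing => existing.priority < item.priority);
    if (index === -1) {
      this.#items.push(item);
    } else {
      this.#items.splice(index, 0, item);
    }
  }

  dequeue() {
    return this.#items.shift()?.run;
  }

  get size() {
    return this.#items.length;
  }

  filter(options) {
    return this.#items
      .filter(item => item.priority === options.priority)
      .map(item => item.run);
  }
}

// Hypothetical usage with labelled functions standing in for queued operations.
const scheduled = new PriorityFifoQueue();
scheduled.enqueue(() => 'low', {priority: 0});
scheduled.enqueue(() => 'high-1', {priority: 5});
scheduled.enqueue(() => 'mid', {priority: 1});
scheduled.enqueue(() => 'high-2', {priority: 5});

const order = [];
while (scheduled.size > 0) {
  order.push(scheduled.dequeue()());
}
console.log(order); // → ['high-1', 'high-2', 'mid', 'low']
```

A class like this would be passed through the queueClass option in the same way as the QueueClass example above.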
FAQ
How do the concurrency and intervalCap options affect each other?
They are just different constraints. The concurrency option limits how many things run at the same time. The intervalCap option limits how many things run in total during the interval (over time).
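The interaction of the two constraints can be illustrated with a small tick-based model. This is a hypothetical simulation, not how p-queue schedules work internally: `simulateStarts` is an invented helper, time advances in whole ticks, every task takes `duration` ticks, and running tasks are not carried over into the next interval's count (the carryoverConcurrencyCount: false behavior).

```javascript
// Illustrative sketch only: count how many tasks may start on each tick.
function simulateStarts({tasks, duration, concurrency, intervalCap, interval, ticks}) {
  const endTicks = [];      // tick at which each running task finishes
  const startsPerTick = [];
  let queued = tasks;
  let intervalCount = 0;

  for (let tick = 0; tick < ticks; tick++) {
    if (tick % interval === 0) {
      intervalCount = 0; // a new interval begins
    }
    // Free the slots of tasks that have finished by this tick.
    for (let i = endTicks.length - 1; i >= 0; i--) {
      if (endTicks[i] <= tick) {
        endTicks.splice(i, 1);
      }
    }
    let started = 0;
    // A task starts only if BOTH limits allow it.
    while (queued > 0 && endTicks.length < concurrency && intervalCount < intervalCap) {
      endTicks.push(tick + duration);
      queued--;
      intervalCount++;
      started++;
    }
    startsPerTick.push(started);
  }
  return startsPerTick;
}

const base = {tasks: 6, duration: 1, interval: 2, ticks: 6};
const onlyConcurrency = simulateStarts({...base, concurrency: 2, intervalCap: Infinity});
const onlyIntervalCap = simulateStarts({...base, concurrency: Infinity, intervalCap: 3});
const bothLimits = simulateStarts({...base, concurrency: 2, intervalCap: 3});

console.log(onlyConcurrency); // → [2, 2, 2, 0, 0, 0]
console.log(onlyIntervalCap); // → [3, 0, 3, 0, 0, 0]
console.log(bothLimits);      // → [2, 1, 2, 1, 0, 0]
```

With both limits set, each two-tick interval admits three tasks in total, but never more than two of them at the same moment.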
Maintainers
Related
- p-limit - Run multiple promise-returning & async functions with limited concurrency
- p-throttle - Throttle promise-returning & async functions
- p-debounce - Debounce promise-returning & async functions
- p-all - Run promise-returning & async functions concurrently with optional limited concurrency
- More…