Piscina is a fast and efficient worker thread pool implementation for Node.js, designed for offloading CPU-intensive tasks from the main event loop.
Repository: https://github.com/piscinajs/piscina Website/Docs: https://piscinajs.dev/
Key Features:
- Fast inter-thread communication (using Atomics by default).
- Handles both fixed-task and variable-task workloads.
- Flexible pool size management (minThreads, maxThreads).
- Async tracking integration.
- Performance statistics (runTime, waitTime).
- Task cancellation support (AbortController, EventEmitter).
- Resource limits enforcement (maxOldGenerationSizeMb, etc.).
- Supports CommonJS, ESM, and TypeScript.
- Custom task queues.
- Optional CPU scheduling priorities (Linux).
Requirements: Node.js 18.x and higher.
npm install piscina
# or
yarn add piscina
- main.js: Initializes Piscina, pointing to the worker file.
- worker.js: Contains the code the worker threads will execute.
// main.js
const path = require('node:path');
const Piscina = require('piscina');
const piscina = new Piscina({
// Absolute path or file:// URL to the worker script
filename: path.resolve(__dirname, 'worker.js')
});
(async () => {
try {
const result = await piscina.run({ a: 5, b: 10 });
console.log(`Result: ${result}`); // Output: Result: 15
const asyncResult = await piscina.run({ a: 2, b: 3, delayMs: 100 });
console.log(`Async Result: ${asyncResult}`); // Output: Async Result: 5 (after ~100ms)
} catch (error) {
console.error('Task failed:', error);
} finally {
// Optional: Gracefully shutdown the pool when done
// await piscina.close();
}
})();// worker.js - Synchronous Task
// module.exports = (taskData) => {
// console.log(`Worker processing: ${JSON.stringify(taskData)}`);
// if (typeof taskData.a !== 'number' || typeof taskData.b !== 'number') {
// throw new Error('Invalid input: a and b must be numbers');
// }
// return taskData.a + taskData.b;
// };
// worker.js - Asynchronous Task (using Promise)
const { setTimeout: sleep } = require('node:timers/promises');
module.exports = async (taskData) => {
console.log(`Worker processing async: ${JSON.stringify(taskData)}`);
if (typeof taskData.a !== 'number' || typeof taskData.b !== 'number') {
throw new Error('Invalid input: a and b must be numbers');
}
if (taskData.delayMs) {
await sleep(taskData.delayMs);
}
return taskData.a + taskData.b;
};
- main.mjs: Uses import and file:// URLs.
- worker.mjs: Uses export default.
// main.mjs
// ESM entry point: creates a pool backed by worker.mjs and runs a single task on it.
import { Piscina } from 'piscina';
import { fileURLToPath } from 'node:url';
import path from 'node:path';
// ES modules have no __filename/__dirname globals; these two lines recreate them.
// NOTE(review): neither value is used below - the worker location is derived
// from import.meta.url directly.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const piscina = new Piscina({
// Must be a file:// URL for ESM workers
filename: new URL('./worker.mjs', import.meta.url).href
});
// Async IIFE so the task result can be awaited and any failure logged in one place.
(async () => {
try {
const result = await piscina.run({ x: 7, y: 3 });
console.log(`ESM Result: ${result}`); // Output: ESM Result: 10
} catch (error) {
console.error('ESM Task failed:', error);
}
})();// worker.mjs
import { setTimeout as sleep } from 'node:timers/promises';
// export default ({ x, y }) => {
// console.log(`ESM Worker processing: ${JSON.stringify({ x, y })}`);
// return x + y;
// };
// OR async
export default async ({ x, y, delayMs = 50 }) => {
console.log(`ESM Worker processing async: ${JSON.stringify({ x, y })}`);
await sleep(delayMs);
return x + y;
};const piscina = new Piscina({
// --- Core ---
filename: path.resolve(__dirname, 'worker.js'), // Path to worker script (required if not passed in run())
name: 'default', // Name of the exported function to run (default: 'default')
// --- Pool Size ---
minThreads: Math.max(1, os.availableParallelism() / 2), // Default: os.availableParallelism()
maxThreads: os.availableParallelism() * 1.5, // Default: os.availableParallelism() * 1.5
idleTimeout: 0, // ms. Time before an idle thread is shut down (0 = immediate, Infinity = never). Default: 0
// Consider increasing for better performance if tasks are frequent.
// --- Queueing ---
maxQueue: 'auto', // Max tasks waiting for a thread. Default: Infinity. 'auto' = maxThreads^2
concurrentTasksPerWorker: 1, // Tasks per worker thread. Default: 1. Increase for I/O-bound async tasks in workers.
taskQueue: new Piscina.FixedQueue(), // Optional: Custom task queue implementation. Default: Array-based FIFO. FixedQueue is faster.
// --- Performance & Communication ---
atomics: 'sync', // 'sync' (default), 'async', 'disabled'. Controls Atomics usage for communication. 'async' allows event loop activity between tasks but has overhead.
recordTiming: true, // Collect runTime and waitTime stats. Default: true.
// --- Worker Environment ---
resourceLimits: { // See Node.js Worker docs
maxOldGenerationSizeMb: 1024, // Max main heap size (MB) per worker
// maxYoungGenerationSizeMb: ...,
// codeRangeSizeMb: ...,
stackSizeMb: 8, // Default: 4
},
env: { // Set environment variables inside workers
...process.env, // Inherit main process env
WORKER_SPECIFIC_VAR: 'value'
},
argv: ['--worker-arg1'], // Append to process.argv in workers
execArgv: ['--inspect=0'], // Node.js CLI options for workers (e.g., debugging)
workerData: { sharedConfig: 'some value' }, // Data available via require('piscina').workerData or import { workerData } from 'piscina'
// --- Scheduling & Cleanup ---
niceIncrement: 0, // Linux only (requires @napi-rs/nice): Increase value to lower worker priority. Default: 0
trackUnmanagedFds: true, // Auto-close fs.open/fs.close FDs on worker exit. Default: true (Node >= 12.19/14.6)
closeTimeout: 30000, // ms. Max time to wait for tasks during piscina.close(). Default: 30000
});Returns a Promise that resolves with the worker function's return value or rejects on error/cancellation.
// Basic run
const result = await piscina.run({ data: 'some task data' });
// Override filename for this task
const resultOtherWorker = await piscina.run(
{ data: 'specific task' },
{ filename: path.resolve(__dirname, 'other_worker.js') }
);
// Specify exported function name (see 3.3)
const multiplyResult = await piscina.run(
{ a: 5, b: 3 },
{ name: 'multiply' }
);
// Transfer data instead of cloning (for ArrayBuffer, TypedArray, MessagePort etc.)
const buffer = new ArrayBuffer(1024);
// Fill buffer...
const resultWithTransfer = await piscina.run(
buffer, // Task data is the buffer itself
{ transferList: [buffer] } // Mark buffer for transfer
);
// Note: `buffer` in the main thread is now unusable after transfer.
// Cancelable Task (see 3.5)
const ac = new AbortController();
const taskPromise = piscina.run({ /* ... */ }, { signal: ac.signal });
// Later...
// ac.abort();
try {
await taskPromise;
} catch (err) {
if (err.name === 'AbortError' || err.code === 'ERR_WORKER_ABORTED') {
console.log('Task was cancelled');
} else {
console.error('Task failed:', err);
}
}
Define multiple named functions in your worker file and choose which one to run using the name option.
// worker_math.js
/**
 * Adds the two operands carried by a task.
 * @param {{a: *, b: *}} operands - Task payload holding both operands.
 * @returns {*} The result of `a + b`.
 */
function add(operands) {
  const { a, b } = operands;
  return a + b;
}
/**
 * Subtracts the second operand from the first.
 * @param {{a: number, b: number}} operands - Task payload holding both operands.
 * @returns {number} The result of `a - b`.
 */
function subtract(operands) {
  const { a: minuend, b: subtrahend } = operands;
  return minuend - subtrahend;
}
/**
 * Multiplies the two operands carried by a task.
 * @param {{a: number, b: number}} operands - Task payload holding both factors.
 * @returns {number} The result of `a * b`.
 */
function multiply(operands) {
  const { a: left, b: right } = operands;
  return left * right;
}
// Method 1: Assign properties to the default export
// add.add = add; // Optional: make 'add' runnable via name: 'add'
// add.subtract = subtract;
// add.multiply = multiply;
// module.exports = add; // 'add' is the default export
// Method 2: Export an object (Recommended for clarity)
// Each key of the exported object is a task name that can be selected with the `name` option.
module.exports = {
add, // Runnable via name: 'add'
subtract, // Runnable via name: 'subtract'
multiply, // Runnable via name: 'multiply'
// You can still set a default if needed:
default: add // Runnable via name: 'default' (or if no name option is provided)
};
// main.js (using worker_math.js)
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker_math.js'),
// 'name' option in constructor sets the default for piscina.run()
// name: 'add' // Optional: makes 'add' the default if not specified in run()
});
(async () => {
const sum = await piscina.run({ a: 10, b: 5 }); // Runs default ('add' if set above, or the module's default export)
// OR explicitly specify name:
// const sum = await piscina.run({ a: 10, b: 5 }, { name: 'add' });
const difference = await piscina.run({ a: 10, b: 5 }, { name: 'subtract' });
const product = await piscina.run({ a: 10, b: 5 }, { name: 'multiply' });
console.log({ sum, difference, product });
// Output: { sum: 15, difference: 5, product: 50 }
})();
Use an AbortController or any EventEmitter that emits an 'abort' event.
// Using AbortController (Recommended)
const ac = new AbortController();
const { signal } = ac;
const task = piscina.run({ /* ... */ }, { signal });
// To cancel:
setTimeout(() => {
console.log('Aborting task...');
ac.abort();
}, 50); // Example: Abort after 50ms
try {
await task;
} catch (error) {
// Piscina throws specific errors on abort
if (error.name === 'AbortError' || error.code === 'ERR_WORKER_ABORTED') {
console.log('Task successfully aborted by AbortController.');
} else {
console.error('Task failed for other reason:', error);
}
}
// Using EventEmitter
const EventEmitter = require('node:events');
const eeSignal = new EventEmitter();
const task2 = piscina.run({ /* ... */ }, { signal: eeSignal });
// To cancel:
setTimeout(() => {
console.log('Aborting task via EventEmitter...');
eeSignal.emit('abort');
}, 50);
try {
await task2;
} catch (error) {
if (error.name === 'AbortError' || error.code === 'ERR_WORKER_ABORTED') {
console.log('Task successfully aborted by EventEmitter.');
} else {
console.error('Task 2 failed for other reason:', error);
}
}
- Note: Aborted tasks reject the promise returned by piscina.run(). If the task was already running, the worker thread executing it will be terminated and replaced.
Use Piscina.move() in the worker to mark transferable objects (like ArrayBuffer) to be transferred back to the main thread instead of cloned, improving performance for large data.
// worker_transfer.js
const { move } = require('piscina');
// or import { move } from 'piscina'; // for ESM
module.exports = (taskData) => {
const dataSize = taskData.size || 10 * 1024 * 1024; // 10MB
const buffer = new ArrayBuffer(dataSize);
const view = new Uint8Array(buffer);
// Fill the buffer with some data...
for (let i = 0; i < dataSize; i++) {
view[i] = i % 256;
}
console.log('Worker created buffer, moving it back.');
// Wrap the transferable object with move()
return move(buffer);
};// main.js (using worker_transfer.js)
const piscina = new Piscina({ filename: path.resolve(__dirname, 'worker_transfer.js') });
(async () => {
console.time('transfer');
const transferredBuffer = await piscina.run({ size: 50 * 1024 * 1024 }); // 50MB
console.timeEnd('transfer');
console.log(`Received buffer of size: ${transferredBuffer.byteLength}`);
// The buffer is now owned by the main thread.
})();
Transferable Interface: For complex objects containing transferable parts, implement a custom Transferable using Piscina.transferableSymbol and Piscina.valueSymbol. See the official README/docs for details.
Export a Promise from the worker file that resolves to the task function. Piscina waits for this promise before marking the worker as ready.
// worker_async_init.js
const { setTimeout: sleep } = require('node:timers/promises');
/**
 * Runs the worker's one-time asynchronous setup, then hands back the task handler.
 *
 * @returns {Promise<function(Object): string>} Resolves, once setup has finished,
 *   to the function that processes each task.
 */
async function initializeWorker() {
  console.log('Worker initializing...');
  // Stand-in for real asynchronous setup work (opening a DB connection, loading data, ...).
  await sleep(500);
  console.log('Worker ready!');

  // This is the function that actually handles each submitted task.
  return function processTask(taskData) {
    const label = `${taskData.value}`;
    console.log(`Processing task: ${label}`);
    return `Processed: ${label}`;
  };
}
// Export the promise returned by the async function call
module.exports = initializeWorker();// main.js (using worker_async_init.js)
const piscina = new Piscina({ filename: path.resolve(__dirname, 'worker_async_init.js') });
// Workers will take ~500ms to become available
(async () => {
console.log('Submitting task...');
const result = await piscina.run({ value: 'Test Data' });
console.log(result); // Output: Processed: Test Data (after init completes)
})();
Control the flow of tasks when the pool is busy.
// main_backpressure.js
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js'),
maxThreads: 2,
maxQueue: 4 // Limit queue size (or use 'auto')
// idleTimeout: 1000 // Prevent threads stopping immediately
});
let taskCounter = 0;
let isPaused = false;
const MAX_TASKS = 20;
/**
 * Submits the next task to the pool, then schedules itself again.
 *
 * Stops for good once MAX_TASKS tasks have been issued, and stops temporarily
 * (setting `isPaused`) while the pool's queue is at its configured limit.
 */
function submitTask() {
  const allSubmitted = taskCounter >= MAX_TASKS;
  if (allSubmitted) {
    console.log('All tasks submitted.');
    // Once everything is truly finished the pool could be closed here, e.g.:
    // piscina.close().then(() => console.log('Pool closed.'));
    return;
  }

  const { maxQueue } = piscina.options;
  const queued = piscina.queueSize;

  // Look at the queue BEFORE handing over another task.
  if (queued >= maxQueue) {
    if (!isPaused) {
      console.log(`Queue full (${queued}/${maxQueue}). Pausing submission...`);
      isPaused = true;
      // A real producer would stop here (e.g. pause the stream it reads from).
    }
    // Nothing is rescheduled: wait for the drain event, or poll periodically.
    return;
  }

  // There is room again, so leave the paused state if we were in it.
  if (isPaused) {
    console.log(`Queue has space (${queued}/${maxQueue}). Resuming submission...`);
    isPaused = false;
    // A real producer would resume here (e.g. resume the stream it reads from).
  }

  const taskId = taskCounter;
  taskCounter += 1;
  console.log(`Submitting task ${taskId}`);
  piscina
    .run({ a: taskId, b: 1, delayMs: 100 }) // Simulate work
    .then((outcome) => console.log(`Task ${taskId} completed: ${outcome}`))
    .catch((failure) => console.error(`Task ${taskId} failed:`, failure));

  // Try the next submission right away (setInterval/setTimeout would also work).
  setImmediate(submitTask);
}
// Listen for 'drain' event - emitted when queue is empty *and* tasks running equals pool capacity or less
// Useful for knowing when to resume a paused producer
piscina.on('drain', () => {
console.log('*** Pool drained (queue empty) ***');
if (isPaused) {
console.log('Resuming submission due to drain event.');
isPaused = false;
// Resume adding tasks
setImmediate(submitTask); // Kick off submission again
}
});
// Listen for 'needsDrain' event - emitted when queue size > 0 OR active tasks >= maxThreads
// Can also be used for pausing
// piscina.on('needsDrain', () => {
// if (!isPaused) {
// console.log('*** Pool needs drain *** -- Pausing');
// isPaused = true;
// }
// });
// Start the submission process
submitTask();
Communicate between the main thread and all worker threads (Node.js v18+).
// main_broadcast.js
const { BroadcastChannel } = require('node:worker_threads');
const Piscina = require('piscina');
const path = require('node:path');
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker_broadcast.js'),
minThreads: 2,
maxThreads: 2,
// BroadcastChannel might not work reliably with atomics: 'sync' if threads pause immediately.
// Consider 'async' or disabling atomics if issues arise, though this example works often.
// atomics: 'async'
});
const channelName = 'my-app-config-updates';
async function main() {
const bc = new BroadcastChannel(channelName);
console.log('Starting workers...');
// Run tasks to ensure workers are started and listening
await Promise.all([
piscina.run({ id: 1 }),
piscina.run({ id: 2 })
]);
console.log('Workers started. Broadcasting message in 1 second...');
setTimeout(() => {
const message = { type: 'config_update', data: { theme: 'dark' } };
console.log('Main: Broadcasting:', message);
bc.postMessage(message);
}, 1000);
setTimeout(() => {
console.log('Broadcasting another message...');
bc.postMessage({ type: 'shutdown_signal' });
bc.close(); // Close the channel when done
piscina.close(); // Close the pool
}, 3000);
}
main();// worker_broadcast.js
const { BroadcastChannel, isMainThread } = require('node:worker_threads');
const { workerData } = require('piscina'); // Access initial workerData if needed
if (isMainThread) {
throw new Error('This script should only run in a worker thread');
}
const channelName = 'my-app-config-updates';
let bc; // Define bc in the worker's scope
// Main task function
module.exports = async (taskData) => {
console.log(`Worker [${taskData.id}]: Received task.`);
// Initialize BroadcastChannel only once per worker instance
if (!bc) {
console.log(`Worker [${taskData.id}]: Setting up BroadcastChannel listener...`);
bc = new BroadcastChannel(channelName);
bc.onmessage = (event) => {
console.log(`Worker [${taskData.id}]: Received broadcast message:`, event.data);
if (event.data?.type === 'shutdown_signal') {
console.log(`Worker [${taskData.id}]: Received shutdown. Closing channel.`);
bc.close();
}
// Handle the message (e.g., update worker's internal state)
};
// Optional: Handle errors on the channel
bc.onmessageerror = (error) => {
console.error(`Worker [${taskData.id}]: BroadcastChannel message error:`, error);
};
}
// Simulate some work for the task itself
await new Promise(resolve => setTimeout(resolve, 500));
console.log(`Worker [${taskData.id}]: Task finished.`);
return `Worker ${taskData.id} done.`;
};
// Optional: Cleanup when the worker exits (e.g., due to idleTimeout or destroy)
// process.on('exit', () => {
// if (bc) {
// console.log('Worker exiting, closing BroadcastChannel');
// bc.close();
// }
// });
Replace the default FIFO queue for potentially better performance, especially with large queues.
const { Piscina, FixedQueue } = require('piscina'); // Import FixedQueue
const path = require('node:path');
const piscina = new Piscina({
filename: path.resolve(__dirname, 'worker.js'),
// Use the built-in high-performance queue
taskQueue: new FixedQueue()
});
// ... rest of your code using piscina.run() ...
- You can also implement your own queue by providing an object matching the TaskQueue interface (see README).
- piscina.run(task, [options]): Submits a task. Returns Promise<Result>. (See 3.2)
- piscina.close([options]): Gracefully shuts down the pool. Waits for running tasks to complete (up to closeTimeout). Returns Promise<void>.
  - options.force (boolean, default false): If true, cancels queued (not yet running) tasks immediately.
await piscina.close(); // Wait for running tasks
await piscina.close({ force: true }); // Abort queued tasks
- piscina.destroy(): Immediately terminates all workers and rejects all pending/running tasks. Returns Promise<void>. Use for forceful shutdown.
await piscina.destroy();
Listen using piscina.on('eventName', handler).
- 'error' (error): Emitted for unhandled errors within the pool/worker management itself (e.g., worker crashes unexpectedly, invalid messages). Task-specific errors reject the run() promise.
- 'drain': Emitted when the queue becomes empty and worker utilization allows for more tasks. Useful for resuming paused producers. (See 3.7)
- 'needsDrain': Emitted when piscina.needsDrain becomes true (pool is at or over capacity). Useful for pausing producers.
- 'message' (message): Emitted when a worker sends a message using parentPort.postMessage() (less common with Piscina's task model, but possible).
- piscina.options: Copy of the options used to configure the pool.
- piscina.threads: Array of the active Worker instances.
- piscina.queueSize: Number of tasks currently waiting in the queue.
- piscina.completed: Total number of tasks completed since the pool was created.
- piscina.duration: Time in milliseconds since the pool was created.
- piscina.needsDrain: Boolean indicating if the pool is operating at or beyond capacity (queueSize > 0 or active tasks >= maxThreads). (See 3.7)
- piscina.utilization: A ratio (0-1) indicating how busy the pool has been (approx. total task run time / total pool capacity time).
- piscina.runTime: Histogram object summarizing task execution times (ms). Includes .average, .mean, .stddev, .min, .max, and percentiles (.p50, .p99, etc.). Requires recordTiming: true.
- piscina.waitTime: Histogram object summarizing task wait times in the queue (ms). Same structure as runTime. Requires recordTiming: true.
console.log(`Queue size: ${piscina.queueSize}`);
console.log(`Pool needs drain? ${piscina.needsDrain}`);
console.log(`Average run time: ${piscina.runTime.average} ms`);
console.log(`99th percentile wait time: ${piscina.waitTime.p99} ms`);
- Piscina.isWorkerThread: Boolean, true if the current code is running inside a Piscina worker thread.
- Piscina.version: String, the version of the Piscina library.
- Piscina.move(value): Marks a transferable value for transfer instead of cloning when returned from a worker. (See 3.5)
- Piscina.transferableSymbol, Piscina.valueSymbol: Symbols used for implementing the custom Transferable interface.
- Piscina.queueOptionsSymbol: Symbol used for passing options to custom task queues via the task object.
- Piscina.FixedQueue: The built-in high-performance task queue class. (See 3.9)
// worker.js (CJS)
// Shows which helpers are available inside a worker and where each one comes from.
// parentPort comes from Node's worker_threads module; piscina does not export it,
// so destructuring it from require('piscina') would silently yield undefined.
const { parentPort } = require('node:worker_threads');
const { isWorkerThread, workerData, move /* etc */ } = require('piscina');
// or const piscina = require('piscina'); piscina.isWorkerThread ...
if (isWorkerThread) {
  console.log('Running inside a Piscina worker.');
  console.log('Initial workerData:', workerData); // Access data passed in constructor
}
module.exports = (task) => { /* ... */ };
// worker.mjs (ESM)
import { isWorkerThread, workerData, move, parentPort /* etc */ } from 'piscina';
if (isWorkerThread) {
console.log('Running inside a Piscina worker (ESM).');
console.log('Initial workerData:', workerData);
}
export default (task) => { /* ... */ };- Use Case: Best for CPU-bound synchronous tasks (complex calculations, data processing) to avoid blocking the main Node.js event loop. Asynchronous tasks (like I/O) already benefit from Node.js's internal thread pool; moving them to Piscina workers might offer limited gains unless they also involve significant CPU work.
- Worker Initialization: Costly operations (DB connections, loading large data) should ideally happen once per worker, potentially using async initialization (See 3.6).
- Data Transfer: Avoid cloning large data. Use transferList in piscina.run() options or Piscina.move() when returning from workers for ArrayBuffers etc. (See 3.5). Pass only necessary data to tasks.
- Pool Sizing (minThreads, maxThreads): Tune based on available CPU cores and workload. Too few threads underutilize CPU; too many cause excessive context switching. Start with defaults and benchmark.
- Idle Timeout (idleTimeout): The default of 0 (immediate shutdown) can cause thread churn (creation/destruction overhead) if tasks arrive intermittently. Increase idleTimeout (e.g., 1000 ms to 60000 ms) or raise minThreads if you expect frequent tasks, to keep workers warm. Balance this against resource usage of idle threads.
- Queue Size (maxQueue): Default (Infinity) can lead to high memory usage if tasks arrive much faster than they're processed. Setting 'auto' (maxThreads^2) or a fixed number provides backpressure but risks rejecting tasks if the producer isn't designed to handle it. Use piscina.needsDrain / the 'drain' event to manage flow (See 3.7).
- Out-of-Scope Async Code: Ensure all asynchronous operations within a worker task handler are awaited before returning. Unawaited async operations might pause and unexpectedly resume when the next task runs on that worker, leading to unpredictable behavior.
// worker.js - BAD EXAMPLE
const { setTimeout: sleep } = require('node:timers/promises');
module.exports = ({ a, b }) => {
  // This promise is NOT awaited
  sleep(1000).then(() => {
    console.log('This might run during a LATER task!');
  });
  return a + b; // Returns immediately
};
- Resource Limits: Use resourceLimits cautiously. Setting them too low can make workers unusable or cause crashes. Profile memory usage.
- Linux Priority (niceIncrement): If using on Linux and installing @napi-rs/nice, increasing niceIncrement lowers worker CPU priority. This can prevent workers from starving the main event loop but will slow down task completion. Profile carefully.
- Multiple Pools: Be aware that multiple Piscina instances in the same application create separate thread pools that compete for CPU resources. Consider sharing a single pool if appropriate, or document clearly if your library uses Piscina internally.
- Task Rejections: Check the error passed to the .catch() block of the piscina.run() promise. It could be an error thrown by your worker code, a cancellation error (AbortError/ERR_WORKER_ABORTED), or an error from Piscina itself (e.g., worker exited unexpectedly).
- Worker Crashes: Listen for the 'error' event on the Piscina instance for errors not tied to a specific task promise (e.g., uncatchable exceptions during worker idle time, resource limit exceeded). Check worker stderr if possible (may require custom setup or observing thread exit codes).
- High Memory Usage: Check maxQueue size. Ensure large data isn't being unnecessarily cloned (use move/transferList). Check for memory leaks within your worker code. Monitor using resourceLimits and external tools.
- Slow Performance / High CPU: Profile task execution time (piscina.runTime). Check for thread churn (idleTimeout). Ensure you aren't creating too many threads (maxThreads). Consider niceIncrement on Linux. Ensure the tasks are genuinely CPU-bound.
- Deadlocks/Hangs: Ensure worker tasks always complete or throw errors. Unresolved promises in workers can block threads. Check for issues with atomics: 'sync' if workers rely on the event loop between tasks (consider 'async' or 'disabled' in that specific case).