Skip to content

Configuration & API

Task Options

When defining a task via reactiveTask,

typescript
await reactiveTask({
    task: 'process-order',
    collection: 'orders',
    // ...
});

you can use the following options:

OptionTypeDescription
taskstringRequired. Primary Unique Identifier.
⚠️ Do NOT change after deployment, otherwise all existing tasks will be re-executed as new tasks (Double Execution risk).
collectionstringRequired. Name of the MongoDB collection to watch.
handler(context) => Promise<void>Required. Async function to process the task. Use context.getDocument() to get the document.
filterDocumentStandard Query (e.g., { status: 'pending' }) OR Aggregation Expression (e.g., { $eq: ['$status', 'pending'] }). Aggregation syntax unlocks powerful features like using $$NOW for time-based filtering.
watchProjectionDocumentMongoDB Projection. Task only re-triggers if the projected result changes. Supports inclusion { a: 1 } and computed fields.
debouncenumber | stringDebounce window (ms or duration string). Default: 1000. Useful to group rapid updates.
retryPolicyRetryPolicyConfiguration for retries on failure. See Retry Policy.
cleanupPolicyCleanupPolicyConfiguration for automatic cleanup of orphaned task records. See Cleanup Policy.
executionHistoryLimitnumberNumber of past execution entries to keep in _tasks doc. Default: 5.
evolutionEvolutionConfigConfiguration for handling task logic updates (versioning, reconciliation policies). See Task Evolution.

Advanced Initialization

You can customize the scheduler behavior via mongodash.init:

typescript
await mongodash.init({
    uri: '...',
    
    // Instance ID: Unique identifier for this scheduler instance.
    // Used for leader election, metrics aggregation, and debugging.
    // If not provided, a random ObjectId hex string will be generated.
    instanceId: 'my-app-worker-1',
    
    // Concurrency: Number of parallel workers on the *current instance* (default: 5)
    // Total system concurrency = (reactiveTaskConcurrency * number of instances)
    reactiveTaskConcurrency: 10,
    
    // Globals Collection: Used for coordination and leadership (default: '_mongodash_globals')
    globalsCollection: 'my_custom_globals',
    
    // Filter: Run ONLY specific tasks on this instance (e.g. for scaling or time-based windows).
    // This function is called regularly (every poll cycle) for every pending task.
    // Example: Time-Based Filtering (e.g. only run 'nightly-job' during night hours)
    reactiveTaskFilter: ({ task }) => {
        if (task === 'nightly-job') {
             const hour = new Date().getHours();
             return hour >= 0 && hour < 6; // Only process between 00:00 - 06:00
        }
        return true; // Process all other tasks normally
    },
    
    // Caller: Wrap execution to add context, logging, or error handling.
    // Example: Generating a Correlation ID for distributed tracing
    reactiveTaskCaller: async (taskFn) => {
        const correlationId = crypto.randomUUID();
        return AsyncContext.run({ correlationId }, async () => {
             console.log(`[${correlationId}] Starting task...`);
             try {
                 await taskFn();
                 console.log(`[${correlationId}] Task finished.`);
             } catch (e) {
                 console.error(`[${correlationId}] Task failed:`, e);
                 throw e;
             }
        });
    },
    
    monitoring: {
        enabled: true,
        pushIntervalMs: 60000
    },
    
    // Cleanup Interval: How often to run periodic cleanup of orphaned tasks.
    // Accepts duration strings ('24h'), milliseconds, or cron expressions.
    // Default: '24h'
    reactiveTaskCleanupInterval: '24h'
});