Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,77 @@ await QueueManager.init({
})
```

## OpenTelemetry Instrumentation (experimental)

> [!WARNING]
> The OpenTelemetry instrumentation is experimental and its API may change in future releases.

`@boringnode/queue` ships with built-in OpenTelemetry instrumentation that creates **PRODUCER** spans for job dispatch and **CONSUMER** spans for job execution, following [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/).

### Quick Setup

```typescript
import { QueueInstrumentation } from '@boringnode/queue/otel'
import * as boringqueue from '@boringnode/queue'

const instrumentation = new QueueInstrumentation({
messagingSystem: 'boringqueue', // default
executionSpanLinkMode: 'link', // or 'parent'
})

instrumentation.enable()
instrumentation.manuallyRegister(boringqueue)
```

The instrumentation patches `QueueManager.init()` to automatically inject its wrappers — no config changes needed in your queue setup.

### Span Attributes

The instrumentation uses standard [OTel messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/) where they map cleanly, plus a few queue-specific custom attributes.

| Attribute | Kind | Description |
| ------------------------------- | ------- | ------------------------------------------ |
| `messaging.system` | Semconv | `'boringqueue'` (configurable) |
| `messaging.operation.name` | Semconv | `'publish'` or `'process'` |
| `messaging.destination.name` | Semconv | Queue name |
| `messaging.message.id` | Semconv | Job ID for single-message spans |
| `messaging.batch.message_count` | Semconv | Number of jobs in a batch dispatch |
| `messaging.message.retry.count` | Custom | Retry count (0-based) for a job attempt |
| `messaging.job.name` | Custom | Job class name (e.g. `SendEmailJob`) |
| `messaging.job.status` | Custom | `'completed'`, `'failed'`, or `'retrying'` |
| `messaging.job.group_id` | Custom | Queue-specific group identifier |
| `messaging.job.priority` | Custom | Queue-specific job priority |
| `messaging.job.delay_ms` | Custom | Delay before the job becomes available |

### Trace Context Propagation

The instrumentation automatically propagates trace context from dispatch to execution:

- **Link mode** (default): Each job execution is an independent trace, linked to the dispatch span
- **Parent mode**: Job execution is a child of the dispatch span (same trace)

Child spans created inside `execute()` (DB queries, HTTP calls, etc.) are automatically parented to the job consumer span.

### diagnostics_channel

Raw telemetry events are available via `diagnostics_channel` for custom subscribers:

```typescript
import { tracingChannels } from '@boringnode/queue'

const { executeChannel } = tracingChannels

executeChannel.subscribe({
start() {},
end() {},
asyncStart() {},
asyncEnd(message) {
console.log(`Job ${message.job.name} ${message.status} in ${message.duration}ms`)
},
error() {},
})
```

## Benchmarks

Performance comparison with BullMQ (5ms simulated work per job):
Expand Down
1 change: 1 addition & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export {
fixedBackoff,
} from './src/strategies/backoff_strategy.js'
export * as errors from './src/exceptions.js'
export * as tracingChannels from './src/tracing_channels.js'
18 changes: 18 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
],
"exports": {
".": "./build/index.js",
"./otel": "./build/src/otel.js",
"./drivers/*": "./build/src/drivers/*.js",
"./contracts/*": "./build/src/contracts/*.js",
"./types": "./build/src/types/index.js"
Expand Down Expand Up @@ -40,6 +41,11 @@
"@japa/expect-type": "^2.0.4",
"@japa/file-system": "^3.0.0",
"@japa/runner": "^5.3.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/context-async-hooks": "^2.6.0",
"@opentelemetry/core": "^2.6.0",
"@opentelemetry/instrumentation": "^0.213.0",
"@opentelemetry/sdk-trace-base": "^2.6.0",
"@poppinss/ts-exec": "^1.4.4",
"@types/better-sqlite3": "^7.6.13",
"@types/node": "^24.11.0",
Expand All @@ -59,10 +65,22 @@
"typescript": "^5.9.3"
},
"peerDependencies": {
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/core": "^1.30.0 || ^2.0.0",
"@opentelemetry/instrumentation": "^0.200.0",
"ioredis": "^5.0.0",
"knex": "^3.0.0"
},
"peerDependenciesMeta": {
"@opentelemetry/api": {
"optional": true
},
"@opentelemetry/core": {
"optional": true
},
"@opentelemetry/instrumentation": {
"optional": true
},
"ioredis": {
"optional": true
},
Expand Down
54 changes: 38 additions & 16 deletions src/drivers/sync_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { setTimeout as sleep } from 'node:timers/promises'
import { Locator } from '../locator.js'
import { QueueManager } from '../queue_manager.js'
import { JobExecutionRuntime } from '../job_runtime.js'
import { executeChannel } from '../tracing_channels.js'
import type { Adapter, AcquiredJob } from '../contracts/adapter.js'
import type {
JobContext,
Expand All @@ -11,6 +12,7 @@ import type {
ScheduleData,
ScheduleListOptions,
} from '../types/main.js'
import type { JobExecuteMessage } from '../types/tracing_channels.js'
import { DEFAULT_PRIORITY } from '../constants.js'

/**
Expand Down Expand Up @@ -165,40 +167,60 @@ export class SyncAdapter implements Adapter {
defaultTimeout: configResolver.getWorkerTimeout(),
})
const jobFactory = QueueManager.getJobFactory()
const executionWrapper = QueueManager.getExecutionWrapper()
let attempts = jobData.attempts

while (true) {
const now = Date.now()
const acquiredJob: AcquiredJob = { ...jobData, attempts, acquiredAt: now }

const context: JobContext = {
jobId: jobData.id,
name: jobData.name,
attempt: attempts + 1,
queue,
priority: jobData.priority ?? DEFAULT_PRIORITY,
acquiredAt: new Date(),
acquiredAt: new Date(now),
stalledCount: jobData.stalledCount ?? 0,
}

const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass()

try {
await runtime.execute(jobInstance, jobData.payload, context)
return
} catch (error) {
const outcome = runtime.resolveFailure(error as Error, attempts)
const startTime = performance.now()
const executeMessage: JobExecuteMessage = { job: acquiredJob, queue }

const run = () => {
return executeChannel.tracePromise(async () => {
try {
await runtime.execute(jobInstance, jobData.payload, context)
executeMessage.status = 'completed'
} catch (error) {
const outcome = runtime.resolveFailure(error as Error, attempts)
executeMessage.error = error as Error

if (outcome.type === 'failed') {
executeMessage.status = 'failed'
await jobInstance.failed?.(outcome.hookError)
} else if (outcome.type === 'retry') {
executeMessage.status = 'retrying'
executeMessage.nextRetryAt = outcome.retryAt
}
}

if (outcome.type === 'failed') {
await jobInstance.failed?.(outcome.hookError)
return
}
executeMessage.duration = Number((performance.now() - startTime).toFixed(2))
}, executeMessage)
}

attempts++
await executionWrapper(run, acquiredJob, queue)

if (outcome.type === 'retry' && outcome.retryAt) {
const delay = outcome.retryAt.getTime() - Date.now()
if (executeMessage.status !== 'retrying') return

if (delay > 0) {
await sleep(delay)
}
attempts++

if (executeMessage.nextRetryAt) {
const delay = executeMessage.nextRetryAt.getTime() - Date.now()
if (delay > 0) {
await sleep(delay)
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/job_batch_dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import debug from './debug.js'
import { randomUUID } from 'node:crypto'
import { QueueManager } from './queue_manager.js'
import { dispatchChannel } from './tracing_channels.js'
import type { Adapter } from './contracts/adapter.js'
import type { DispatchManyResult } from './types/main.js'
import type { JobDispatchMessage } from './types/tracing_channels.js'

/**
* Fluent builder for dispatching multiple jobs to the queue in a single batch.
Expand Down Expand Up @@ -143,9 +145,12 @@ export class JobBatchDispatcher<T> {
* ```
*/
async run(): Promise<DispatchManyResult> {
if (this.#payloads.length === 0) return { jobIds: [] }

debug('dispatching %d jobs of type %s', this.#payloads.length, this.#name)

const adapter = this.#getAdapterInstance()
const wrapInternal = QueueManager.getInternalOperationWrapper()

const jobs = this.#payloads.map((payload) => ({
id: randomUUID(),
Expand All @@ -156,11 +161,14 @@ export class JobBatchDispatcher<T> {
groupId: this.#groupId,
}))

await adapter.pushManyOn(this.#queue, jobs)
const message: JobDispatchMessage = { jobs, queue: this.#queue }

return {
jobIds: jobs.map((job) => job.id),
}

await dispatchChannel.tracePromise(async () => {
await wrapInternal(() => adapter.pushManyOn(this.#queue, jobs))
}, message)

return { jobIds: jobs.map((job) => job.id) }
}

/**
Expand Down
24 changes: 14 additions & 10 deletions src/job_dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import debug from './debug.js'
import { randomUUID } from 'node:crypto'
import { QueueManager } from './queue_manager.js'
import { dispatchChannel } from './tracing_channels.js'
import type { Adapter } from './contracts/adapter.js'
import type { DispatchResult, Duration } from './types/main.js'
import type { JobDispatchMessage } from './types/tracing_channels.js'
import { parse } from './utils.js'

/**
Expand Down Expand Up @@ -184,8 +186,10 @@ export class JobDispatcher<T> {
debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload)

const adapter = this.#getAdapterInstance()
const wrapInternal = QueueManager.getInternalOperationWrapper()
const parsedDelay = this.#delay ? parse(this.#delay) : undefined

const payload = {
const jobData = {
id,
name: this.#name,
payload: this.#payload,
Expand All @@ -194,17 +198,17 @@ export class JobDispatcher<T> {
groupId: this.#groupId,
}

if (this.#delay) {
const parsedDelay = parse(this.#delay)
const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }

await adapter.pushLaterOn(this.#queue, payload, parsedDelay)
} else {
await adapter.pushOn(this.#queue, payload)
}
await dispatchChannel.tracePromise(async () => {
if (parsedDelay !== undefined) {
await wrapInternal(() => adapter.pushLaterOn(this.#queue, jobData, parsedDelay))
} else {
await wrapInternal(() => adapter.pushOn(this.#queue, jobData))
}
}, message)

return {
jobId: id,
}
return { jobId: id }
}

/**
Expand Down
Loading
Loading