By the end of this tutorial you'll have a click pipeline: every redirect enqueues an event, a separate worker writes it to a clicks table idempotently, and GET /stats/:code serves aggregate counts. Failed jobs land in a DLQ.
What you'll build:

- A clicks table with a unique message_id for idempotency.
- npm run worker starts the worker process.
- GET /stats/:code returns total clicks + by-day buckets.

Create the migration:

npm run migrate -- create clicks
export const up = (pgm) => {
  pgm.createTable('clicks', {
    id: { type: 'bigserial', primaryKey: true },
    message_id: { type: 'uuid', notNull: true, unique: true },
    link_id: { type: 'bigint', notNull: true, references: 'links(id)', onDelete: 'CASCADE' },
    occurred_at: { type: 'timestamptz', notNull: true },
    referrer: { type: 'text' },
    user_agent: { type: 'text' }
  });
  pgm.createIndex('clicks', ['link_id', 'occurred_at']);
};

export const down = (pgm) => pgm.dropTable('clicks');
npm run migrate:up
npm install bullmq
src/queue.ts:
import { Queue } from 'bullmq';

export const clickQueue = new Queue('clicks', {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' }
});

export type ClickEvent = {
  id: string;          // stable message id (uuid)
  link_id: number;
  occurred_at: string; // ISO timestamp
  referrer: string | null;
  user_agent: string | null;
};

export async function enqueueClick(ev: ClickEvent) {
  await clickQueue.add('click', ev, {
    jobId: ev.id, // reusing the message id makes duplicate adds no-ops while the job exists
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 1000,
    removeOnFail: 5000
  });
}
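With attempts: 5 and exponential backoff at a 1000 ms base, BullMQ spaces retries as delay * 2^(n - 1). A quick sketch of the resulting waits (pure arithmetic, no queue needed; retryDelays is an illustrative helper, not part of the module's code):

```typescript
// BullMQ's built-in exponential backoff: wait before retry n = delay * 2^(n - 1).
function retryDelays(attempts: number, delayMs: number): number[] {
  // attempts counts the first try, so there are attempts - 1 retries
  return Array.from({ length: attempts - 1 }, (_, i) => delayMs * 2 ** i);
}

console.log(retryDelays(5, 1000)); // [1000, 2000, 4000, 8000]
```

So a persistently failing job burns through all five attempts in about 15 seconds before landing in the failed set.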
In src/app.ts:
import { randomUUID } from 'node:crypto';
import { enqueueClick } from './queue.js';
app.get('/:code', async (req, res) => {
  // …existing cache + DB logic…
  // After we've decided the target, before res.redirect:
  const linkId = rows[0].id ?? cachedId; // adjust to your shape

  res.redirect(302, target);

  // Fire-and-forget — don't await; a slow queue must never slow the redirect
  enqueueClick({
    id: randomUUID(),
    link_id: Number(linkId),
    occurred_at: new Date().toISOString(),
    referrer: req.get('referer') ?? null,
    user_agent: req.get('user-agent') ?? null
  }).catch(e => console.warn('enqueue failed:', e.message));
});
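If your cache from the earlier modules stores only the target URL, the handler needs the id cached alongside it. A minimal in-memory sketch of the assumed shape (your real cache is likely Redis; CachedLink and cacheLink are illustrative names):

```typescript
// Assumed cache shape: keep the link id next to the URL so the redirect
// handler can enqueue a click without a second DB lookup.
type CachedLink = { id: number; url: string };

const cache = new Map<string, CachedLink>();

function cacheLink(code: string, id: number, url: string): void {
  cache.set(code, { id, url });
}

cacheLink('abc123', 42, 'https://example.com');
console.log(cache.get('abc123')); // { id: 42, url: 'https://example.com' }
```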
You'll need the link's id from the link query — update your SELECT to return it. Cache the ID along with the URL too (or look it up once on miss).

src/worker.ts:
import 'dotenv/config';
import { Worker } from 'bullmq';
import { pool } from './db.js';
import type { ClickEvent } from './queue.js';

const worker = new Worker<ClickEvent>('clicks', async (job) => {
  const ev = job.data;
  // Idempotent insert — unique(message_id) absorbs duplicates
  await pool.query(
    `INSERT INTO clicks(message_id, link_id, occurred_at, referrer, user_agent)
     VALUES ($1, $2, $3, $4, $5)
     ON CONFLICT (message_id) DO NOTHING`,
    [ev.id, ev.link_id, ev.occurred_at, ev.referrer, ev.user_agent]
  );
}, {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' },
  concurrency: 10
});

worker.on('completed', (job) => console.log('done', job.id));
worker.on('failed', (job, err) => console.error('fail', job?.id, err.message));
npm pkg set scripts.worker="tsx src/worker.ts"
app.get('/stats/:code', requireAuth, async (req, res) => {
  const { rows: link } = await pool.query(
    'SELECT id, owner_id FROM links WHERE code = $1', [req.params.code]
  );
  if (link.length === 0) return res.sendStatus(404);
  if (link[0].owner_id !== Number(req.user!.sub) && req.user!.role !== 'admin')
    return res.sendStatus(403);

  const { rows } = await pool.query(
    `SELECT date_trunc('day', occurred_at)::date AS day, COUNT(*)::int AS clicks
     FROM clicks WHERE link_id = $1
     GROUP BY 1 ORDER BY 1 DESC LIMIT 30`,
    [link[0].id]
  );
  // Note: this totals the 30-day window above, not all-time clicks
  const total = rows.reduce((s, r) => s + r.clicks, 0);
  res.json({ total, by_day: rows });
});
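The response shape this endpoint returns, computed here from made-up by-day rows (sample dates and counts are illustrative only):

```typescript
// Sample of the aggregation the stats endpoint performs over its by-day rows.
type DayRow = { day: string; clicks: number };

const rows: DayRow[] = [
  { day: '2024-05-03', clicks: 12 },
  { day: '2024-05-02', clicks: 30 },
  { day: '2024-05-01', clicks: 8 },
];

// total sums only the 30-day window the query returns
const total = rows.reduce((s, r) => s + r.clicks, 0);
console.log({ total, by_day: rows }); // total: 50
```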
Open three terminals:
# T1
npm run dev

# T2
npm run worker

# T3 — generate clicks
for i in $(seq 1 50); do curl -s -o /dev/null http://localhost:3000/<your-code>; done

# T3 — check stats
curl -b jar.txt http://localhost:3000/stats/<your-code>
# Stop the worker (T2)

# Generate more clicks
for i in $(seq 1 20); do curl -s -o /dev/null http://localhost:3000/<code>; done

# Inspect the queue
docker compose exec redis redis-cli LLEN bull:clicks:wait

# Restart the worker
npm run worker

# Wait a few seconds — queue drains, count goes to 70
Temporarily make the worker pretend to fail half the time, then re-process. Counts should stay correct because of the unique constraint.
// in worker.ts, inside the handler, before the INSERT:
if (Math.random() < 0.5) throw new Error('flaky downstream');
Generate 100 clicks; total in clicks table should be exactly 100 once the queue drains. Remove the chaos line afterwards.
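To see why replays can't inflate counts, here is the idempotency property simulated in memory, with a Set standing in for the unique message_id constraint and insertClick standing in for the ON CONFLICT (message_id) DO NOTHING insert:

```typescript
// In-memory stand-in for the idempotent insert: the unique message_id
// absorbs duplicate deliveries, so retries never create extra rows.
const seen = new Set<string>();

function insertClick(messageId: string): boolean {
  if (seen.has(messageId)) return false; // conflict: row already exists, do nothing
  seen.add(messageId);
  return true;
}

insertClick('msg-1');
insertClick('msg-2');
insertClick('msg-1'); // duplicate delivery after a retry — a no-op
console.log(seen.size); // 2
```

This is exactly the guarantee the flaky-worker experiment relies on: the retry redelivers the same message id, not a new one.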
Add a tiny failure listener in worker.ts:
import { QueueEvents } from 'bullmq';

const events = new QueueEvents('clicks', {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' }
});

events.on('failed', ({ jobId, failedReason }) =>
  console.error('DLQ candidate:', jobId, failedReason));
Real alerting wiring lands in Module 07.
git checkout -b module-06
git add .
git commit -m "module 06: bullmq click pipeline, stats endpoint, idempotent worker"
git push -u origin module-06
Troubleshooting:

- Duplicate click rows? Confirm message_id exists and you use ON CONFLICT DO NOTHING.
- Slow redirects? You probably awaited enqueueClick. Fire and forget; log enqueue failures.
- Which Redis image? redis:7-alpine from Module 04 is fine.