URL Shortener Tutorial · Module 06 of 11

Analytics & Background Jobs

By the end of this tutorial you'll have a click pipeline: every redirect enqueues an event, a separate worker writes it to a clicks table idempotently, and GET /stats/:code serves aggregate counts. Failed jobs land in a DLQ.

~4–6 hrs · BullMQ · Workers · DLQ · Idempotency
Definition of Done

What You'll Have

  • A clicks table with a unique message_id for idempotency.
  • BullMQ queue + worker process; npm run worker starts it.
  • Redirect path enqueues the event without blocking the response.
  • Worker retries failures with exponential backoff; jobs that exhaust their attempts end up in the DLQ.
  • GET /stats/:code returns total clicks + by-day buckets.
  • Killing the worker doesn't lose events — they queue and drain on restart.
The Steps

Build It

STEP 1

Schema migration: clicks table

Create the migration file:

npm run migrate -- create clicks

Then fill in the generated migration:

export const up = (pgm) => {
  pgm.createTable('clicks', {
    id:          { type: 'bigserial', primaryKey: true },
    message_id:  { type: 'uuid', notNull: true, unique: true },
    link_id:     { type: 'bigint', notNull: true, references: 'links(id)', onDelete: 'CASCADE' },
    occurred_at: { type: 'timestamptz', notNull: true },
    referrer:    { type: 'text' },
    user_agent:  { type: 'text' }
  });
  pgm.createIndex('clicks', ['link_id', 'occurred_at']);
};
export const down = (pgm) => pgm.dropTable('clicks');

Apply it:

npm run migrate:up
STEP 2

Install BullMQ

npm install bullmq
STEP 3

Create the queue producer

src/queue.ts:

import { Queue } from 'bullmq';

export const clickQueue = new Queue('clicks', {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' }
});

export type ClickEvent = {
  id: string;             // stable message id (uuid)
  link_id: number;
  occurred_at: string;
  referrer: string | null;
  user_agent: string | null;
};

export async function enqueueClick(ev: ClickEvent) {
  await clickQueue.add('click', ev, {
    jobId: ev.id,
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 1000,
    removeOnFail: 5000
  });
}
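
To get a feel for what attempts: 5 with backoff: { type: 'exponential', delay: 1000 } means in practice, here is a small sketch of the resulting retry schedule. It assumes the built-in exponential strategy waits delay × 2^(n−1) before retry n; backoffSchedule is a hypothetical helper for illustration, not part of BullMQ.

```typescript
// Hypothetical helper: the waits (in ms) between attempts for an exponential backoff.
// Assumption: the wait before retry n is baseDelayMs * 2^(n - 1).
function backoffSchedule(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately, so there are (attempts - 1) waits.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(baseDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

const delays = backoffSchedule(5, 1000);
const totalMs = delays.reduce((sum, d) => sum + d, 0);
console.log(delays, totalMs); // [ 1000, 2000, 4000, 8000 ] 15000
```

Under that assumption a job that keeps failing is given up on roughly 15 seconds after its first attempt, plus processing time.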
STEP 4

Update the redirect path to enqueue

In src/app.ts:

import { randomUUID } from 'node:crypto';
import { enqueueClick } from './queue.js';

app.get('/:code', async (req, res) => {
  // …existing cache + DB logic…
  // After we've decided the target, before res.redirect:
  const linkId = rows[0]?.id ?? cachedId;       // adjust your shape; rows is empty on a cache hit
  res.redirect(302, target);

  // Fire-and-forget — don't await
  enqueueClick({
    id: randomUUID(),
    link_id: Number(linkId),
    occurred_at: new Date().toISOString(),
    referrer: req.get('referer') ?? null,
    user_agent: req.get('user-agent') ?? null
  }).catch(e => console.warn('enqueue failed:', e.message));
});
Gotcha: you'll need id from the link query — update your SELECT to return it. Cache the ID along with the URL too (or look it up once on miss).
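
One way to cache the ID along with the URL is to store a small JSON object instead of the bare URL string. The sketch below makes assumptions about your cache layer: CachedLink, encodeCachedLink and decodeCachedLink are hypothetical names, and the decoder returns null for anything it does not recognise so the caller can fall back to the database.

```typescript
// Hypothetical cache value: the link's numeric id plus its target URL.
type CachedLink = { id: number; url: string };

function encodeCachedLink(link: CachedLink): string {
  return JSON.stringify({ id: link.id, url: link.url });
}

// Returns null on a miss or on any value that is not a well-formed entry,
// so the caller treats it like a cache miss and queries the database.
function decodeCachedLink(raw: string | null): CachedLink | null {
  if (raw === null) return null;
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed === 'object' && parsed !== null) {
      const { id, url } = parsed as Record<string, unknown>;
      if (typeof id === 'number' && typeof url === 'string') {
        return { id, url };
      }
    }
  } catch {
    // Not JSON (for example an older entry holding just the URL): treat as a miss.
  }
  return null;
}
```

Entries written before this change would fail to decode and simply be re-fetched once.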
STEP 5

Write the worker

src/worker.ts:

import 'dotenv/config';
import { Worker } from 'bullmq';
import { pool } from './db.js';
import type { ClickEvent } from './queue.js';

const worker = new Worker<ClickEvent>('clicks', async (job) => {
  const ev = job.data;
  // Idempotent insert — unique(message_id) absorbs duplicates
  await pool.query(
    `INSERT INTO clicks(message_id, link_id, occurred_at, referrer, user_agent)
     VALUES ($1, $2, $3, $4, $5)
     ON CONFLICT (message_id) DO NOTHING`,
    [ev.id, ev.link_id, ev.occurred_at, ev.referrer, ev.user_agent]
  );
}, {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' },
  concurrency: 10
});

worker.on('completed', (job) => console.log('done', job.id));
worker.on('failed',    (job, err) => console.error('fail', job?.id, err.message));

Then register the worker script:

npm pkg set scripts.worker="tsx src/worker.ts"
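
A malformed payload fails the same way on every retry, so it can help to validate job.data before touching the database and fail with a clear message. The following is a minimal sketch, not something the tutorial's worker requires: parseClickEvent and nullableString are hypothetical helpers, and the ClickEvent type is repeated here only to keep the example self-contained.

```typescript
type ClickEvent = {
  id: string;
  link_id: number;
  occurred_at: string;
  referrer: string | null;
  user_agent: string | null;
};

const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Accepts a string, null or undefined; anything else is rejected.
function nullableString(value: unknown, field: string): string | null {
  if (value === null || value === undefined) return null;
  if (typeof value === 'string') return value;
  throw new Error(`${field} must be a string or null`);
}

// Hypothetical validator: throws a descriptive error instead of letting
// a bad payload surface later as a confusing database error.
function parseClickEvent(data: unknown): ClickEvent {
  if (typeof data !== 'object' || data === null) {
    throw new Error('click event must be an object');
  }
  const { id, link_id, occurred_at, referrer, user_agent } = data as Record<string, unknown>;
  if (typeof id !== 'string' || !UUID_RE.test(id)) {
    throw new Error('id must be a uuid');
  }
  if (typeof link_id !== 'number' || !Number.isInteger(link_id)) {
    throw new Error('link_id must be an integer');
  }
  if (typeof occurred_at !== 'string' || Number.isNaN(Date.parse(occurred_at))) {
    throw new Error('occurred_at must be a parseable timestamp');
  }
  return {
    id,
    link_id,
    occurred_at,
    referrer: nullableString(referrer, 'referrer'),
    user_agent: nullableString(user_agent, 'user_agent'),
  };
}
```

In the handler you would call const ev = parseClickEvent(job.data) in place of reading job.data directly.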
STEP 6

Add the stats endpoint

app.get('/stats/:code', requireAuth, async (req, res) => {
  const { rows: link } = await pool.query(
    'SELECT id, owner_id FROM links WHERE code = $1', [req.params.code]
  );
  if (link.length === 0) return res.sendStatus(404);
  // Coerce both sides: pg returns bigint columns as strings by default
  if (Number(link[0].owner_id) !== Number(req.user!.sub) && req.user!.role !== 'admin')
    return res.sendStatus(403);

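  const { rows } = await pool.query(
    `SELECT date_trunc('day', occurred_at)::date AS day, COUNT(*)::int AS clicks
     FROM clicks WHERE link_id = $1
     GROUP BY 1 ORDER BY 1 DESC LIMIT 30`,
    [link[0].id]
  );
  // Count separately: summing the buckets would miss clicks older than the 30 days returned
  const { rows: totals } = await pool.query(
    'SELECT COUNT(*)::int AS total FROM clicks WHERE link_id = $1', [link[0].id]
  );
  res.json({ total: totals[0].total, by_day: rows });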
});
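
The GROUP BY only returns days that had at least one click, so a chart built from by_day will have gaps. If you want a continuous series, one option is to fill the missing days with zeros before responding. This is a sketch under assumptions: fillMissingDays is a hypothetical helper, and it expects each day as a 'YYYY-MM-DD' string in UTC, so normalise first if your driver hands back Date objects.

```typescript
type DayBucket = { day: string; clicks: number };

// Hypothetical helper: returns `days` buckets ending at `today` (UTC),
// newest first, using 0 for days that have no row.
function fillMissingDays(rows: DayBucket[], days: number, today: Date): DayBucket[] {
  const byDay = new Map<string, number>();
  for (const row of rows) byDay.set(row.day, row.clicks);

  const out: DayBucket[] = [];
  for (let i = 0; i < days; i++) {
    // Date.UTC normalises out-of-range day numbers, so subtracting works across months.
    const d = new Date(Date.UTC(today.getUTCFullYear(), today.getUTCMonth(), today.getUTCDate() - i));
    const key = d.toISOString().slice(0, 10);
    out.push({ day: key, clicks: byDay.get(key) ?? 0 });
  }
  return out;
}

const filled = fillMissingDays(
  [{ day: '2024-03-01', clicks: 5 }, { day: '2024-02-28', clicks: 2 }],
  3,
  new Date('2024-03-01T12:00:00Z'),
);
console.log(filled);
```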
STEP 7

Run it end-to-end

Open three terminals:

# T1
npm run dev
# T2
npm run worker
# T3 — generate clicks
for i in $(seq 1 50); do
  curl -s -o /dev/null http://localhost:3000/<your-code>
done
# T3 — check stats
curl -b jar.txt http://localhost:3000/stats/<your-code>
✓ Verify: stats endpoint returns total: 50 with one bucket for today.
STEP 8

Prove durability — kill the worker

# Stop the worker (T2)
# Generate more clicks
for i in $(seq 1 20); do curl -s -o /dev/null http://localhost:3000/<code>; done
# Inspect the queue
docker compose exec redis redis-cli LLEN bull:clicks:wait
# Restart the worker
npm run worker
# Wait a few seconds — queue drains, count goes to 70
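
Instead of waiting "a few seconds" by eye, you could poll the queue until nothing is pending. The sketch below is written against a minimal structural type rather than importing BullMQ, on the assumption that a Queue's getJobCounts method fits it; waitForDrain and CountsSource are hypothetical names.

```typescript
// Minimal shape we rely on. Assumption: a BullMQ Queue satisfies this
// through its getJobCounts(...types) method.
type CountsSource = {
  getJobCounts(...types: string[]): Promise<Record<string, number>>;
};

// Polls until no jobs are waiting, active or delayed, or until the timeout passes.
// Resolves to true when the queue drained, false on timeout.
async function waitForDrain(
  queue: CountsSource,
  timeoutMs = 10_000,
  pollMs = 250,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const counts = await queue.getJobCounts('wait', 'active', 'delayed');
    const pending = Object.values(counts).reduce((sum, n) => sum + n, 0);
    if (pending === 0) return true;
    await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
  }
  return false;
}
```

From a small script you would call await waitForDrain(clickQueue) after restarting the worker, then hit the stats endpoint.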
STEP 9

Verify idempotency

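Temporarily make the worker fail half the time after the row has been written, so jobs whose insert already succeeded get retried. Counts should stay correct because the unique constraint absorbs the repeated inserts.

// in worker.ts, inside the handler, after the INSERT:
if (Math.random() < 0.5) throw new Error('flaky downstream');

Generate 100 clicks; the clicks table should gain exactly 100 rows once the queue drains. With attempts: 5, roughly 3% of jobs (0.5^5) will exhaust their retries and be reported as failed, but their rows are already written. Remove the chaos line afterwards.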
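
If you want to see why the row count stays exact without running Redis or Postgres, the idea can be modelled in memory. This sketch is an illustration only: a Map keyed by message id stands in for the clicks table and its unique constraint, the failure is injected after the write (the case where idempotency matters), and simulate and makeRng are hypothetical names.

```typescript
// Small deterministic generator so runs are repeatable (linear congruential).
function makeRng(seed: number): () => number {
  let state = seed >>> 0;
  return () => {
    state = (Math.imul(state, 1664525) + 1013904223) >>> 0;
    return state / 2 ** 32;
  };
}

type Click = { id: string; link_id: number };

function simulate(total: number, attempts: number, failRate: number, rng: () => number) {
  const table = new Map<string, Click>(); // stands in for clicks with UNIQUE(message_id)
  let inserts = 0;   // INSERT statements issued, including retries
  let exhausted = 0; // jobs that failed on every attempt

  for (let i = 0; i < total; i++) {
    const ev: Click = { id: `msg-${i}`, link_id: 1 };
    let done = false;
    for (let attempt = 1; attempt <= attempts && !done; attempt++) {
      inserts++;
      if (!table.has(ev.id)) table.set(ev.id, ev); // ON CONFLICT (message_id) DO NOTHING
      if (rng() < failRate) continue;               // failure after the write: job is retried
      done = true;
    }
    if (!done) exhausted++;
  }
  return { rows: table.size, inserts, exhausted };
}

const result = simulate(100, 5, 0.5, makeRng(42));
console.log(result); // rows is 100 however many retries happened
```

The number of inserts issued is higher than the number of rows, and that gap is exactly what the unique constraint absorbs.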

STEP 10

Set up a DLQ alarm (mock)

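Add a small failure listener in worker.ts. BullMQ has no separate dead-letter queue: jobs that exhaust their attempts stay in the queue's failed set, which serves as the DLQ here.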

import { QueueEvents } from 'bullmq';
const events = new QueueEvents('clicks', {
  connection: { url: process.env.REDIS_URL ?? 'redis://localhost:6379' }
});
events.on('failed', ({ jobId, failedReason }) =>
  console.error('DLQ candidate:', jobId, failedReason));

Real alerting wiring lands in Module 07.
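
Until then, a mock alarm can be as simple as counting failures in a sliding time window and shouting when a threshold is crossed. The sketch below is one possible shape, not the Module 07 design: createFailureAlarm and its parameters are hypothetical.

```typescript
// Hypothetical mock alarm: returns a function that records one failure and
// reports whether at least `threshold` failures fell inside the last `windowMs`.
function createFailureAlarm(windowMs: number, threshold: number) {
  const timestamps: number[] = [];
  return function recordFailure(nowMs: number = Date.now()): boolean {
    timestamps.push(nowMs);
    const cutoff = nowMs - windowMs;
    // Drop failures that have aged out of the window.
    while (timestamps.length > 0 && (timestamps[0] as number) <= cutoff) {
      timestamps.shift();
    }
    return timestamps.length >= threshold;
  };
}

// Example: alarm when 3 failures occur within 60 seconds.
const recordFailure = createFailureAlarm(60_000, 3);
console.log(recordFailure(0), recordFailure(1_000), recordFailure(2_000)); // false false true
```

Inside the events.on('failed', ...) handler you would call recordFailure() with no argument and log at a louder level when it returns true.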

STEP 11

Commit

git checkout -b module-06
git add .
git commit -m "module 06: bullmq click pipeline, stats endpoint, idempotent worker"
git push -u origin module-06
Common Gotchas

If Something Goes Wrong

  • Worker doesn't pick up jobs — both producer and worker must point to the same Redis instance and queue name.
  • Duplicate clicks in the table — make sure the unique constraint on message_id exists and you use ON CONFLICT DO NOTHING.
  • Redirect latency went up — you accidentally awaited enqueueClick. Fire and forget; log enqueue failures.
  • BullMQ requires Redis ≥ 6.2 — the redis:7-alpine from Module 04 is fine.
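
For the first gotcha, a quick way to confirm that producer and worker agree is to log the Redis target and queue name at startup in both processes and compare the lines. A minimal sketch, where describeRedisTarget is a hypothetical helper that strips credentials before logging:

```typescript
// Hypothetical helper: reduce a Redis URL to "host:port/db" so two processes
// can log and compare what they connect to. Credentials are dropped.
function describeRedisTarget(redisUrl: string): string {
  const u = new URL(redisUrl);
  const host = u.hostname || 'localhost';
  const port = u.port || '6379';                   // Redis default port
  const db = u.pathname.replace(/^\//, '') || '0'; // default logical database
  return `${host}:${port}/${db}`;
}

const QUEUE_NAME = 'clicks';
console.log(`queue=${QUEUE_NAME} redis=${describeRedisTarget('redis://localhost:6379')}`);
// queue=clicks redis=localhost:6379/0
```

In the app and the worker you would pass process.env.REDIS_URL ?? 'redis://localhost:6379' instead of the literal; if the two log lines differ, the worker is listening somewhere the producer never writes.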
What's Next

Move On