Idempotent Usage Event Ingestion
Usage events that bill customers must be counted exactly once even when the queue delivers them two or three times, so the ingestion layer needs an idempotency key per (tenant_id, event_id) and a write path that absorbs duplicates without inflating a bill. This guide is a focused part of Usage Metering Event Pipelines: how to assign keys, dedupe within a bounded window, and use conditional writes so at-least-once delivery becomes exactly-once effect.
Problem Framing
Every durable queue worth using — SQS, Kafka, Pub/Sub, NATS JetStream — delivers at-least-once. A consumer can crash after processing a message but before acknowledging it, a visibility timeout can expire while a slow handler is still running, or a partition can rebalance mid-batch. In all three cases the broker re-delivers, and the broker is correct to do so: it cannot know whether your side effect committed. Exactly-once delivery does not exist across a network boundary; what you can build is exactly-once effect, where redelivery is observed and discarded.
For metering this is not a nicety. A usage event is money. If an "API call" event is counted twice, the tenant is overbilled and you lose trust; if a deduplication bug drops a real event, you under-bill and lose revenue. Unlike a cache or a search index, you cannot reconcile a metering error by "just reprocessing", because reprocessing is exactly the duplicate-generating event you are trying to survive. The ingestion layer is therefore the single place where the at-least-once guarantee of the transport must be converted into the exactly-once accounting the ledger requires, and it must do so per tenant so one tenant's replay storm cannot corrupt another's counts.
The mechanism is an idempotency key: a value the producer assigns when the event first occurs, stable across every redelivery of that same logical event, and unique within the tenant. The consumer records which keys it has already applied and treats a second arrival of a known key as a no-op. The whole design reduces to two questions — what is the key, and how do you check-and-apply it atomically — plus one practical constraint: you cannot remember every key forever, so you need a bounded dedupe window. The diagram below traces a duplicate through the path.
The decision that matters is where the key comes from and whether the dedupe check shares a transaction with the meter write. Get those two right and redelivery stops mattering.
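As a minimal sketch of the check-and-apply idea, the in-memory model below records applied keys per tenant and treats a second arrival as a no-op. The `DedupeLedger` class and its `Set`/`Map` storage are illustrative assumptions, not part of the pipeline; a real ingestion layer would use a durable store that checks and applies atomically.

```typescript
// In-memory model of check-and-apply, for illustration only.
type UsageEvent = { tenantId: string; eventId: string; quantity: number };

class DedupeLedger {
  private seen = new Set<string>();
  private totals = new Map<string, number>();

  // Returns true if the event was applied, false if it was a duplicate.
  apply(ev: UsageEvent): boolean {
    // JSON-encoding the pair gives an unambiguous tenant-scoped key.
    const key = JSON.stringify([ev.tenantId, ev.eventId]);
    if (this.seen.has(key)) return false; // redelivery: observe and discard
    this.seen.add(key);
    this.totals.set(ev.tenantId, (this.totals.get(ev.tenantId) ?? 0) + ev.quantity);
    return true;
  }

  total(tenantId: string): number {
    return this.totals.get(tenantId) ?? 0;
  }
}

const ledger = new DedupeLedger();
const ev = { tenantId: "t1", eventId: "evt_abc", quantity: 5 };
ledger.apply(ev);                        // first delivery: applied
ledger.apply(ev);                        // redelivery: no-op
ledger.apply({ ...ev, tenantId: "t2" }); // same event id, other tenant: applied
console.log(ledger.total("t1"), ledger.total("t2")); // 5 5
```

The same event id under a different tenant is counted separately, which is the behavior the tenant-scoped key is meant to guarantee.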
Step-by-Step Guide
1. Have the producer assign a stable event id
The idempotency key must originate at the source, when the event first happens, and must be identical on every redelivery. Never generate it in the consumer from arrival time or a fresh UUID — that defeats the entire mechanism, because two deliveries of one event would get two keys. A deterministic id derived from the event's own content is the safest choice: if the producer retries before getting an ack, it re-emits the same id.
import { createHash } from "node:crypto";

type RawUsage = {
  tenantId: string;
  meter: string;      // e.g. "api_calls"
  quantity: number;
  occurredAt: string; // ISO-8601, set at the moment of use
  resourceId: string; // the thing being metered, e.g. request id
};

export function eventId(u: RawUsage): string {
  // Deterministic: same logical event -> same id on every retry.
  const basis = `${u.tenantId}|${u.meter}|${u.resourceId}|${u.occurredAt}`;
  return createHash("sha256").update(basis).digest("hex").slice(0, 32);
}
If the producer already has a natural unique id for the unit of work (a request id, a job id), use that directly rather than hashing — it is cheaper and just as stable.
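One way to combine both options is sketched below: prefer a natural id when the producer has one, and otherwise hash the event's identifying fields. The `naturalId` field and the `stableEventId` name are hypothetical. The sketch also JSON-encodes the fields before hashing, an assumption made here so that a delimiter character inside a field cannot make two different events produce the same basis string.

```typescript
import { createHash } from "node:crypto";

type UsageInput = {
  tenantId: string;
  meter: string;
  resourceId: string;
  occurredAt: string;  // ISO-8601, set at the moment of use
  naturalId?: string;  // hypothetical: a request or job id, if the producer has one
};

function stableEventId(u: UsageInput): string {
  // A natural unique id is already stable across retries, so use it as-is.
  if (u.naturalId) return u.naturalId;
  // JSON-encoding the tuple keeps field boundaries unambiguous.
  const basis = JSON.stringify([u.tenantId, u.meter, u.resourceId, u.occurredAt]);
  return createHash("sha256").update(basis).digest("hex").slice(0, 32);
}
```

Calling `stableEventId` twice with the same input yields the same id, which is the property redelivery handling depends on.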
2. Model the dedupe key with a tenant-scoped unique constraint
Store events in a table whose uniqueness is (tenant_id, event_id). Scoping the constraint to the tenant keeps each tenant's keyspace independent, so a hash collision or id reuse in one tenant can never suppress another tenant's event, and it lets you partition or shard by tenant_id later without reworking the key.
CREATE TABLE usage_events (
  tenant_id   uuid        NOT NULL,
  event_id    text        NOT NULL,
  meter       text        NOT NULL,
  quantity    numeric     NOT NULL CHECK (quantity >= 0),
  occurred_at timestamptz NOT NULL,
  ingested_at timestamptz NOT NULL DEFAULT now(),
  CONSTRAINT usage_events_pk PRIMARY KEY (tenant_id, event_id)
);

CREATE INDEX idx_usage_events_tenant_time
  ON usage_events (tenant_id, occurred_at);
The primary key doubles as the dedupe index: a duplicate insert collides on (tenant_id, event_id) and the database rejects it, no separate lookup required.
3. Apply the event with a conditional upsert
Use INSERT ... ON CONFLICT DO NOTHING so the first delivery inserts and every redelivery is silently absorbed. The RETURNING clause tells you whether the row was actually written — an empty result means it was a duplicate, which is precisely the signal you need to avoid double-incrementing any downstream aggregate in the same handler.
INSERT INTO usage_events (tenant_id, event_id, meter, quantity, occurred_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (tenant_id, event_id) DO NOTHING
RETURNING event_id;
ON CONFLICT DO NOTHING is the right verb here precisely because usage events are immutable facts: a redelivery carries identical data, so there is nothing to update. Reserve DO UPDATE for late-arriving corrections, and even then guard it with a WHERE clause so an old redelivery cannot overwrite a newer correction.
4. Keep the dedupe check and the aggregate update in one transaction
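If you maintain a rolling per-tenant counter for fast quota checks, the insert and the increment must be atomic. Otherwise a crash between them either double-counts (increment ran, insert retried) or under-counts (insert ran, increment lost). Bind both to the row actually inserted. The counter upsert below assumes a usage_counters table with a primary key or unique constraint on (tenant_id, meter, period), which ON CONFLICT needs in order to detect the existing row.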
import { Pool } from "pg";

const pool = new Pool(); // connection settings come from the PG* environment variables

type UsageEvent = {
  tenantId: string;
  eventId: string;
  meter: string;
  quantity: number;
  occurredAt: string;
};

// Returns true if the event was newly applied, false if it was a duplicate.
export async function ingest(ev: UsageEvent): Promise<boolean> {
  const c = await pool.connect();
  try {
    await c.query("BEGIN");
    const ins = await c.query(
      `INSERT INTO usage_events (tenant_id, event_id, meter, quantity, occurred_at)
       VALUES ($1,$2,$3,$4,$5)
       ON CONFLICT (tenant_id, event_id) DO NOTHING
       RETURNING event_id`,
      [ev.tenantId, ev.eventId, ev.meter, ev.quantity, ev.occurredAt],
    );
    const applied = ins.rowCount === 1;
    if (applied) {
      // Increment only when this delivery actually inserted the row.
      await c.query(
        `INSERT INTO usage_counters (tenant_id, meter, period, total)
         VALUES ($1,$2,date_trunc('month',$3::timestamptz),$4)
         ON CONFLICT (tenant_id, meter, period)
         DO UPDATE SET total = usage_counters.total + EXCLUDED.total`,
        [ev.tenantId, ev.meter, ev.occurredAt, ev.quantity],
      );
    }
    await c.query("COMMIT");
    return applied; // false = duplicate, ack and move on
  } catch (e) {
    // Swallow a failed ROLLBACK so it cannot mask the original error.
    await c.query("ROLLBACK").catch(() => {});
    throw e; // do NOT ack; let the queue redeliver
  } finally {
    c.release();
  }
}
The contract for the queue handler is simple: a thrown error means do not acknowledge (safe — redelivery will dedupe), and any return value means acknowledge.
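That contract can be sketched as a thin wrapper around the ingest function. The `QueueMessage` shape and its `ack` callback are hypothetical stand-ins, since each broker client exposes acknowledgement differently; the point is only the ordering, with the ack placed strictly after ingest resolves.

```typescript
// Hypothetical message shape; real broker clients expose their own ack API.
type QueueMessage<T> = { body: T; ack: () => Promise<void> };
type Ingest<T> = (ev: T) => Promise<boolean>;

async function handleMessage<T>(
  msg: QueueMessage<T>,
  ingest: Ingest<T>,
): Promise<"applied" | "duplicate"> {
  // If ingest throws, control never reaches ack(), so the broker redelivers.
  const applied = await ingest(msg.body);
  // Reached only after the transaction committed, or the key already existed.
  await msg.ack();
  return applied ? "applied" : "duplicate";
}
```

Both outcomes acknowledge the message: a duplicate has already been accounted for, so leaving it on the queue would only cause more redeliveries.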
5. Bound the dedupe window
You cannot retain every key forever, and you do not need to. A broker only redelivers within a finite horizon: its retention or visibility window. Keep keys at least as long as the maximum possible redelivery gap plus a safety margin, then prune. A 7-day window is a reasonable starting point, but check it against your broker's configured retention (Kafka's default log retention is 7 days, and SQS allows up to 14) and against any replay-on-recovery procedure you rely on.
DELETE FROM usage_events
WHERE ingested_at < now() - interval '7 days';
Prune in small batches on a schedule so the delete does not lock the table. If you also need the raw events for long-term audit, copy them to cold storage before pruning the hot dedupe table, so the dedupe window and the retention window are decoupled — the cold copy lives in your tenant-partitioned time-series store, while the hot table stays small and fast.
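A batched prune might look like the sketch below. The `Exec` type is a hypothetical executor that runs one statement and reports the number of rows affected (with node-postgres it could wrap `pool.query` and return `result.rowCount ?? 0`), and the batch size and batch cap are placeholder values. PostgreSQL has no `DELETE ... LIMIT`, so each batch is selected in a subquery.

```typescript
// Hypothetical executor: runs one SQL statement, resolves to rows affected.
type Exec = (sql: string, params: unknown[]) => Promise<number>;

const PRUNE_BATCH_SQL = `
  DELETE FROM usage_events
  WHERE (tenant_id, event_id) IN (
    SELECT tenant_id, event_id
    FROM usage_events
    WHERE ingested_at < now() - interval '7 days'
    LIMIT $1
  )`;

// Deletes expired keys in small batches; returns the total rows removed.
async function pruneExpired(exec: Exec, batchSize = 1000, maxBatches = 100): Promise<number> {
  let deleted = 0;
  for (let i = 0; i < maxBatches; i++) {
    const n = await exec(PRUNE_BATCH_SQL, [batchSize]);
    deleted += n;
    if (n < batchSize) break; // partial batch: nothing older remains
  }
  return deleted;
}
```

Each batch runs as its own statement, so locks are held briefly, and the `maxBatches` cap keeps a single scheduled run from monopolizing the table after a long backlog.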
6. Make the downstream billing push idempotent too
Idempotency does not stop at your database. When aggregated usage is reported to the payment processor, that call must also carry an idempotency key, because your own retry of a failed report can otherwise create a duplicate usage record on their side. Reuse a deterministic key built from the tenant, meter, and billing period so a retried report collapses to one. The full reconciliation path is covered in billing sync with Stripe.
await stripe.billing.meterEvents.create(
{ event_name: meter, payload: { stripe_customer_id: customerId, value } },
{ idempotencyKey: `${tenantId}:${meter}:${period}` },
);
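The key itself can be built by a small pure function, sketched below. The `billingReportKey` name, the `usage-report:` prefix, and the assumption that billing periods are calendar months in UTC are all illustrative; adjust the period normalization to match how your periods are actually defined.

```typescript
// Builds one key per (tenant, meter, billing period).
// Assumes monthly periods in UTC; other period schemes need other normalization.
function billingReportKey(tenantId: string, meter: string, periodStart: Date): string {
  // Normalize to the UTC month so every retry for the period yields the same string.
  const period = periodStart.toISOString().slice(0, 7); // e.g. "2024-03"
  return `usage-report:${tenantId}:${meter}:${period}`;
}
```

Because the key depends only on tenant, meter, and period, a retry issued hours later by a different worker still produces the same string. It is worth checking the processor's documentation for how long it honors a given key, since a retry after that window would no longer collapse.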
Verification
Prove that a replayed event changes nothing. Insert the same event twice and assert the row count and the counter total are unchanged by the second attempt.
-- first delivery
INSERT INTO usage_events (tenant_id, event_id, meter, quantity, occurred_at)
VALUES ('11111111-1111-1111-1111-111111111111', 'evt_abc', 'api_calls', 5, now())
ON CONFLICT (tenant_id, event_id) DO NOTHING
RETURNING event_id; -- returns 'evt_abc'
-- redelivery of the exact same event
INSERT INTO usage_events (tenant_id, event_id, meter, quantity, occurred_at)
VALUES ('11111111-1111-1111-1111-111111111111', 'evt_abc', 'api_calls', 5, now())
ON CONFLICT (tenant_id, event_id) DO NOTHING
RETURNING event_id; -- returns ZERO rows
-- when both deliveries go through the step 4 handler, the counter reflects one event
SELECT total FROM usage_counters
 WHERE tenant_id = '11111111-1111-1111-1111-111111111111'
   AND meter = 'api_calls'; -- total = 5, not 10
The second insert returning zero rows is the proof: the handler sees applied = false, skips the increment, acknowledges the message, and the total stays at 5. In an automated test, drive the same payload through your real handler twice and assert the second call returns false while the persisted total is unchanged. For load-testing, replay a recorded batch with deliberate duplicates injected and confirm the final per-tenant totals match the count of distinct event ids, not the total message count.
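For the replay test, two small helpers are enough: one to inject duplicates into a recorded batch and one to compute the totals the pipeline should end up with. Both `withDuplicates` and `expectedTotals` are hypothetical names for this sketch, and the `Msg` shape is a simplified stand-in for a real event payload.

```typescript
type Msg = { tenantId: string; eventId: string; quantity: number };

// Re-emits every Nth message to simulate broker redelivery (every must be >= 1).
function withDuplicates<T>(batch: T[], every: number): T[] {
  const out: T[] = [];
  batch.forEach((m, i) => {
    out.push(m);
    if ((i + 1) % every === 0) out.push(m);
  });
  return out;
}

// Reference totals: each (tenantId, eventId) counts once, however often it appears.
function expectedTotals(batch: Msg[]): Map<string, number> {
  const seen = new Set<string>();
  const totals = new Map<string, number>();
  for (const m of batch) {
    const key = JSON.stringify([m.tenantId, m.eventId]);
    if (seen.has(key)) continue;
    seen.add(key);
    totals.set(m.tenantId, (totals.get(m.tenantId) ?? 0) + m.quantity);
  }
  return totals;
}
```

Feed `withDuplicates(recorded, n)` through the real handler, then compare the persisted per-tenant totals with `expectedTotals(recorded)`; any tenant whose stored total is higher was double-counted.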
Failure Modes & Gotchas
- Generating the key in the consumer. Symptom: every redelivery is counted as a new event and bills inflate under retry. Root cause: a fresh UUID or arrival timestamp assigned on consume differs per delivery. Fix: derive the key deterministically at the producer from event content, or reuse a natural unique id.
- Global instead of tenant-scoped keys. Symptom: one tenant's event silently suppresses another's, or sharding by tenant later breaks dedupe. Root cause: a unique constraint on event_id alone. Fix: make the constraint (tenant_id, event_id).
- Acknowledging before the commit succeeds. Symptom: occasional lost events with no error logged. Root cause: the handler acks the message, then the transaction rolls back, so the only copy is gone and the broker will not redeliver. Fix: ack strictly after COMMIT returns; on any error, throw and let redelivery handle it.
- Dedupe window shorter than broker retention. Symptom: very old redeliveries (after an outage or a replay-from-offset) are counted again. Root cause: keys pruned before the broker stopped being able to redeliver them. Fix: set the prune horizon to at least the broker's maximum retention plus margin.
FAQ
Why not rely on exactly-once delivery from the broker instead?
Because exactly-once delivery across a network does not exist; even Kafka's "exactly-once semantics" only holds inside a single Kafka transaction and breaks at the boundary to an external sink like your database. The durable, transport-independent guarantee is exactly-once effect, achieved by making the consumer idempotent with a stored key.
Where should the dedupe state live: the database or a cache like Redis?
Put it wherever the side effect is committed, so the dedupe check and the write share a transaction. A relational primary key on (tenant_id, event_id) gives you atomic check-and-apply for free. A separate Redis SETNX works for very high throughput but reintroduces a race between the cache check and the database write unless they are made atomic, so reserve it for cases where the database cannot keep up.
How long should I keep idempotency keys?
At least the maximum interval over which the broker can redeliver (its retention or replay window), plus a safety margin. Seven days is a common default; lengthen it if you ever replay from old offsets for recovery, and prune in batches so the cleanup does not lock the table.