The Watchtower

The moat is the pipeline, not the data.

28 sources across 13 jurisdictions — FDA NDC, DailyMed, RxNorm, dm+d, CIMA, BDPM, AIFA, BAG-SL, AIPS, EPha, EMA SPOR and more — drift upstream without notice. Every consumer rediscovers the breakage individually, usually weeks late, usually after a customer complaint. We catch it at staging.

raw01
sources/raw/{source}/{run_id}
pull, retain 12 months
staging02
{source}_staging
typed staging table
validation03
schema validate
XSD · JSON-schema · regex
promotion04
drugs · interactions · atc
production read path

Pinned schemas

Per-source assertions across all 28 feeds — BAG-SL column checks, AIPS / dm+d / CIMA XSD, EPha / FDA NDC / DailyMed JSON-schema, oddb and RxNorm per-column regex. Failed rows land in a per-source quarantine; promotion is blocked above a 1% fail-rate.

Drift alerts

A run flagged `schema_drift` keeps the last-good snapshot in production, fires PagerDuty + Slack, and surfaces `schema_state: "stale"` in every API response so customers know before they ship a broken refill.

Diff-based change feed

Every promoted row writes a row to `changes` (before, after, source, occurred_at). Subscribe via webhook or `GET /v1/changes?since=...` — most-requested feature from EHR vendors.

Stale-data honesty

Every response carries `_meta.last_synced_at` and `schema_state` per source. The previous good snapshot is still served, but flagged. Silent shipping of broken data is the failure mode this exists to prevent.

In every response_meta
{
  "drug": { ... },
  "_meta": {
    "ndc":    { "last_synced_at": "2026-05-30T04:00:00Z", "schema_state": "ok" },
    "dmd":    { "last_synced_at": "2026-05-29T02:00:00Z", "schema_state": "ok" },
    "cima":   { "last_synced_at": "2026-05-30T03:00:00Z", "schema_state": "ok" },
    "bag_sl": { "last_synced_at": "2026-05-15T05:01:00Z", "schema_state": "ok" },
    "aips":   { "last_synced_at": "2026-05-26T05:00:00Z", "schema_state": "ok" },
    "epha":   { "last_synced_at": "2026-05-25T03:00:00Z",
                "schema_state": "stale", "reason": "schema_drift_2026-05-29" }
  }
}

Operational runbook

What happens when something drifts.

28 sources across 13 jurisdictions, each on its own cadence. Detection is the easy part. The interesting question is what we do in the four hours after the alert fires — and what your code sees while we're working.

  1. 01

    Detect

    Per-source validator trips on column rename, type change, XML element drift, JSON-schema fail. The run gets `schema_drift` status, promotion is blocked, last-good stays live.

  2. 02

    Alert

    PagerDuty + Slack fire within the cron tick. The runbook URL goes in the body. `_meta.schema_state` flips to `stale` in every response touching that source.

  3. 03

    Triage

    On-call inspects `clinical_drug_data_sync_runs.payload` for the diff. Additive changes go through `bag_sl_add_observed_column` with no redeploy.

  4. 04

    Recover

    Destructive changes get a parser patch + a forced re-run. SLA: best-effort 24h. Customers see the state transition in the change feed and the API response.

Why staging-first

Every source has an edge function that pulls the upstream artefact, parses it, and writes to a staging table before any production row moves. drugs, interactions, pharmacode_aliases see only promoted rows.

sources/raw/{source}/{run_id}.{format}
              ↓
{source}_staging
              ↓ validation
{source}_quarantine
              ↓ promotion
drugs · interactions · atc_codes

The changes table

Every promotion writes one row per affected natural-key. Customers consume via webhook (Growth+) orGET /v1/changes?since=…. Most-requested feature from EHR vendors who currently re-import the catalogue monthly because they cannot tell what moved.

CREATE TABLE changes (
  id            uuid PRIMARY KEY,
  table_name    text NOT NULL,
  natural_key   jsonb NOT NULL,
  drug_id       uuid REFERENCES drugs(id),
  change_type   text CHECK (
                  change_type IN ('insert','update','delete')),
  before        jsonb,
  after         jsonb,
  source        text NOT NULL,
  occurred_at   timestamptz NOT NULL DEFAULT now()
);