CluedIn for Developers — build, integrate, automate

Audience: Application developers, integration engineers, platform/DevOps engineers
Goal: Give developers a practical, end‑to‑end handbook to build against CluedIn: auth, APIs/SDK patterns, ingestion producers, export consumers, webhooks, cleaning/validation-as-code, AI Agents, dedup, logging/audit, testing, CI/CD, and secure operations.

Assumptions: Your org uses SSO, keeps config as code, and prefers automated ingestion (streams/batches) over manual file uploads.


0) Your First 48 Hours (Checklist)

Access & Tools

  • Sign in via SSO; confirm a Developer/Engineer role with needed scopes.
  • Get a short‑lived token (OAuth client) or scoped PAT for local dev.
  • Install an HTTP client (curl/Postman) and the language SDKs; clone the repo that holds your config‑as‑code.

Hello, CluedIn

  • Call /api/me to test auth.
  • Create a sandbox Ingestion Endpoint and push a sample record.
  • Create a staging Export and run an end‑to‑end smoke test.

Guardrails

  • Set correlation_id conventions for all requests.
  • Configure retries with jitter and idempotency keys.
  • Wire a dead‑letter path and a simple replay tool.

1) Architecture (Developer View)

[Producer] → Ingestion Endpoint → Raw Store → Mapping → Cleaning/Validation →
Dedup/Golden → Exports (tables/topics/APIs/files) → [Consumers/Apps/BI]
                                 ↓
                            AI Agents (analysis/suggestions)

You primarily write:

  • Producers that push data in (HTTP/Kafka/Batch/CDC).
  • Consumers that react to Exports or Webhooks.
  • Jobs or functions that call CluedIn APIs (AI Agents, dedup, policies).
  • Infra glue (CI/CD, secrets, logging, alerts).

2) Authentication & Identity

2.1 OAuth 2.0 Client (preferred for services)

POST /oauth/token
{
  "grant_type": "client_credentials",
  "client_id": "<ID>",
  "client_secret": "<SECRET>",
  "scope": "ingest:write export:read policy:read"
}
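
Services usually cache the token and refresh it shortly before expiry instead of requesting one per call. The following is a minimal sketch, assuming the token endpoint returns the standard OAuth 2.0 fields access_token and expires_in (this page does not confirm the response shape). TokenCache and fetch_token are hypothetical names, and the HTTP call is injected so the refresh logic can be exercised offline.

```python
import time
from typing import Callable, Optional


class TokenCache:
    """Caches a bearer token and refreshes it shortly before it expires."""

    def __init__(self, fetch_token: Callable[[], dict], skew_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch_token = fetch_token   # hypothetical: performs POST /oauth/token
        self._skew = skew_seconds         # refresh this many seconds early
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._skew:
            body = self._fetch_token()
            # Assumed response shape: {"access_token": "...", "expires_in": 3600}
            self._token = body["access_token"]
            self._expires_at = now + float(body["expires_in"])
        return self._token
```

In a real service, fetch_token would wrap the POST shown above and return the parsed JSON body.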

2.2 Personal Access Tokens (PAT) (for local dev)

  • Create in Admin → API Tokens with minimal scopes and expiry.
  • Store in a secret manager; never commit to git.

2.3 Request Hygiene

Always send:

  • Authorization: Bearer <token>
  • X-Correlation-Id: <uuid>
  • Content-Type: application/json (for JSON payloads)

3) API Patterns (Paging, Filtering, Errors)

Paging

GET /api/entities?type=Person&limit=200&cursor=<token>
→ 200 + { "items": [...], "next_cursor": "..." }
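
Cursor paging can be wrapped in a generator so callers never handle cursors directly. This is a sketch only: iter_items and fetch_page are hypothetical names, and it assumes a missing or empty next_cursor marks the last page.

```python
from typing import Callable, Iterator, Optional


def iter_items(fetch_page: Callable[[Optional[str]], dict]) -> Iterator[dict]:
    """Yield every item across pages, following next_cursor until it is absent."""
    cursor: Optional[str] = None
    while True:
        # hypothetical: fetch_page issues GET /api/entities?...&cursor=<cursor>
        page = fetch_page(cursor)
        yield from page.get("items", [])
        cursor = page.get("next_cursor")
        if not cursor:   # assumed: missing/empty cursor means last page
            break
```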

Filtering

GET /api/exports/runs?name=warehouse-contacts-v1&since=2025-08-20T00:00:00Z

Common errors (illustrative)

  • 400 invalid schema / missing required fields
  • 401/403 auth/permission issues
  • 409 conflict (idempotency violation)
  • 429 rate limit → backoff and retry
  • 5xx transient → exponential backoff with jitter

Retry policy (pseudo)

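const base = 250; // ms
// Delay before retry number `attempt` (0-based): capped exponential backoff plus 0-200 ms of jitter.
const retryDelay = (attempt) => Math.min(30_000, base * 2 ** attempt) + Math.random() * 200;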
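
The same policy in Python, extended to honor a Retry-After hint on 429. This is a sketch under assumptions: the set of retryable status codes, the function names, and the (status, retry_after, body) tuple returned by send are illustrative and not part of the CluedIn API.

```python
import random
import time
from typing import Callable

RETRYABLE = {429, 500, 502, 503, 504}   # assumed set of transient statuses


def backoff_delay(attempt: int, base_ms: int = 250, cap_ms: int = 30_000,
                  jitter_ms: int = 200, rng: Callable[[], float] = random.random) -> float:
    """Delay in seconds: capped exponential backoff plus up to jitter_ms of jitter."""
    return (min(cap_ms, base_ms * 2 ** attempt) + rng() * jitter_ms) / 1000.0


def call_with_retry(send: Callable[[], tuple], max_attempts: int = 5,
                    sleep: Callable[[float], None] = time.sleep):
    """send() returns (status, retry_after_seconds_or_None, body)."""
    for attempt in range(max_attempts):
        status, retry_after, body = send()
        if status not in RETRYABLE:
            return status, body
        if attempt == max_attempts - 1:
            break   # out of attempts; return the last failure
        # Prefer the server's Retry-After hint when one is present.
        sleep(retry_after if retry_after is not None else backoff_delay(attempt))
    return status, body
```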

4) Ingestion Producers

4.1 HTTP/Webhook (JSON lines)

curl -X POST \
  -H "Authorization: Bearer $TOKEN" \
  -H "X-Correlation-Id: $(uuidgen)" \
  -H "Content-Type: application/json" \
  -d '{"source":"crm-contacts","payload":{"id":"c_123","email":"a@example.com","updated_at":"2025-08-22T12:00:00Z"}}' \
  https://<HOST>/api/ingest

Best practices

  • Idempotency: include stable id and updated_at.
  • Batching: prefer small batches (e.g., 1–5k records).
  • Compression: Content-Encoding: gzip for large payloads.
  • DLQ: send parse failures to a durable store with replay tooling.
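
The batching, compression, and idempotency points above can be sketched as three small helpers. The function names are hypothetical, and whether the endpoint accepts a gzip-compressed JSON-lines body or a particular idempotency header is an assumption to verify against your instance.

```python
import gzip
import hashlib
import json
from typing import Iterable, Iterator, List


def idempotency_key(record: dict) -> str:
    """Stable key from id + updated_at, so a resend of the same version is detectable."""
    raw = f'{record["id"]}|{record.get("updated_at", "")}'
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def batches(records: Iterable[dict], size: int = 1000) -> Iterator[List[dict]]:
    """Split records into lists of at most `size` items."""
    batch: List[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def encode_batch(batch: List[dict]) -> bytes:
    """Serialize as JSON lines and gzip; send with Content-Encoding: gzip."""
    lines = "\n".join(json.dumps(r, separators=(",", ":")) for r in batch)
    return gzip.compress(lines.encode("utf-8"))
```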

4.2 Node.js example

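import axios from "axios";
import { randomUUID } from "node:crypto";

// Shared client; CLUE_HOST and CLUE_TOKEN come from the environment.
const client = axios.create({ baseURL: process.env.CLUE_HOST });

// Push one contact record to the ingestion endpoint and return the response body.
export async function ingestContact(c) {
  const r = await client.post("/api/ingest", {
    source: "crm-contacts",
    payload: c,
  }, {
    headers: {
      Authorization: `Bearer ${process.env.CLUE_TOKEN}`,
      "X-Correlation-Id": randomUUID(),
    },
    timeout: 10000, // ms
  });
  return r.data;
}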

4.3 Python example

import os
import uuid
import requests

host = os.environ["CLUE_HOST"]
token = os.environ["CLUE_TOKEN"]
payload = {"source": "crm-contacts", "payload": {"id": "c_123", "email": "a@example.com"}}
r = requests.post(
    f"{host}/api/ingest", json=payload, timeout=10,
    headers={"Authorization": f"Bearer {token}", "X-Correlation-Id": str(uuid.uuid4())},
)
r.raise_for_status()  # raise on 4xx/5xx before reading the body
print(r.json())

4.4 Kafka/Event Hub

  • One topic per domain/entity; include schema version in headers.
  • Ensure producer idempotence and ordered keys (e.g., id).

4.5 Batch (S3/Blob/ADLS)

  • Land JSON/CSV/Parquet on schedule; register a bucket‑watch endpoint.
  • Keep manifest files for completeness and replay.

5) Mapping, Cleaning, Validation as Code

5.1 Minimal Mapping (pseudo‑YAML)

entity: Person
source: "crm-contacts"
fields:
  id: $.id
  email: $.email
  first_name: $.first_name
  last_name: $.last_name
  updated_at: $.updated_at
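
To unit-test a mapping locally, the field paths can be applied to a sample record in a few lines. This sketch supports only simple dotted paths such as $.id, not full JSONPath, and resolve and apply_mapping are hypothetical helpers rather than CluedIn functions.

```python
from typing import Any, Dict, Optional

MAPPING = {
    "id": "$.id",
    "email": "$.email",
    "first_name": "$.first_name",
    "last_name": "$.last_name",
    "updated_at": "$.updated_at",
}


def resolve(path: str, record: Dict[str, Any]) -> Optional[Any]:
    """Resolve a dotted '$.a.b' path; returns None when any segment is missing."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value: Any = record
    for key in path[2:].split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def apply_mapping(record: Dict[str, Any], mapping: Dict[str, str] = MAPPING) -> Dict[str, Any]:
    """Build the target entity; unmapped or missing source fields become None."""
    return {field: resolve(path, record) for field, path in mapping.items()}
```

Missing fields surface as None, which makes a mistyped path easy to spot in a test.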

5.2 Cleaning Project

project: normalize_contacts
schedule: "0 * * * *"
steps:
  - name: normalize_email
    action: set
    field: email
    value: lower(trim(email))
  - name: e164_phone
    when: phone is not null
    action: set
    field: phone
    value: to_e164(phone, default_country="AU")

5.3 Validations

rule: email_regex
entity: Person
check: { regex: "^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$" }
severity: high
on_fail: { action: "flag" }
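
The same check can be reproduced in a unit test before the rule is promoted. This is a sketch, assuming a failed check appends the rule name to a flags list; check_email and the flags field are illustrative and do not describe how the platform stores flags.

```python
import re

# Same pattern as the rule above, after YAML unescaping.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def check_email(record: dict) -> dict:
    """Return a copy of the record with a `flags` list; a failed check adds 'email_regex'."""
    email = record.get("email")
    # fullmatch avoids accepting a value with a trailing newline.
    ok = isinstance(email, str) and EMAIL_RE.fullmatch(email) is not None
    flags = list(record.get("flags", []))
    if not ok:
        flags.append("email_regex")
    return {**record, "flags": flags}
```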

Dev loop: edit YAML → PR → staging export diff → promote.


6) Exports & Contracts

6.1 Export Config (table)

{
  "name": "warehouse-contacts-v1",
  "type": "sql-table",
  "options": {
    "connection": "analytics-warehouse",
    "schema": "mdm",
    "table": "contacts_v1",
    "mode": "upsert",
    "primary_key": ["contact_id"]
  },
  "mapping": {
    "contact_id": "Person.id",
    "email": "Person.email",
    "first_name": "Person.first_name",
    "last_name": "Person.last_name",
    "updated_at": "Person.updated_at"
  },
  "schedule": "0 * * * *"
}

6.2 Contract (YAML)

name: contacts_v1
primary_key: contact_id
delivery: { type: sql-table, schedule: hourly }
sla: { freshness_p95_minutes: 60 }
compatibility: additive_only

Versioning: breaking changes → _v2; run both until consumers migrate.
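
An additive_only policy can be checked in CI by comparing the old and new column sets. This is a minimal sketch, assuming schemas are available as {column: type} dictionaries; breaking_changes is a hypothetical helper.

```python
from typing import Dict, List


def breaking_changes(old: Dict[str, str], new: Dict[str, str]) -> List[str]:
    """Compare {column: type} schemas; any removal or type change is breaking."""
    problems = []
    for column, col_type in old.items():
        if column not in new:
            problems.append(f"removed column: {column}")
        elif new[column] != col_type:
            problems.append(f"type changed: {column} {col_type} -> {new[column]}")
    return problems
```

An empty result means the change is additive; anything else is a candidate for a _v2 export.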


7) Webhooks & Eventing

7.1 Registering a Webhook (pseudo)

POST /api/webhooks
{
  "name": "export-success-teams",
  "events": ["export.succeeded"],
  "url": "https://example.com/hooks/export",
  "secret": "<HMAC_SECRET>"
}

7.2 Verify Signatures

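import crypto from "node:crypto";

// sig: hex signature from the request; body: raw request bytes (not re-serialized JSON).
function verify(sig, body, secret) {
  const expected = crypto.createHmac("sha256", secret).update(body).digest();
  const given = Buffer.from(sig, "hex");
  // timingSafeEqual throws on a length mismatch, so compare lengths first.
  return given.length === expected.length && crypto.timingSafeEqual(given, expected);
}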

Retry model: Webhooks are retried on non‑2xx; make handlers idempotent.
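
Because deliveries can repeat, a handler typically verifies the signature and then drops events it has already seen. This is a Python sketch under assumptions: the signature is a hex HMAC-SHA256 of the raw body (as in the Node example above), and each event carries an id field. Neither is confirmed by this page, and WebhookHandler is a hypothetical class.

```python
import hashlib
import hmac
import json


def verify_signature(signature_hex: str, body: bytes, secret: bytes) -> bool:
    """Constant-time check of a hex HMAC-SHA256 signature over the raw body."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # Compare as bytes so a non-ASCII signature is rejected instead of raising.
    return hmac.compare_digest(expected.encode(), signature_hex.strip().lower().encode())


class WebhookHandler:
    def __init__(self, secret: bytes):
        self._secret = secret
        self._seen = set()   # in-memory only; use a durable store in a real service

    def handle(self, signature_hex: str, body: bytes) -> str:
        if not verify_signature(signature_hex, body, self._secret):
            return "rejected"
        event = json.loads(body)
        event_id = event["id"]      # assumed field; check your payload
        if event_id in self._seen:
            return "duplicate"      # acknowledge retries without redoing work
        self._seen.add(event_id)
        return "processed"
```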


8) AI Agents (Programmatic Use)

8.1 Run an Analysis

POST /api/ai/agents/run
{
  "agent": "dq-analyzer",
  "target": { "entity": "Person" },
  "mode": "analysis",
  "options": { "sample": 10000 }
}

mode is one of analysis, suggest, or auto_fix (guarded).

8.2 Retrieve Findings

GET /api/ai/agents/runs/<run_id>/findings
→ { "issues":[{"field":"email","type":"invalid","examples":[...]}, ...] }

Guardrails

  • Restrict to masked views for PII.
  • Treat auto‑fixes as code changes (reviewable, reversible).

9) Dedup APIs

9.1 Create Rules (deterministic first)

rules:
  - name: exact_email
    when: lower(email) == lower(other.email)
    confidence: 0.98
  - name: phone_e164
    when: e164(phone) == e164(other.phone)
    confidence: 0.95
auto_approve_threshold: 0.97
queue_threshold: 0.85
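
To reason about how the two thresholds route a candidate pair, the rules can be simulated locally. This sketch assumes the highest matching rule confidence is the pair score and that thresholds are inclusive. Both are assumptions, and score_pair and decide are hypothetical helpers.

```python
from typing import Optional

AUTO_APPROVE = 0.97
QUEUE = 0.85


def _lower(value) -> Optional[str]:
    return value.lower() if isinstance(value, str) and value else None


def score_pair(a: dict, b: dict) -> float:
    """Highest confidence among matching deterministic rules, else 0.0."""
    score = 0.0
    email_a, email_b = _lower(a.get("email")), _lower(b.get("email"))
    if email_a and email_a == email_b:
        score = max(score, 0.98)   # exact_email
    # Assumes phone values were already normalized to E.164 by the cleaning step.
    if a.get("phone") and a.get("phone") == b.get("phone"):
        score = max(score, 0.95)   # phone_e164
    return score


def decide(score: float) -> str:
    if score >= AUTO_APPROVE:
        return "auto_merge"
    if score >= QUEUE:
        return "review_queue"
    return "no_match"
```

With these numbers an exact email match merges automatically, while a phone-only match lands in the review queue.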

9.2 Merge/Unmerge (pseudo)

POST /api/dedup/merge { "entity":"Person", "ids":["p1","p2"] }
POST /api/dedup/unmerge { "entity":"Person", "id":"p1" }

Log decisions; update survivorship config.


10) Logs, Audit & Observability

10.1 Fetch Logs (illustrative)

GET /api/logs?category=export&name=warehouse-contacts-v1&since=2025-08-23T00:00:00Z

10.2 Audit Events

  • SSO sign‑ins, role grants, token lifecycle, policy changes, export promotions, merges.

GET /api/audit?action=policy.update&since=2025-08-01

10.3 Correlation & Tracing

  • Pass a correlation_id end‑to‑end and include it in logs and errors.
  • Emit metrics: success/failure counts, latency, row counts, DLQ size.
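
One way to get the correlation id into every log line without threading it through each call is a context variable plus a logging filter. This is a sketch using only the Python standard library; the logger name and log format are illustrative.

```python
import contextvars
import logging
import uuid

correlation_id = contextvars.ContextVar("correlation_id", default="-")


class CorrelationFilter(logging.Filter):
    """Copies the current correlation id onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id.get()
        return True


def new_correlation_id() -> str:
    """Generate an id for this unit of work and make it current."""
    cid = str(uuid.uuid4())
    correlation_id.set(cid)
    return cid


def build_logger(name: str = "pipeline") -> logging.Logger:
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s cid=%(correlation_id)s %(message)s"))
    handler.addFilter(CorrelationFilter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

The value returned by new_correlation_id is also what you would send as X-Correlation-Id on outgoing requests.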

11) Testing & Local Dev

11.1 Unit Tests

  • Validate normalization helpers and schema mappers.
  • Golden files for tricky encodings, null behavior, and long strings.

11.2 Contract Tests

  • Assert export schema and SLA (freshness).
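
A contract test along these lines might compare the delivered columns with the contract and compute a p95 over observed freshness lags. This is a sketch, assuming you can collect the column types and lag samples (in minutes) yourself; p95 uses the nearest-rank method, and check_contract is a hypothetical helper.

```python
from typing import Dict, List, Sequence


def p95(values: Sequence[float]) -> float:
    """Nearest-rank 95th percentile."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100   # integer ceil(0.95 * n)
    return ordered[rank - 1]


def check_contract(columns: Dict[str, str], expected: Dict[str, str],
                   lag_minutes: Sequence[float], max_p95: float = 60) -> List[str]:
    """Return a list of human-readable failures; empty means the contract holds."""
    failures = []
    for name, col_type in expected.items():
        if columns.get(name) != col_type:
            failures.append(f"schema: {name} expected {col_type}, got {columns.get(name)}")
    observed = p95(lag_minutes)
    if observed > max_p95:
        failures.append(f"freshness: p95 lag {observed} min exceeds {max_p95}")
    return failures
```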

11.3 Stubs & Mocks

  • Spin a mock CluedIn (OpenAPI stub) for local dev.
  • Record/replay HTTP using tools like vcrpy/nock.

11.4 Example PyTest

def test_email_normalization():
    assert normalize_email("  A@Example.COM  ") == "a@example.com"
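
The test above needs a normalize_email helper. Here is a minimal sketch that mirrors lower(trim(email)) from the cleaning project; treating blank input as missing is an assumption, not documented behavior.

```python
from typing import Optional


def normalize_email(value: Optional[str]) -> Optional[str]:
    """Trim and lowercase, mirroring lower(trim(email)) in the cleaning project."""
    if value is None:
        return None
    cleaned = value.strip().lower()
    return cleaned or None   # assumption: blank input is treated as missing
```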

12) CI/CD & Promotion

12.1 GitHub Actions (sketch)

name: cluedin-pipelines
on: [push, pull_request]
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: ./tools/validate-config.sh   # lint YAML/JSON
      - run: pytest -q                    # unit tests
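  deploy-staging:
    needs: validate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: ./tools/apply.sh env/test
  deploy-prod:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: ./tools/apply.sh env/prod    # change window only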

12.2 Release Notes

  • Summarize mapping/cleaning/export diffs, risk, rollback, owners, metrics to watch.

13) Security Essentials

  • Least privilege scopes for tokens; rotate ≤ 90 days.
  • Secrets in vaults, never in code or logs.
  • Mask PII by default in non‑prod and in AI prompts.
  • Validate webhook signatures and protect endpoints with allowlists.
  • Log who, what, where (IP), and when for sensitive ops.

14) Performance & Cost

  • Prefer Parquet for batch; partition by date/time.
  • Tune batch size and parallelism; avoid tiny files.
  • Cache reference data; precompute hot aggregates.
  • Run heavy dedup or fuzzy steps off‑peak.

15) Cookbooks

15.1 Build an Ingestion Microservice (Node.js)

  1. Read from CRM API delta endpoint.
  2. Transform minimal fields; add updated_at.
  3. POST to /api/ingest with retries+DLQ.
  4. Emit metrics and pass X-Correlation-Id.

15.2 Export‑Driven Reverse ETL (Python)

  1. Poll /api/exports/runs?name=warehouse-contacts-v1&status=success.
  2. Diff changed rows since last watermark.
  3. Upsert to downstream SaaS/CX via their API.
  4. Log audit record with counts and latency.
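
Steps 2 and 3 can be sketched as a watermark-based diff. The sketch assumes every row carries a UTC ISO-8601 updated_at in one consistent format so that string comparison is safe. The upsert callable stands in for the downstream API, and both function names are hypothetical.

```python
from typing import Callable, Iterable, List, Tuple


def changed_since(rows: Iterable[dict], watermark: str) -> Tuple[List[dict], str]:
    """Return rows with updated_at > watermark plus the new watermark.

    Timestamps are compared as strings, which is only safe when they are all
    UTC ISO-8601 in the same format (e.g. 2025-08-22T12:00:00Z).
    """
    changed = [r for r in rows if r["updated_at"] > watermark]
    new_mark = max((r["updated_at"] for r in changed), default=watermark)
    return changed, new_mark


def sync(rows: Iterable[dict], watermark: str,
         upsert: Callable[[dict], None]) -> Tuple[int, str]:
    changed, new_mark = changed_since(rows, watermark)
    for row in changed:
        upsert(row)   # hypothetical downstream API call
    # The new watermark is returned only after every upsert succeeded;
    # an exception above leaves the caller's stored watermark untouched.
    return len(changed), new_mark
```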

15.3 Webhook to Teams

  • Register export.succeeded webhook.
  • Verify HMAC, format message card, post to Teams webhook URL.

15.4 Backfill CLI

  • Accept start/end timestamps and entity type.
  • Read source snapshots; push batches with rate limit and progress bar.
  • Tag runs with correlation_id and write a resumable state file.
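
The resumable part of such a CLI can be sketched as time windows plus a small state file. The file layout, the function names, and the push_window callable (which would read a snapshot slice and post it) are all hypothetical.

```python
import json
import os
from datetime import datetime, timedelta
from typing import Callable, Iterator, Optional, Tuple


def windows(start: datetime, end: datetime,
            step: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Split [start, end) into consecutive windows of at most `step`."""
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper


def load_state(path: str) -> Optional[datetime]:
    """Return the end of the last completed window, or None on a fresh run."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as fh:
        return datetime.fromisoformat(json.load(fh)["completed_until"])


def save_state(path: str, completed_until: datetime) -> None:
    """Write via a temp file and rename so a crash never leaves a partial file."""
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump({"completed_until": completed_until.isoformat()}, fh)
    os.replace(tmp, path)


def backfill(start: datetime, end: datetime, step: timedelta, state_path: str,
             push_window: Callable[[datetime, datetime], None]) -> int:
    """Process remaining windows, recording progress after each; returns windows done."""
    resume_from = load_state(state_path) or start
    done = 0
    for lo, hi in windows(max(start, resume_from), end, step):
        push_window(lo, hi)   # hypothetical: read snapshot rows and POST them
        save_state(state_path, hi)
        done += 1
    return done
```

Rerunning after a failure picks up at the first window that did not complete, so earlier windows are not pushed twice.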

16) Templates

16.1 .env.example

CLUE_HOST=https://your-cluedin-host
CLUE_TOKEN=
LOG_LEVEL=info

16.2 Makefile

.PHONY: lint test run
lint: ; ./tools/validate-config.sh
test: ; pytest -q
run:  ; node src/index.js

16.3 Dockerfile (Node)

FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY . .
CMD ["node","src/index.js"]

17) Troubleshooting

401/403: token scope/expiry; SSO group‑to‑role mapping.
429: respect Retry-After; backoff with jitter; reduce concurrency.
Schema null explosion: check mapping field paths and cleaning order.
Webhook storms: dedupe by event id; idempotent handlers; collapse alerts.
Export drift: switch to staging export; diff schemas; version bump if breaking.


18) Operating Rhythm

Daily: glance at pipeline health, DLQ, last export run, top errors.
Weekly: ship 1–2 improvements; tighten tests; review costs.
Monthly: token/secrets rotation; access review; deprecate old exports.


19) Definition of Done (Dev)

  • Config in repo with PR review & audit links.
  • Staging run green; prod run green ×3.
  • DQ metrics same or better.
  • Runbook updated; alert routed; rollback noted.

You’re set: build producers and consumers, codify mapping/cleaning/validations, leverage AI Agents with guardrails, program the dedup lifecycle, observe everything, and ship safely with versioned exports and CI/CD.