
Webhook-Driven Task Processing Pipeline


A lightweight, extensible service that receives webhooks, processes them through a job queue, and delivers results to registered subscribers — inspired by Zapier.

Overview

This service lets you create pipelines. Each pipeline connects:

  1. A Source — a unique URL that accepts incoming webhooks
  2. A Processing Action — something that transforms the incoming data
  3. Subscribers — one or more URLs where the processed result gets delivered

When a webhook hits a pipeline's source URL, it is not processed synchronously. Instead, it is queued as a background job and processed by a dedicated worker — ensuring the API stays fast and resilient under load.


Architecture

┌─────────────────────────────────────────────────────────────┐
│                        CLIENT / SENDER                       │
└──────────────────────────┬──────────────────────────────────┘
                           │  POST /webhooks/:token
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                        API SERVER                            │
│  • Validates token → finds pipeline                          │
│  • Creates Job record in PostgreSQL (status: pending)        │
│  • Pushes job to BullMQ Queue                                │
│  • Returns 202 Accepted immediately                          │
└──────────────────────────┬──────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                     REDIS (BullMQ Queue)                     │
└──────────────────────────┬──────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                        WORKER                                │
│  • Picks up job from queue                                   │
│ • Runs the configured Action (validate/mask/transform/enrich)│
│  • Delivers result to each Subscriber URL                    │
│  • Records delivery attempts in PostgreSQL                   │
│  • Updates job status (completed / failed)                   │
└──────────────────────────┬──────────────────────────────────┘
                           │  POST result
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                   SUBSCRIBER URLS                            │
└─────────────────────────────────────────────────────────────┘
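The API-side half of this flow can be sketched as follows. This is a minimal illustration, assuming Express and BullMQ as listed in the Tech Stack; `findPipelineByToken` and `createJobRecord` are hypothetical names standing in for the repo's actual persistence helpers.

```typescript
import express from "express";
import { Queue } from "bullmq";

// Hypothetical persistence helpers (real implementations live in the repo).
declare function findPipelineByToken(token: string): Promise<{ id: string } | null>;
declare function createJobRecord(pipelineId: string, payload: unknown): Promise<{ id: string }>;

const app = express();
app.use(express.json());

const webhookQueue = new Queue("webhooks", {
  connection: { host: "localhost", port: 6379 },
});

app.post("/webhooks/:token", async (req, res) => {
  // 1. Resolve the public token to an internal pipeline.
  const pipeline = await findPipelineByToken(req.params.token);
  if (!pipeline) {
    return res.status(404).json({ success: false, error: "Unknown source token" });
  }

  // 2. Persist a pending job, then enqueue it. Reusing the job's own id as
  //    the BullMQ jobId makes the enqueue idempotent: a duplicate webhook
  //    with the same id is dropped by the queue.
  const job = await createJobRecord(pipeline.id, req.body);
  await webhookQueue.add("process", { jobId: job.id }, { jobId: job.id });

  // 3. Acknowledge immediately; the worker does the heavy lifting.
  return res.status(202).json({ success: true, status: "accepted", jobId: job.id });
});
```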

Database Schema

pipelines ──< subscribers
    │
    └──< jobs ──< delivery_attempts
  • pipelines — stores pipeline config, action type, and a unique sourceToken
  • subscribers — one or more destination URLs per pipeline
  • jobs — every incoming webhook becomes a job with a tracked status
  • delivery_attempts — full audit log of every delivery attempt per subscriber
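With Drizzle, the four tables above might look roughly like this. This is a sketch, not the repo's actual schema; column names beyond the ones this README mentions are guesses.

```typescript
import { pgTable, uuid, text, jsonb, boolean, integer, timestamp } from "drizzle-orm/pg-core";

export const pipelines = pgTable("pipelines", {
  id: uuid("id").primaryKey().defaultRandom(),
  name: text("name").notNull(),
  actionType: text("action_type").notNull(),
  actionConfig: jsonb("action_config").notNull(),
  sourceToken: text("source_token").notNull().unique(), // public webhook token
});

export const subscribers = pgTable("subscribers", {
  id: uuid("id").primaryKey().defaultRandom(),
  pipelineId: uuid("pipeline_id").references(() => pipelines.id).notNull(),
  url: text("url").notNull(),
});

export const jobs = pgTable("jobs", {
  id: uuid("id").primaryKey().defaultRandom(),
  pipelineId: uuid("pipeline_id").references(() => pipelines.id).notNull(),
  status: text("status").notNull().default("pending"), // pending / completed / failed
  payload: jsonb("payload").notNull(),
  result: jsonb("result"),
});

export const deliveryAttempts = pgTable("delivery_attempts", {
  id: uuid("id").primaryKey().defaultRandom(),
  jobId: uuid("job_id").references(() => jobs.id).notNull(),
  subscriberUrl: text("subscriber_url").notNull(),
  success: boolean("success").notNull(),
  statusCode: integer("status_code"),
  attemptedAt: timestamp("attempted_at").defaultNow().notNull(),
});
```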

Tech Stack

Layer              Technology
Runtime            Node.js 20 + TypeScript
API Framework      Express
Queue              BullMQ (Redis-backed)
Database           PostgreSQL 15
ORM                Drizzle ORM
Containerization   Docker + Docker Compose
CI/CD              GitHub Actions
Validation         Zod

Setup & Installation

Prerequisites

  • Docker and Docker Compose installed

Run the project

git clone https://github.com/taima4/Webhook-Driven-Task-Processing-Pipeline.git
cd Webhook-Driven-Task-Processing-Pipeline

docker compose up --build

This single command will:

  1. Start PostgreSQL and Redis
  2. Run database migrations automatically
  3. Start the API server on http://localhost:3000
  4. Start the background worker

Health Check

curl http://localhost:3000/health
# { "status": 200, "timestamp": "..." }

API Documentation

You can test the API endpoints using the interactive Run In Postman collection, or with the curl commands below.

Pipelines

Create a Pipeline

curl -X POST http://localhost:3000/pipelines \
  -H "Content-Type: application/json" \
  -d '{
    "name": "My First Pipeline",
    "actionType": "enrich",
    "actionConfig": {
      "addTimestamp": true,
      "addRequestId": true
    },
    "subscriberUrls": [
      "https://webhook.site/your-unique-url"
    ]
  }'

Response:

{
  "success": true,
  "pipeline": {
    "id": "uuid",
    "name": "My First Pipeline",
    "sourceToken": "abc123def456...",
    "actionType": "enrich",
    "subscribers": [{ "id": "uuid", "url": "https://..." }]
  }
}

Note: Save the sourceToken — you'll use it to send webhooks to this pipeline.


Get All Pipelines

curl "http://localhost:3000/pipelines?limit=10&offset=0"

Get Pipeline by ID

curl http://localhost:3000/pipelines/<pipeline-id>

Update a Pipeline

curl -X PATCH http://localhost:3000/pipelines/<pipeline-id> \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Updated Name",
    "subscriberUrls": ["https://new-subscriber.example.com"]
  }'

Delete a Pipeline

curl -X DELETE http://localhost:3000/pipelines/<pipeline-id>

Webhooks

Send a Webhook to a Pipeline

curl -X POST http://localhost:3000/webhooks/<sourceToken> \
  -H "Content-Type: application/json" \
  -d '{
    "user": "john",
    "email": "john@example.com",
    "password": "secret123"
  }'

Response:

{
  "success": true,
  "status": "accepted",
  "message": "Webhook received and queued",
  "jobId": "uuid"
}

The webhook is queued immediately and processed in the background.


Jobs

Get All Jobs

curl "http://localhost:3000/jobs?limit=10&offset=0"

Get Job by ID (with delivery history)

curl http://localhost:3000/jobs/<job-id>

Response includes job status + all delivery attempts:

{
  "success": true,
  "data": {
    "id": "uuid",
    "status": "completed",
    "payload": { "user": "john" },
    "result": { "user": "john", "_timestamp": "2025-01-01T00:00:00Z" },
    "deliveries": [
      {
        "subscriberUrl": "https://webhook.site/...",
        "success": true,
        "statusCode": 200,
        "attemptedAt": "2025-01-01T00:00:00Z"
      }
    ]
  }
}

Get Delivery Attempts for a Job

curl http://localhost:3000/jobs/<job-id>/deliveries

Processing Actions

Configure actionType and actionConfig when creating a pipeline:

validate — Ensure required fields exist

Throws an error if any required field is missing from the webhook payload.

{
  "actionType": "validate",
  "actionConfig": {
    "requiredFields": ["user", "email"]
  }
}

mask — Hide sensitive fields

Replaces the value of specified fields with "***MASKED***".

{
  "actionType": "mask",
  "actionConfig": {
    "fields": ["password", "creditCard"]
  }
}

Input:  { "user": "john", "password": "secret" }
Output: { "user": "john", "password": "***MASKED***" }


transform — Modify field values

Converts specified string fields to uppercase.

{
  "actionType": "transform",
  "actionConfig": {
    "uppercase": true,
    "fields": ["name", "city"]
  }
}

Input:  { "name": "john", "city": "amman" }
Output: { "name": "JOHN", "city": "AMMAN" }


enrich — Add metadata to the payload

Appends a timestamp and/or a unique request ID to the payload.

{
  "actionType": "enrich",
  "actionConfig": {
    "addTimestamp": true,
    "addRequestId": true
  }
}

Input:  { "event": "user.signup" }
Output: { "event": "user.signup", "_timestamp": "2025-01-01T00:00:00Z", "_requestId": "uuid" }
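Taken together, the four actions can be sketched as pure functions over the payload. This is an illustrative dispatcher, not the repo's actual code; the names `runAction`, `Payload`, and `ActionConfig` are assumptions.

```typescript
import { randomUUID } from "crypto";

type Payload = Record<string, unknown>;
type ActionConfig = Record<string, unknown>;

export function runAction(actionType: string, payload: Payload, config: ActionConfig): Payload {
  switch (actionType) {
    case "validate": {
      // Throw if any required field is missing from the payload.
      const required = (config.requiredFields as string[]) ?? [];
      const missing = required.filter((f) => !(f in payload));
      if (missing.length > 0) throw new Error(`Missing required fields: ${missing.join(", ")}`);
      return payload;
    }
    case "mask": {
      // Replace the listed fields' values with a fixed mask string.
      const out = { ...payload };
      for (const f of (config.fields as string[]) ?? []) {
        if (f in out) out[f] = "***MASKED***";
      }
      return out;
    }
    case "transform": {
      // Uppercase the listed string fields.
      const out = { ...payload };
      if (config.uppercase) {
        for (const f of (config.fields as string[]) ?? []) {
          if (typeof out[f] === "string") out[f] = (out[f] as string).toUpperCase();
        }
      }
      return out;
    }
    case "enrich": {
      // Append metadata fields to the payload.
      const out = { ...payload };
      if (config.addTimestamp) out._timestamp = new Date().toISOString();
      if (config.addRequestId) out._requestId = randomUUID();
      return out;
    }
    default:
      throw new Error(`Unknown actionType: ${actionType}`);
  }
}
```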


Design Decisions

Why BullMQ over a custom queue?

BullMQ is a battle-tested, Redis-backed queue with built-in support for exponential backoff, job deduplication, and concurrency control. Building this from scratch would introduce unnecessary risk and complexity. Using jobId as the BullMQ job ID also prevents duplicate processing if the same webhook is received twice.

Why Drizzle ORM over Prisma?

Drizzle is lightweight, has zero runtime overhead, and keeps the schema as plain TypeScript — making it easy to read and reason about. Prisma's abstraction felt like overkill for this project's scope.

Why a sourceToken instead of pipeline ID in the URL?

Exposing internal UUIDs in public-facing URLs is a security risk. A randomly generated sourceToken decouples the public webhook URL from the internal database ID, so even if a token is leaked, it reveals nothing about the system's internal structure.
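A token like that can be generated with Node's `crypto` module. The function name and token length here are assumptions for illustration:

```typescript
import { randomBytes } from "crypto";

// 24 random bytes -> 48 hex characters: unguessable, and carries no
// information about the pipeline's internal database id.
export function generateSourceToken(bytes = 24): string {
  return randomBytes(bytes).toString("hex");
}
```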

Why a separate Worker process?

Processing webhooks synchronously would block the API and risk timeouts if a subscriber URL is slow to respond. The worker runs as a completely separate process — meaning the API stays fast regardless of processing load, and the worker can be scaled independently.

Retry Strategy

Delivery retries use BullMQ's built-in exponential backoff (delay: 1000ms, type: exponential). This avoids hammering a failing subscriber and gives it time to recover — a standard pattern in reliable event-driven systems.
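With those settings the wait roughly doubles after each failed attempt (1s, 2s, 4s, ...). A small helper mirroring the exponential formula (delay · 2^(attempt − 1)), shown purely for illustration since BullMQ computes this internally:

```typescript
// attempt is 1-based: the 1st retry waits baseDelayMs, the 2nd twice that, etc.
export function exponentialBackoff(attempt: number, baseDelayMs = 1000): number {
  return baseDelayMs * 2 ** (attempt - 1);
}
```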

Transactions for Pipeline Creation

Creating a pipeline involves inserting into both pipelines and subscribers. These are wrapped in a single database transaction to ensure atomicity — either both succeed or neither does, preventing orphaned subscriber records.
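A hedged sketch of what that looks like with Drizzle's `db.transaction`; the table objects and request shape are assumptions, not the repo's actual code.

```typescript
// If the subscriber insert throws, the pipeline insert rolls back too.
async function createPipelineAtomically(body: {
  name: string;
  actionType: string;
  actionConfig: unknown;
  subscriberUrls: string[];
  sourceToken: string;
}) {
  await db.transaction(async (tx) => {
    const [pipeline] = await tx
      .insert(pipelines)
      .values({
        name: body.name,
        actionType: body.actionType,
        actionConfig: body.actionConfig,
        sourceToken: body.sourceToken,
      })
      .returning();

    await tx.insert(subscribers).values(
      body.subscriberUrls.map((url) => ({ pipelineId: pipeline.id, url }))
    );
  });
}
```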
