An event processing pipeline that collects frontend analytics events, streams them through Kafka, aggregates sessions in real time, and ships data to Elasticsearch for search and dashboards. Includes:
- Collector service (Spring Boot WebFlux + Kafka producer + Kafka Streams)
- Kafka, Zookeeper, Kafka UI, Kafka Connect
- Elasticsearch + Kibana
- Client library and example site to capture and send events
Purpose
- Collect user analytics (page views, custom events, errors, performance metrics) from web clients.
- Stream events to Kafka for durable, scalable processing.
- Aggregate session windows with Kafka Streams.
- Sink raw and aggregated data into Elasticsearch for exploration and dashboards.
Event Flow
- Client site emits batched JSON events to the Collector.
- Collector enriches and produces to Kafka topic raw-events.
- Kafka Streams (in the same app) builds session aggregates and outputs to topic session-events.
- Kafka Connect ships both topics to Elasticsearch indices.
- Kibana provides dashboards and ad hoc queries.
Kafka Topics
- raw-events: primary stream of JSON events from clients.
- session-events: sessionized aggregates per visitor with inactivity-based windows.
Overview and Purpose
- HTTP ingestion endpoint that accepts batched JSON analytics events.
- Enriches events with server metadata (eventId, collectorIp, userAgent).
- Produces events to Kafka (raw-events).
- Kafka Streams topology builds inactivity-based sessions and writes session-events.
Main Endpoint
- POST /collect
- Consumes: application/json
- Body: array of event objects
- Returns: 202 Accepted on success, 400 for empty/invalid batch, 413 if too large
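An illustrative request body (field names and values mirror the sample curl shown later in this README; additional optional fields such as error payloads and performance metrics may also appear per event):

```json
[
  {
    "siteId": 101,
    "clientTs": 1730340000000,
    "visitorId": "anon_1234",
    "pageUrl": "http://localhost:5500/index.html",
    "actionName": "page_view"
  }
]
```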
Core Kafka Interactions
- Produces RawEvent to topic raw-events (3 partitions, auto-created).
- Streams topology:
- Groups by visitorId and applies SessionWindows.ofInactivityGapAndGrace(5 minutes gap, 60s grace).
- Aggregates event IDs into RawSession.
- Writes to topic session-events (3 partitions, auto-created).
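The inactivity-gap windowing above can be illustrated with a plain-Java sketch (an assumption for illustration, not the actual Streams topology code): events for one visitor belong to the same session as long as consecutive timestamps are no more than 5 minutes apart, mirroring `SessionWindows.ofInactivityGapAndGrace`.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: inactivity-gap sessionization for a single visitor's event timestamps.
public class SessionizeSketch {
    // 5-minute inactivity gap, as configured in the Streams topology.
    static final long GAP_MS = 5 * 60 * 1000;

    // Splits sorted event timestamps (epoch ms) into sessions: a new session
    // starts whenever the gap since the previous event exceeds GAP_MS.
    static List<List<Long>> sessionize(List<Long> timestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long prev = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (!current.isEmpty() && ts - prev > GAP_MS) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
            prev = ts;
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        // Three events within 5 minutes, then one after a 10-minute gap -> 2 sessions.
        List<Long> ts = List.of(0L, 60_000L, 240_000L, 840_000L);
        System.out.println(sessionize(ts).size()); // prints 2
    }
}
```

In the real topology the grace period (60s) additionally tolerates late-arriving events before a window closes.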
Key Classes and Responsibilities
- com.a7bari.eventCollector.controller.CollectorController
- POST /collect, validates batch, enriches, sends to Kafka with concurrency (64).
- com.a7bari.eventCollector.config.KafkaProducerConfig
- Kafka producer factory and KafkaTemplate<String, RawEvent>.
- Topic beans and Kafka Streams topology producing RawSession to session-events.
- com.a7bari.eventCollector.model.RawEvent
- DTO for incoming/outgoing events with validation constraints.
- com.a7bari.eventCollector.model.RawSession
- Aggregated session representation (sessionId, visitorId, startTs, events list).
- Avro schema: src/main/avro/.../RawEvent.avsc
- Provided for future/optional Avro usage; current pipeline uses JSON serialization.
Technologies and Dependencies
- Spring Boot 3.4.x, Spring WebFlux
- Spring for Apache Kafka (producer + streams), Apache Kafka Streams
- JSON SerDe (current), Avro artifacts present but not enabled
- Lombok, Bean Validation (Jakarta Validation)
Build and Run
- Prereqs: Java 17, Maven, Docker
- Start infra:
- docker-compose up -d
- Zookeeper (2181), Kafka (9092), Kafka UI (8085), Kafka Connect (8083), Elasticsearch (9200), Kibana (5601)
- Start Collector:
- cd event-collector
- ./mvnw spring-boot:run
- Default port intended: 8090
- Note: application.properties currently shows a typo "server.port=809 0". Set to 8090 if needed.
- Verify:
- Kafka UI: http://localhost:8085 (topics raw-events, session-events)
- POST sample:
curl -i -X POST http://localhost:8090/collect \
  -H "Content-Type: application/json" \
  -d '[{"siteId":101,"clientTs":1730340000000,"visitorId":"anon_1234","pageUrl":"http://localhost:5500/index.html","actionName":"page_view"}]'
Notes
- Producer uses JsonSerializer and acks=all with idempotence enabled.
- Streams uses JSON serde explicitly for RawEvent and RawSession.
- NewTopic beans auto-create topics with 3 partitions.
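The producer settings above are defined as beans in KafkaProducerConfig; an equivalent `application.properties` fragment would look roughly like this (a sketch under that assumption, not a copy of the project's actual file):

```properties
# Illustrative producer settings matching the notes above
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.properties.enable.idempotence=true
```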
Overview and Purpose
- Kafka broker and Zookeeper for local mode (KRaft migration optional).
- Kafka UI to browse topics, partitions, consumer groups.
- Kafka Connect to move data from Kafka to sinks (Elasticsearch).
Key Files
- docker-compose.yml
- Kafka (7.6.1), UI (8085), Connect (8083), Elasticsearch (8.8.0), Kibana (8.8.0).
- connect_elastic.sh
- Idempotent helper to create/update Elasticsearch sink connector via REST API.
Elasticsearch Sink Connector
- Installed at container startup (confluentinc/kafka-connect-elasticsearch).
- Default connector config (via script):
- Topics: raw-events,session-events
- connection.url: http://elasticsearch:9200
- JSON converter (schemas disabled), schema.ignore=true, key.ignore=true
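The payload the script posts to Kafka Connect would look roughly like this (the connector name and key converter are assumptions; topics, URL, and the ignore/converter flags follow the bullets above):

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "raw-events,session-events",
    "connection.url": "http://elasticsearch:9200",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true"
  }
}
```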
Create/Update Connector
# After docker-compose is up and Kafka Connect is healthy:
./connect_elastic.sh -u http://localhost:8083
# Options:
# -t "raw-events,session-events" to override topics
# -e http://elasticsearch:9200 to override ES URL
# -n my-es-sink to use custom connector name
Overview and Purpose
- Elasticsearch stores documents for raw events and session aggregates.
- Kibana provides dashboards and exploration.
Access
- Elasticsearch: http://localhost:9200
- Kibana: http://localhost:5601
Indices
- With the default connector and topics, expect indices named by topic (e.g., raw-events, session-events).
Overview
- A zero-dependency browser library (Client/assets/js/tiny-event-collector.js) that:
- Batches events
- Auto-captures page views, errors, and navigation performance metrics
- Retries with backoff
- Flushes with sendBeacon on pagehide/visibilitychange
Quick Start (Demo Site)
- Serve the Client/ directory on http://localhost:5500 to match CORS:
# Option A (Python)
python3 -m http.server 5500 -d Client
# Option B (Node "serve")
npx serve Client -l 5500
- Open http://localhost:5500 and interact. The app.js bootstraps the library and generates events.
Library Usage
<script src="/assets/js/tiny-event-collector.js"></script>
<script>
TinyEventCollector.init({
endpoint: 'http://localhost:8090/collect',
siteId: 101,
visitorId: 'anon_abcdef', // generate per-user anon id
recordingFlag: false,
batchSize: 10,
autoPageview: true,
autoErrors: true,
autoPerf: true
});
TinyEventCollector.trackEvent({
category: 'catalog',
action: 'click',
label: 'open_product',
value: 1,
actionName: 'open_product'
});
</script>
Conceptual evolution:
- RawEvent (ingress) → ProcessedEvent (collector-enriched) → EnrichedEvent (sessionized aggregate)
In this project:
- RawEvent (ingress and produced)
- Client supplies: siteId, clientTs, visitorId, pageUrl, optional metadata (actionName, eventCategory, error payload, perf metrics).
- Collector enriches server-side:
- eventId (UUID)
- collectorIp (from request)
- userAgent (from headers)
- This effective superset (RawEvent + server fields) functions as the "ProcessedEvent" state.
- RawSession (EnrichedEvent)
- Kafka Streams groups by visitorId with 5-minute inactivity windows and 60s grace.
- Aggregates a list of event IDs and computes:
- sessionId: visitorId + window.start
- visitorId
- startTs (epoch ms)
- displayTime (human readable)
- events: string[] of eventIds
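The identity fields listed above can be derived as in this sketch (an assumption for illustration: the exact separator in sessionId and the displayTime format are not specified by the project):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch: deriving RawSession identity fields from visitorId and window start.
public class RawSessionFields {
    // sessionId: visitorId + window.start (separator is an assumption).
    static String sessionId(String visitorId, long windowStartMs) {
        return visitorId + "-" + windowStartMs;
    }

    // displayTime: human-readable rendering of the epoch-ms start timestamp
    // (pattern and UTC zone are assumptions).
    static String displayTime(long startTs) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(startTs));
    }

    public static void main(String[] args) {
        long start = 1730340000000L;
        System.out.println(sessionId("anon_1234", start));
        System.out.println(displayTime(start));
    }
}
```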
Summary
- raw-events topic contains JSON RawEvent including collector-enriched fields.
- session-events topic contains JSON RawSession representing session-level aggregates.
- Start Infrastructure
docker-compose up -d
# Kafka: localhost:9092
# UI: http://localhost:8085
# Connect: http://localhost:8083
# ES: http://localhost:9200
# Kibana: http://localhost:5601
- Start Collector
cd event-collector
# Ensure server.port=8090 in src/main/resources/application.properties
./mvnw spring-boot:run
# or build:
./mvnw -q clean package
java -jar target/event-collector-0.0.1-SNAPSHOT.jar
- Configure Elasticsearch Sink (Kafka Connect)
# Ships both raw-events and session-events to Elasticsearch
./connect_elastic.sh -u http://localhost:8083
- Run Client Example
# Serve the Client/ directory on 5500 (to match CORS in CollectorController)
python3 -m http.server 5500 -d Client
# Visit http://localhost:5500
- Validate
- Kafka UI: check topics raw-events and session-events.
- Elasticsearch:
curl 'http://localhost:9200/raw-events/_search?pretty'
curl 'http://localhost:9200/session-events/_search?pretty'
- Kibana: create an index pattern for raw-events and session-events, build a dashboard.
- Reactive ingestion (Spring WebFlux) handles large fan-in with efficient backpressure.
- Exactly-once-like producer settings (acks=all, idempotence) for high durability.
- Real-time sessionization with Kafka Streams using inactivity-based windows.
- Pluggable sinks via Kafka Connect; Elasticsearch is a drop-in with a single script.
- Operational UX with Kafka UI, Kibana, and idempotent connector management.
- Client SDK is tiny, dependency-free, and production-friendly (batching, retries, sendBeacon).
- Horizontal scalability at every tier (Collector replicas, Kafka partitions, Connect workers, ES nodes).
- Validation vs DTO: the endpoint currently accepts a list of RawEvent objects, yet RawEvent.eventId is @NotNull while being generated server-side. For strict validation, accept RawEventDto in requests and map it to RawEvent internally before producing.
- Avro schema provided but JSON serialization is used. Consider enabling Avro + Schema Registry for strong contracts.
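The DTO-to-entity mapping suggested above could look roughly like this sketch (field sets and names are assumptions based on the fields mentioned elsewhere in this README; the real classes carry Bean Validation annotations):

```java
import java.util.UUID;

// Sketch: accept a DTO without server-side fields, then enrich into RawEvent
// before producing, so @NotNull on eventId never rejects client input.
public class DtoMappingSketch {
    // Hypothetical shapes; the project's RawEventDto/RawEvent may differ.
    record RawEventDto(long siteId, long clientTs, String visitorId,
                       String pageUrl, String actionName) {}
    record RawEvent(String eventId, long siteId, long clientTs, String visitorId,
                    String pageUrl, String actionName,
                    String collectorIp, String userAgent) {}

    static RawEvent toRawEvent(RawEventDto dto, String collectorIp, String userAgent) {
        return new RawEvent(
                UUID.randomUUID().toString(), // eventId generated server-side
                dto.siteId(), dto.clientTs(), dto.visitorId(),
                dto.pageUrl(), dto.actionName(),
                collectorIp, userAgent);
    }

    public static void main(String[] args) {
        RawEventDto dto = new RawEventDto(101, 1730340000000L, "anon_1234",
                "http://localhost:5500/index.html", "page_view");
        RawEvent e = toRawEvent(dto, "127.0.0.1", "curl/8");
        System.out.println(e.eventId() != null && "anon_1234".equals(e.visitorId()));
    }
}
```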
| Layer | Technology |
|---|---|
| Ingestion API | Spring Boot 3, WebFlux |
| Messaging | Apache Kafka (Confluent images), Kafka UI |
| Stream Processing | Kafka Streams (in-app topology) |
| Connectors | Kafka Connect + Elasticsearch Sink |
| Storage/Analytics | Elasticsearch + Kibana |
| Serialization | JSON (current), Avro schema available (future) |
| Client SDK | Vanilla JS (no deps), Fetch + sendBeacon |
| Build | Maven, Java 17 |
| Infra (local) | Docker Compose |
- event-collector/
- src/main/java/.../controller/CollectorController.java
- src/main/java/.../config/KafkaProducerConfig.java
- src/main/java/.../model/{RawEvent,RawEventDto,RawSession}.java
- src/main/avro/.../RawEvent.avsc
- src/main/resources/application.properties
- pom.xml
- Client/
- assets/js/tiny-event-collector.js (library)
- assets/js/app.js (example site wiring)
- index.html, catalog.html, product.html, about.html
- docker-compose.yml
- connect_elastic.sh