A7bari/user-analytics

User Analytics Pipeline (Spring Boot + Kafka + Kafka Connect + Elasticsearch)

An event processing pipeline that collects frontend analytics events, streams them through Kafka, aggregates sessions in real time, and ships data to Elasticsearch for search and dashboards. Includes:

  • Collector service (Spring Boot WebFlux + Kafka producer + Kafka Streams)
  • Kafka, Zookeeper, Kafka UI, Kafka Connect
  • Elasticsearch + Kibana
  • Client library and example site to capture and send events

Architecture Overview

Purpose

  • Collect user analytics (page views, custom events, errors, performance metrics) from web clients.
  • Stream events to Kafka for durable, scalable processing.
  • Aggregate session windows with Kafka Streams.
  • Sink raw and aggregated data into Elasticsearch for exploration and dashboards.

Event Flow

  1. Client site emits batched JSON events to the Collector.
  2. Collector enriches and produces to Kafka topic raw-events.
  3. Kafka Streams (in the same app) builds session aggregates and outputs to topic session-events.
  4. Kafka Connect ships both topics to Elasticsearch indices.
  5. Kibana provides dashboards and ad hoc queries.

Kafka Topics

  • raw-events: primary stream of JSON events from clients.
  • session-events: sessionized aggregates per visitor with inactivity-based windows.

Services

1) Collector Service (Spring Boot + WebFlux + Kafka + Streams)

Overview and Purpose

  • HTTP ingestion endpoint that accepts batched JSON analytics events.
  • Enriches events with server metadata (eventId, collectorIp, userAgent).
  • Produces events to Kafka (raw-events).
  • Kafka Streams topology builds inactivity-based sessions and writes session-events.

Main Endpoint

  • POST /collect
    • Consumes: application/json
    • Body: array of event objects
    • Returns: 202 Accepted on success, 400 for empty/invalid batch, 413 if too large
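The response contract above can be illustrated with a dependency-free Java sketch. The maximum batch size (500) is an assumption for illustration, not a value taken from the project:

```java
import java.util.List;

public class CollectSketch {
    // Hypothetical cap for illustration; the real limit lives in the Collector's config.
    static final int MAX_BATCH = 500;

    // Maps an incoming batch to the HTTP status codes described above.
    static int statusFor(List<?> batch) {
        if (batch == null || batch.isEmpty()) return 400; // empty/invalid batch
        if (batch.size() > MAX_BATCH)         return 413; // payload too large
        return 202;                                       // accepted for async processing
    }

    public static void main(String[] args) {
        System.out.println(statusFor(List.of()));        // 400
        System.out.println(statusFor(List.of("event"))); // 202
    }
}
```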

Core Kafka Interactions

  • Produces RawEvent to topic raw-events (3 partitions, auto-created).
  • Streams topology:
    • Groups by visitorId and applies SessionWindows.ofInactivityGapAndGrace(5 minutes gap, 60s grace).
    • Aggregates event IDs into RawSession.
    • Writes to topic session-events (3 partitions, auto-created).
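The inactivity-gap semantics can be illustrated with a plain-Java sketch that simulates the windowing for one visitor; the real topology uses Kafka Streams' `SessionWindows`, not this code:

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {
    static final long GAP_MS = 5 * 60 * 1000; // 5-minute inactivity gap

    // Splits one visitor's time-ordered event timestamps into sessions:
    // a new session starts whenever the gap since the previous event exceeds GAP_MS.
    static List<List<Long>> sessionize(List<Long> timestamps) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long prev = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (!current.isEmpty() && ts - prev > GAP_MS) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
            prev = ts;
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        // Events at t=0s, 60s, then 11 minutes: the >5-minute gap closes the first session.
        System.out.println(sessionize(List.of(0L, 60_000L, 660_000L)).size()); // 2
    }
}
```

Note that the real Streams topology additionally merges late-arriving events into closed windows for up to the 60-second grace period; this sketch omits that.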

Key Classes and Responsibilities

  • com.a7bari.eventCollector.controller.CollectorController
    • POST /collect, validates batch, enriches, sends to Kafka with concurrency (64).
  • com.a7bari.eventCollector.config.KafkaProducerConfig
    • Kafka producer factory and KafkaTemplate<String, RawEvent>.
    • Topic beans and Kafka Streams topology producing RawSession to session-events.
  • com.a7bari.eventCollector.model.RawEvent
    • DTO for incoming/outgoing events with validation constraints.
  • com.a7bari.eventCollector.model.RawSession
    • Aggregated session representation (sessionId, visitorId, startTs, events list).
  • Avro schema: src/main/avro/.../RawEvent.avsc
    • Provided for future/optional Avro usage; current pipeline uses JSON serialization.

Technologies and Dependencies

  • Spring Boot 3.4.x, Spring WebFlux
  • Spring for Apache Kafka (producer + streams), Apache Kafka Streams
  • JSON SerDe (current), Avro artifacts present but not enabled
  • Lombok, Bean Validation (Jakarta Validation)

Build and Run

  • Prereqs: Java 17, Maven, Docker
  • Start infra:
    • docker-compose up -d
      • Zookeeper (2181), Kafka (9092), Kafka UI (8085), Kafka Connect (8083), Elasticsearch (9200), Kibana (5601)
  • Start Collector:
    • cd event-collector
    • ./mvnw spring-boot:run
    • Default port intended: 8090
      • Note: application.properties currently shows a typo "server.port=809 0". Set to 8090 if needed.
  • Verify:
    • Kafka UI: http://localhost:8085 (topics raw-events, session-events)
    • POST sample:
      curl -i -X POST http://localhost:8090/collect \
        -H "Content-Type: application/json" \
        -d '[{
          "siteId":101,
          "clientTs":1730340000000,
          "visitorId":"anon_1234",
          "pageUrl":"http://localhost:5500/index.html",
          "actionName":"page_view"
        }]'

Notes

  • Producer uses JsonSerializer and acks=all with idempotence enabled.
  • Streams uses JSON serde explicitly for RawEvent and RawSession.
  • NewTopic beans auto-create topics with 3 partitions.
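The durability settings above correspond to the standard Kafka producer configuration keys; a minimal sketch (the serializer class names are the ones implied by "JsonSerializer", shown here for illustration):

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Builds the durability-focused producer settings described in the notes above.
    static Properties durableProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");                // wait for all in-sync replicas to ack
        props.put("enable.idempotence", "true"); // broker-side dedup on producer retries
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(durableProducerProps().getProperty("acks")); // all
    }
}
```

In the actual service these values come from Spring configuration rather than a hand-built `Properties` object.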

2) Kafka, Kafka UI, Kafka Connect

Overview and Purpose

  • Kafka broker and Zookeeper for local mode (KRaft migration optional).
  • Kafka UI to browse topics, partitions, consumer groups.
  • Kafka Connect to move data from Kafka to sinks (Elasticsearch).

Key Files

  • docker-compose.yml
    • Kafka (7.6.1), UI (8085), Connect (8083), Elasticsearch (8.8.0), Kibana (8.8.0).
  • connect_elastic.sh
    • Idempotent helper to create/update Elasticsearch sink connector via REST API.

Elasticsearch Sink Connector

  • Installed at container startup (confluentinc/kafka-connect-elasticsearch).
  • Default connector config (via script):
    • Topics: raw-events,session-events
    • connection.url: http://elasticsearch:9200
    • JSON converter (schemas disabled), schema.ignore=true, key.ignore=true

Create/Update Connector

# After docker-compose is up and Kafka Connect is healthy:
./connect_elastic.sh -u http://localhost:8083
# Options:
#   -t "raw-events,session-events"  to override topics
#   -e http://elasticsearch:9200    to override ES URL
#   -n my-es-sink                   to use custom connector name

3) Elasticsearch + Kibana

Overview and Purpose

  • Elasticsearch stores documents for raw events and session aggregates.
  • Kibana provides dashboards and exploration.

Access

  • Elasticsearch: http://localhost:9200
  • Kibana: http://localhost:5601

Indices

  • With the default connector and topics, expect indices named by topic (e.g., raw-events, session-events).

4) Client Library and Example Site

Overview

  • A zero-dependency browser library (Client/assets/js/tiny-event-collector.js) that:
    • Batches events
    • Auto-captures page views, errors, and navigation performance metrics
    • Retries with backoff
    • Flushes with sendBeacon on pagehide/visibilitychange

Quick Start (Demo Site)

  • Serve the Client/ directory on http://localhost:5500 to match CORS:
    # Option A (Python)
    python3 -m http.server 5500 -d Client
    
    # Option B (Node "serve")
    npx serve Client -l 5500
  • Open http://localhost:5500 and interact. The app.js bootstraps the library and generates events.

Library Usage

<script src="/assets/js/tiny-event-collector.js"></script>
<script>
  TinyEventCollector.init({
    endpoint: 'http://localhost:8090/collect',
    siteId: 101,
    visitorId: 'anon_abcdef',    // generate per-user anon id
    recordingFlag: false,
    batchSize: 10,
    autoPageview: true,
    autoErrors: true,
    autoPerf: true
  });

  TinyEventCollector.trackEvent({
    category: 'catalog',
    action: 'click',
    label: 'open_product',
    value: 1,
    actionName: 'open_product'
  });
</script>

Data Model and Event Evolution

Conceptual evolution:

  • RawEvent (ingress) → ProcessedEvent (collector-enriched) → EnrichedEvent (sessionized aggregate)

In this project:

  • RawEvent (ingress and produced)

    • Client supplies: siteId, clientTs, visitorId, pageUrl, optional metadata (actionName, eventCategory, error payload, perf metrics).
    • Collector enriches server-side:
      • eventId (UUID)
      • collectorIp (from request)
      • userAgent (from headers)
    • This effective superset (RawEvent + server fields) functions as the "ProcessedEvent" state.
  • RawSession (EnrichedEvent)

    • Kafka Streams groups by visitorId with 5-minute inactivity windows and 60s grace.
    • Aggregates a list of event IDs and computes:
      • sessionId: visitorId + window.start
      • visitorId
      • startTs (epoch ms)
      • displayTime (human readable)
      • events: string[] of eventIds
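The derived fields can be sketched in plain Java. The separator in sessionId and the displayTime format are assumptions; the source only states "visitorId + window.start" and "human readable":

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class RawSessionSketch {
    // sessionId as described above: visitorId combined with the window start timestamp.
    static String sessionId(String visitorId, long windowStartMs) {
        return visitorId + "_" + windowStartMs;
    }

    // Human-readable rendering of the window start (UTC; the pattern is illustrative).
    static String displayTime(long windowStartMs) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(windowStartMs));
    }

    public static void main(String[] args) {
        System.out.println(sessionId("anon_1234", 1730340000000L)); // anon_1234_1730340000000
        System.out.println(displayTime(1730340000000L));
    }
}
```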

Summary

  • raw-events topic contains JSON RawEvent including collector-enriched fields.
  • session-events topic contains JSON RawSession representing session-level aggregates.

How to Run (Local)

  1. Start Infrastructure
docker-compose up -d
# Kafka:   localhost:9092
# UI:      http://localhost:8085
# Connect: http://localhost:8083
# ES:      http://localhost:9200
# Kibana:  http://localhost:5601
  2. Start Collector
cd event-collector
# Ensure server.port=8090 in src/main/resources/application.properties
./mvnw spring-boot:run
# or build:
./mvnw -q clean package
java -jar target/event-collector-0.0.1-SNAPSHOT.jar
  3. Configure Elasticsearch Sink (Kafka Connect)
# Ships both raw-events and session-events to Elasticsearch
./connect_elastic.sh -u http://localhost:8083
  4. Run Client Example
# Serve the Client/ directory on 5500 (to match CORS in CollectorController)
python3 -m http.server 5500 -d Client
# Visit http://localhost:5500
  5. Validate
  • Kafka UI: check topics raw-events and session-events.
  • Elasticsearch:
    curl 'http://localhost:9200/raw-events/_search?pretty'
    curl 'http://localhost:9200/session-events/_search?pretty'
  • Kibana: create an index pattern for raw-events and session-events, build a dashboard.

Why It’s Impressive

  • Reactive ingestion (Spring WebFlux) handles large fan-in with efficient backpressure.
  • Exactly-once-like producer settings (acks=all, idempotence) for high durability.
  • Real-time sessionization with Kafka Streams using inactivity-based windows.
  • Pluggable sinks via Kafka Connect; Elasticsearch is a drop-in with a single script.
  • Operational UX with Kafka UI, Kibana, and idempotent connector management.
  • Client SDK is tiny, dependency-free, and production-friendly (batching, retries, sendBeacon).
  • Horizontal scalability at every tier (Collector replicas, Kafka partitions, Connect workers, ES nodes).

Notes and Known Gaps

  • Validation vs DTO: The endpoint currently accepts List<RawEvent>, even though RawEvent.eventId is @NotNull yet generated server-side. For strict validation, accept RawEventDto in requests, then map it to RawEvent internally before producing.
  • Avro schema provided but JSON serialization is used. Consider enabling Avro + Schema Registry for strong contracts.
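The suggested DTO split can be sketched as follows. The record shapes are illustrative, not the project's actual field lists:

```java
import java.util.UUID;

public class DtoMappingSketch {
    // Request-side DTO: carries only client-supplied fields, so strict validation can apply.
    record RawEventDto(long siteId, long clientTs, String visitorId, String pageUrl) {}

    // Internal event: server-generated fields filled in before producing to Kafka.
    record RawEvent(String eventId, long siteId, long clientTs, String visitorId,
                    String pageUrl, String collectorIp, String userAgent) {}

    // Maps the validated DTO to the enriched internal event.
    static RawEvent enrich(RawEventDto dto, String collectorIp, String userAgent) {
        return new RawEvent(UUID.randomUUID().toString(), dto.siteId(), dto.clientTs(),
                dto.visitorId(), dto.pageUrl(), collectorIp, userAgent);
    }

    public static void main(String[] args) {
        RawEventDto dto = new RawEventDto(101, 1730340000000L, "anon_1234", "/index.html");
        RawEvent ev = enrich(dto, "127.0.0.1", "Mozilla/5.0");
        System.out.println(ev.eventId() != null); // true
    }
}
```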

Tech Stack Summary

Layer               Technology
------------------  ----------------------------------------------
Ingestion API       Spring Boot 3, WebFlux
Messaging           Apache Kafka (Confluent images), Kafka UI
Stream Processing   Kafka Streams (in-app topology)
Connectors          Kafka Connect + Elasticsearch Sink
Storage/Analytics   Elasticsearch + Kibana
Serialization       JSON (current), Avro schema available (future)
Client SDK          Vanilla JS (no deps), Fetch + sendBeacon
Build               Maven, Java 17
Infra (local)       Docker Compose

Repository Structure

  • event-collector/
    • src/main/java/.../controller/CollectorController.java
    • src/main/java/.../config/KafkaProducerConfig.java
    • src/main/java/.../model/{RawEvent,RawEventDto,RawSession}.java
    • src/main/avro/.../RawEvent.avsc
    • src/main/resources/application.properties
    • pom.xml
  • Client/
    • assets/js/tiny-event-collector.js (library)
    • assets/js/app.js (example site wiring)
    • index.html, catalog.html, product.html, about.html
  • docker-compose.yml
  • connect_elastic.sh
