Volga - Real-time data processing for modern AI/ML systems


Volga is a data processing engine aiming to be an alternative to general streaming engines like Flink and Spark as well as AI/ML oriented systems like Chronon and OpenMLDB. It is built in Rust using Apache DataFusion and Apache Arrow.

🀯 What and why

Modern AI/ML systems (recsys, fraud detection, personalization, search, RAG) rely on features computed from real-time/historical event streams:

  • Long, continuously sliding window aggregations (1h, 7d, 30d, 1y)
  • Point-in-time correct data for batch training and online inference
  • Consistent online + offline pipeline definitions and computation semantics
  • Real-time asynchronous data serving

Today this typically requires combining multiple systems:

  • Streaming engine (Flink/Spark)
  • Storage (Redis/S3)
  • Serving layer
  • Feature platforms (Chronon, OpenMLDB, etc.)

Volga replaces this with a single engine:

  • Streaming = write path
  • Request = read path
  • Batch = training

Check out the blog and join our Slack.

πŸ“Š System Comparison

| System | Execution Model | Storage / State | Pipeline Definitions | Window Agg Optimization | Serving |
|---|---|---|---|---|---|
| Volga | native (streaming / batch / request); Rust | native remote (S3 + SlateDB) + local cache | SQL + ML funcs (top, _cate, etc.) | ✅ tiling (native in-state, incremental) | native request mode + queryable state |
| Flink / Spark | separate engines (streaming / batch); Java | local / checkpointed | SQL | ❌ none (recompute-heavy) | external serving layer required |
| Chronon | composition (Flink + Spark + Redis); Java/Scala | local + external KV store | DSL (feature pipelines) | ✅ tiling (via external materialization) | external serving layer |
| OpenMLDB | DB + Spark (no native streaming engine); C++ | local (memory + disk) | SQL + ML funcs | ⚠️ limited (no tiling) | built-in DB serving |

🌳 Features

Unified execution

  • Streaming, Batch, Request modes via single SQL query
  • Same operators and semantics

SQL + ML-oriented aggregations

  • top(col, k)
  • topn_frequency(col, k)
  • top1_ratio(col)
  • sum_cate, count_cate, avg_cate
  • _where, _cate_where
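As a reference for what these aggregations compute, here is a plain-Python sketch of the semantics implied by the function names (tie-breaking and ordering details are assumptions, and this is not Volga's implementation):

```python
from collections import Counter

def top(values, k):
    """top(col, k): the k most frequent values, most frequent first."""
    return [v for v, _ in Counter(values).most_common(k)]

def topn_frequency(values, k):
    """topn_frequency(col, k): occurrence counts of the k most frequent values."""
    return [c for _, c in Counter(values).most_common(k)]

def top1_ratio(values):
    """top1_ratio(col): share of rows held by the single most frequent value."""
    if not values:
        return 0.0
    (_, count), = Counter(values).most_common(1)
    return count / len(values)

def sum_cate(amounts, categories):
    """sum_cate(val, cate): per-category sums, keyed by category."""
    out = {}
    for amount, cate in zip(amounts, categories):
        out[cate] = out.get(cate, 0) + amount
    return out

products = ["a", "b", "a", "c", "a", "b"]
print(top(products, 2))                        # ['a', 'b']
print(top1_ratio(products))                    # 0.5 (3 of 6 rows are 'a')
print(sum_cate([10, 5, 20], ["x", "y", "x"]))  # {'x': 30, 'y': 5}
```

The `_where` / `_cate_where` variants apply the same aggregations after filtering rows by a predicate.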

Long windows via tiling

  • Efficient long windows (weeks/months)
  • Incremental tile-based aggregation in-state
  • No recomputation
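The idea behind tiling can be sketched as follows: events are pre-aggregated into fixed-size tiles (say, one per hour), so each event is folded into exactly one tile, and a long window is answered by combining whole tiles rather than rescanning raw events. A minimal sketch, with hour-aligned window edges for brevity (real tiling also combines exact partials at the window boundaries, and this is not Volga's data layout):

```python
from collections import defaultdict

TILE_SECONDS = 3600  # one tile per hour (illustrative choice)

class TiledSum:
    """Incremental sum over a long sliding window using per-hour tiles."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.tiles = defaultdict(float)  # tile index -> partial sum

    def insert(self, event_time, amount):
        # Each event touches exactly one tile: O(1) per event, no recompute.
        self.tiles[event_time // TILE_SECONDS] += amount

    def query(self, now):
        # Combine only the tiles covering [now - window, now].
        first = (now - self.window_seconds) // TILE_SECONDS
        last = now // TILE_SECONDS
        return sum(self.tiles.get(i, 0.0) for i in range(first, last + 1))

w = TiledSum(window_seconds=30 * 24 * 3600)  # 30-day window
w.insert(0, 10.0)
w.insert(3600 * 5, 2.5)
print(w.query(3600 * 6))  # 12.5: both events fall inside the window
```

A 30-day sum then touches ~720 hourly tiles per query instead of every raw event, which is what makes week- and month-long windows cheap.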

Request-time execution

  • Dataflow graph optimization to serve point-in-time correct data
  • Queryable state allows serving data directly
  • No Redis / serving layer
  • Built-in read/write compute separation
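Conceptually, the write path keeps per-key aggregates up to date while the read path serves requests straight from that state, which is the role an external KV store (e.g. Redis) plays in a classic stack. A hypothetical single-process illustration of the concept (not Volga's API):

```python
import queue
import threading

class QueryableState:
    """Per-key counts: the write path updates them, the read path serves them."""

    def __init__(self):
        self._counts = {}
        self._lock = threading.Lock()

    # --- write path: applied by the streaming operator, one call per event ---
    def apply(self, user_id):
        with self._lock:
            self._counts[user_id] = self._counts.get(user_id, 0) + 1

    # --- read path: request-mode lookup against live state, no external KV ---
    def get(self, user_id):
        with self._lock:
            return self._counts.get(user_id, 0)

events = queue.Queue()
state = QueryableState()

def writer():
    # Consumes the event stream and folds each event into state.
    while (user := events.get()) is not None:
        state.apply(user)

t = threading.Thread(target=writer)
t.start()
for u in ["alice", "bob", "alice"]:
    events.put(u)
events.put(None)  # shut down the write path
t.join()
print(state.get("alice"))  # 2
```

Because reads never block the event-folding loop beyond a brief lock, the two paths can be provisioned and scaled independently.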

Remote state storage

  • Object storage-backed (SlateDB + S3, etc.)
  • Partial state loading
  • Cheap checkpointing

High performance

  • Rust + Tokio
  • Apache Arrow
  • Apache DataFusion

🏠 Architecture

Volga is a distributed dataflow engine:

  • SQL β†’ dataflow graph (via Apache DataFusion)
  • Operators executed across workers
  • State stored remotely (object storage via SlateDB)
  • Columnar execution (Apache Arrow)

Core principles:

  • Consistent streaming, batch and request execution modes
  • SQL first - consistent pipeline definitions across all execution modes
  • Compute–storage separation - Remote State Storage
  • Read path–write path compute separation - optimal performance for real-time pipelines
  • Queryable state - No external KV stores
  • Request-mode execution - serve the same query as batch without an external serving layer
  • Window Agg Optimizations (Tiling)

πŸš… Example SQL Pipeline

SELECT
  u.user_id,

  -- core aggregations
  count(*) OVER w_short AS purchases_1h,
  sum(o.amount) OVER w_long AS spent_30d,

  -- top aggregations
  top(o.product_id, 3) OVER w_long AS top_products_30d,
  topn_frequency(o.product_id, 3) OVER w_long AS top_product_freq_30d,
  top1_ratio(o.product_id) OVER w_rows AS top_product_dominance_last_100,

  -- categorical aggregation
  sum_cate(o.amount, o.product_type) OVER w_long AS spent_per_category,

  -- conditional categorical aggregation
  count_cate_where(o.product_id, o.product_type, o.product_type = 'ON_SALE') OVER w_short AS on_sale_count_1h

FROM users u
JOIN orders o
  ON u.user_id = o.buyer_id

WINDOW
  w_short AS (
    PARTITION BY u.user_id
    ORDER BY o.event_time
    RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW
  ),
  w_long AS (
    PARTITION BY u.user_id
    ORDER BY o.event_time
    RANGE BETWEEN INTERVAL '30 day' PRECEDING AND CURRENT ROW
  ),
  w_rows AS (
    PARTITION BY u.user_id
    ORDER BY o.event_time
    ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
  )

🧩 Running a Pipeline

Python (JSON spec + client)

import json
from volga.client import Client

pipeline_spec = {
    "name": "user_features",
    "execution_mode": "request",
    "query": """
        SELECT
          user_id,
          count(*) OVER w AS purchases_1h,
          sum(amount) OVER w AS spent_1h,
          topn_frequency(category, 3) OVER w AS top_categories_1h
        FROM events
        WINDOW w AS (PARTITION BY user_id ORDER BY ts RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW)
    """,
    "sources": [
        {
            "name": "events",
            "type": "kafka",
            "config": {"topic": "events"}
        }
    ]
}

client = Client()
client.create_pipeline(json.dumps(pipeline_spec))
client.start_pipeline("user_features")

Rust (embedded)

use volga::api::spec::pipeline::PipelineSpec;
use volga::api::pipeline_context::PipelineContext;

let spec = PipelineSpec {
    name: "user_features".to_string(),
    execution_mode: "batch".to_string(),
    query: r#"
        SELECT
          user_id,
          count(*) OVER w AS purchases_1h,
          sum(amount) OVER w AS spent_1h,
          topn_frequency(category, 3) OVER w AS top_categories_1h
        FROM events
        WINDOW w AS (PARTITION BY user_id ORDER BY ts RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND CURRENT ROW)
    "#.to_string(),
    sources: vec![],
    ..Default::default()
};

let ctx = PipelineContext::try_from(spec)?;
ctx.execute()?;

🚒 Installation

git clone https://github.com/volga-project/volga
cd volga
cargo build

πŸ™‡ Running Locally

cargo test

🧁 Roadmap

  • Remote state (production-ready)
  • Batch execution mode
  • Join operator
  • Kubernetes deployment
  • Python Client
  • UI / dashboard
