Merged
1 change: 1 addition & 0 deletions docs/docs.json
@@ -176,6 +176,7 @@
"pages": [
"geneva/jobs/contexts",
"geneva/jobs/backfilling",
"geneva/jobs/bulk-load-columns",
"geneva/jobs/materialized-views",
"geneva/jobs/lifecycle",
"geneva/jobs/conflicts",
207 changes: 207 additions & 0 deletions docs/geneva/jobs/bulk-load-columns.mdx
@@ -0,0 +1,207 @@
---
title: Bulk Loading & Updating Columns
sidebarTitle: Bulk Load / Update Columns
description: Load or update column data from external sources (Parquet, Lance, IPC) into your LanceDB table using a primary-key join.
icon: columns-3
---

<Badge>Introduced in Geneva 0.13.0</Badge>

## Overview

A common scenario is having column data that **already exists** in an external dataset — embeddings from a vendor, features exported from a data warehouse, or columnar data in cloud storage — that you want to load into an existing LanceDB table.

`load_columns` joins value columns from an external source into your table by primary key. It works for both use cases:

- **Adding new columns:** If the specified columns don't exist in the destination table, they are created automatically.
- **Updating existing columns:** If the columns already exist, matched rows are updated with the source values; behavior for unmatched rows is controlled by the `on_missing` parameter.

**Destination table (before):**

| pk | col_a | col_b |
|----|-------|-------|
| 1 | x | 10 |
| 2 | y | 20 |
| 3 | z | 30 |

**External source (Parquet / Lance / IPC):**

| pk | embedding |
|----|------------|
| 1 | [.1, .2] |
| 2 | [.3, .4] |
| 3 | [.5, .6] |

**Destination table (after `load_columns` join on `pk`):**

| pk | col_a | col_b | embedding |
|----|-------|-------|-----------|
| 1 | x | 10 | [.1, .2] |
| 2 | y | 20 | [.3, .4] |
| 3 | z | 30 | [.5, .6] |
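
Conceptually, this is a left join on the primary key. A minimal sketch of the same join using pandas (illustrative only; `load_columns` performs the join inside the job, not client-side):

<CodeGroup>
```python Python icon="python"
import pandas as pd

# Destination table (before)
dest = pd.DataFrame({"pk": [1, 2, 3], "col_a": ["x", "y", "z"], "col_b": [10, 20, 30]})

# External source carrying the new column
source = pd.DataFrame({"pk": [1, 2, 3], "embedding": [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]})

# load_columns behaves like a left join on the primary key
after = dest.merge(source, on="pk", how="left")
```
</CodeGroup>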

### When to use

- **Loading new columns:** Attach pre-computed embeddings from a vendor, or add features exported from Spark/BigQuery as Parquet.
- **Updating existing columns:** Replace outdated embeddings with a newer model's output, or refresh feature values from an updated export.
- **Partial updates:** Update a subset of rows (e.g., only rows whose embeddings were recomputed) while preserving all other values via carry semantics.
- **Format consolidation:** Merge columnar data spread across Parquet files into an existing Lance table.

## Basic usage

`load_columns` supports Parquet, Lance, and IPC sources. The format is auto-detected from the URI suffix, or can be overridden with `source_format`. You can load one or more columns in a single call.

<CodeGroup>
```python Python icon="python"
import lancedb

db = lancedb.connect("my_db")
table = db.open_table("my_table")

# Add a new embedding column from a Parquet source
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
)

# Later, update the same column with refreshed embeddings
table.load_columns(
    source="s3://bucket/embeddings_v2/",
    pk="document_id",
    columns=["embedding"],
)
```
</CodeGroup>

For non-blocking execution, use `load_columns_async`, which returns a `JobFuture` — call `.result()` to block until completion.

## Handling missing keys

When the source doesn't cover every row in the destination, the `on_missing` parameter controls what happens to unmatched rows:

| Mode | Behavior |
|------|----------|
| `"carry"` (default) | Keep existing value. NULL if the column is new. |
| `"null"` | Explicitly set to NULL. |
| `"error"` | Raise an error on the first unmatched row. |

<CodeGroup>
```python Python icon="python"
# Default: unmatched rows keep their current value
table.load_columns(
    source="s3://bucket/partial_embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="carry",
)

# Strict mode: fail if source doesn't cover all rows
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    on_missing="error",
)
```
</CodeGroup>

The `carry` mode is particularly important for partial and multi-pass loads: rows not covered by the current source keep their previously loaded values, so earlier passes are never clobbered.
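
To make the three modes concrete, here is a toy sketch over plain Python dicts (illustrative only, not how the job is implemented):

<CodeGroup>
```python Python icon="python"
def apply_on_missing(dest, source, on_missing="carry"):
    """dest and source map pk -> column value; returns the merged values."""
    out = {}
    for pk, current in dest.items():
        if pk in source:
            out[pk] = source[pk]   # matched rows always take the source value
        elif on_missing == "carry":
            out[pk] = current      # unmatched: keep what is already there
        elif on_missing == "null":
            out[pk] = None         # unmatched: explicitly set to NULL
        else:  # "error"
            raise ValueError(f"no source value for pk={pk}")
    return out

existing = {1: "old", 2: "old", 3: "old"}
partial = {2: "new"}  # source covers only pk=2

apply_on_missing(existing, partial)          # {1: 'old', 2: 'new', 3: 'old'}
apply_on_missing(existing, partial, "null")  # {1: None, 2: 'new', 3: None}
```
</CodeGroup>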

## Performance tuning

### Concurrency

The `concurrency` parameter controls the number of worker processes. The default is 8 — set this to match your available cluster resources.

<CodeGroup>
```python Python icon="python"
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    concurrency=16,
)
```
</CodeGroup>

### Checkpointing

Bulk load jobs checkpoint each batch for fault tolerance, using the same infrastructure as [backfill jobs](/geneva/jobs/backfilling#adaptive-checkpoint-sizing). Key parameters:

- `checkpoint_interval_seconds`: Target seconds per checkpoint batch (default 60s). The adaptive sizer grows or shrinks batch sizes to hit this target.
- `min_checkpoint_size` / `max_checkpoint_size`: Bounds for adaptive sizing.

<Tip>
If your job is small enough to complete without needing fault tolerance, you can get better performance by effectively disabling checkpoints. Increase `checkpoint_interval_seconds` to a large value and set `min_checkpoint_size` high enough that each worker processes its entire workload in a single batch.
</Tip>
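
For example, a small job might raise both knobs so each worker finishes in a single batch. The parameter names are those documented above; passing them straight through to `load_columns` is an assumption about the call signature:

<CodeGroup>
```python Python icon="python"
# Hypothetical tuning values, chosen so no checkpoint fires mid-job.
no_checkpoint = dict(
    checkpoint_interval_seconds=24 * 3600,  # far longer than the job will run
    min_checkpoint_size=100_000_000,        # floor above any worker's workload
)

# table.load_columns(
#     source="s3://bucket/embeddings/",
#     pk="document_id",
#     columns=["embedding"],
#     **no_checkpoint,
# )
```
</CodeGroup>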

### Commit visibility

For long-running jobs, `commit_granularity` controls how many fragments must complete before an intermediate commit makes partial results visible to readers.

<CodeGroup>
```python Python icon="python"
table.load_columns(
    source="s3://bucket/embeddings/",
    pk="document_id",
    columns=["embedding"],
    commit_granularity=10,
)
```
</CodeGroup>

### Multi-pass loads for large sources

`load_columns` builds an in-memory primary-key index from the source. If the source is too large to fit in memory, split it into chunks and run sequential calls. Carry semantics guarantee correctness across passes.

Index memory depends on primary key type:

| PK type | ~Memory per row | 100M rows | 1B rows |
|---------|----------------|-----------|---------|
| int64 | ~8 bytes | ~800 MB | ~8 GB |
| string (avg 32 bytes) | ~32 bytes | ~3.2 GB | ~32 GB |
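
The table above turns directly into a chunk count. A sketch, assuming an int64 key and a 4 GiB memory budget (both numbers are illustrative):

<CodeGroup>
```python Python icon="python"
import math

rows = 1_000_000_000           # rows in the external source
bytes_per_key = 8              # int64 primary key (see table above)
memory_budget = 4 * 1024**3    # 4 GiB allowed for the in-memory key index

index_bytes = rows * bytes_per_key            # ~8 GB for the full index
N = math.ceil(index_bytes / memory_budget)    # N = 2: each chunk's index is ~4 GB
```
</CodeGroup>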

Choose N so that the primary-key index for `source_size / N` rows fits in memory:

<CodeGroup>
```python Python icon="python"
import pyarrow.dataset as pads

# Discover source files (metadata only, no data I/O)
source_files = pads.dataset("s3://bucket/embeddings/", format="parquet").files

# Split into N chunks and run sequentially
N = 4
total = len(source_files)
for i in range(N):
    chunk = source_files[i * total // N : (i + 1) * total // N]
    table.load_columns(
        source=chunk,
        pk="document_id",
        columns=["embedding"],
    )
```
</CodeGroup>

Each pass reads only its assigned files, so total source I/O stays at 1x. If the source is already partitioned into subdirectories, pass each URI directly:

<CodeGroup>
```python Python icon="python"
for shard in range(4):
    table.load_columns(
        source=f"s3://bucket/embeddings/shard_{shard}/",
        pk="document_id",
        columns=["embedding"],
    )
```
</CodeGroup>

<Warning>
Multi-pass loads must run **sequentially**, not concurrently. Two `load_columns` calls running at the same time against the same column produce an interleaved end state. Use a plain `for` loop, not `concurrent.futures`.
</Warning>

## Reference

* [`load_columns` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.load_columns)
* [`load_columns_async` API](https://lancedb.github.io/geneva/api/table/#geneva.table.Table.load_columns_async)