Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 125 additions & 45 deletions src/pipeline/pass_parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ enum {
#define PP_FIELD_HINT_CONF 0.85
enum { PP_CSHARP_M_PREFIX_LEN = 2 };

/* Source-retention caps for the parallel pipeline. The extract worker
* copies source bytes into result->arena so the fused cross-file LSP
* step in resolve_worker can run without re-reading from disk. Bound
* peak RSS with a per-file cap (skip retention for pathological huge
* generated files) and a total project-wide cap (skip when budget
* exhausted — cross-file LSP becomes a no-op for those late files,
* defs/calls already extracted are unaffected). */
#define PP_RETAIN_PER_FILE_MAX_BYTES (100LL * 1024 * 1024)
#define PP_RETAIN_TOTAL_BUDGET_BYTES (2LL * 1024 * 1024 * 1024)
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_internal.h"
#include "pipeline/pass_lsp_cross.h" /* cbm_pxc_* helpers for fused cross-file LSP */
Expand Down Expand Up @@ -78,6 +69,63 @@ enum { PP_CSHARP_M_PREFIX_LEN = 2 };
#include <string.h>
#include <time.h>

/* Default project-wide source-retention budget, in bytes.
 *
 * Uses one quarter of cbm_mem_budget() when that call reports a non-zero
 * value, and never more than 2 GiB. A zero budget falls back to the 2 GiB
 * ceiling itself. */
static size_t cbm_parallel_extract_default_total_cap(void) {
    const size_t ceiling = 2ULL * 1024 * 1024 * 1024;
    const size_t budget = cbm_mem_budget();

    if (budget == 0) {
        /* No budget reported: use the hard ceiling as the default. */
        return ceiling;
    }

    const size_t quarter = budget / 4;
    return quarter < ceiling ? quarter : ceiling;
}

/* Default per-file source-retention cap, in bytes.
 *
 * Returns the smaller of 100 MiB and total_cap, so a single file can never
 * be allowed more than the whole project-wide budget. */
static size_t cbm_parallel_extract_default_per_file_cap(size_t total_cap) {
    const size_t ceiling = 100ULL * 1024 * 1024;
    return total_cap < ceiling ? total_cap : ceiling;
}

/* Turn caller-supplied retention options into concrete, clamped values.
 *
 * opts may be NULL, in which case every field takes its default:
 * retention enabled, total budget from
 * cbm_parallel_extract_default_total_cap(), per-file cap derived from that
 * total. A non-NULL opts overrides retain_sources only when
 * retain_sources_set is true, and overrides either byte limit only when the
 * supplied value is non-zero.
 *
 * After overrides the limits are clamped: total to 2 GiB, per-file to
 * 100 MiB, and per-file to at most the total. retain_sources_set in the
 * returned struct is left false. */
static cbm_parallel_extract_opts_t cbm_parallel_extract_resolve_opts(
    const cbm_parallel_extract_opts_t *opts) {
    const size_t total_ceiling = 2ULL * 1024 * 1024 * 1024;
    const size_t per_file_ceiling = 100ULL * 1024 * 1024;

    /* Start from the defaults; the per-file default is derived from the
     * default total before any caller override is applied. */
    bool retain = true;
    size_t total_bytes = cbm_parallel_extract_default_total_cap();
    size_t per_file_bytes = cbm_parallel_extract_default_per_file_cap(total_bytes);

    if (opts != NULL) {
        if (opts->retain_sources_set) {
            retain = opts->retain_sources;
        }
        if (opts->retain_total_budget_bytes > 0) {
            total_bytes = opts->retain_total_budget_bytes;
        }
        if (opts->retain_per_file_max_bytes > 0) {
            per_file_bytes = opts->retain_per_file_max_bytes;
        }
    }

    /* Clamp to the hard ceilings, then keep per-file within the total. */
    if (total_bytes > total_ceiling) {
        total_bytes = total_ceiling;
    }
    if (per_file_bytes > per_file_ceiling) {
        per_file_bytes = per_file_ceiling;
    }
    if (per_file_bytes > total_bytes) {
        per_file_bytes = total_bytes;
    }

    cbm_parallel_extract_opts_t resolved = {
        .retain_sources = retain,
        .retain_total_budget_bytes = total_bytes,
        .retain_per_file_max_bytes = per_file_bytes,
    };
    return resolved;
}

static uint64_t extract_now_ns(void) {
struct timespec ts;
cbm_clock_gettime(CLOCK_MONOTONIC, &ts);
Expand Down Expand Up @@ -494,6 +542,10 @@ typedef struct {
_Atomic int next_file_idx;

cbm_pkg_entries_t *pkg_entries; /* per-worker manifest arrays (separate allocation) */
bool retain_sources;
size_t retain_total_budget_bytes;
size_t retain_per_file_max_bytes;

_Atomic int64_t retained_bytes; /* total source bytes copied into result arenas */
} extract_ctx_t;

Expand Down Expand Up @@ -631,16 +683,14 @@ static void extract_worker(int worker_id, void *ctx_ptr) {
source_len, &ec->pkg_entries[worker_id]);
}

/* Retain source bytes in result->arena so the fused cross-file
* LSP step in resolve_worker can run without re-reading from
* disk. Capped per-file (PP_RETAIN_PER_FILE_MAX_BYTES) and
* globally (PP_RETAIN_TOTAL_BUDGET_BYTES) to bound peak RSS.
* Skipping retention just means cross-file LSP no-ops for this
* file — defs/calls already extracted are unaffected. */
if (source_len > 0 && (int64_t)source_len <= PP_RETAIN_PER_FILE_MAX_BYTES) {
/* Retain source bytes only when the resolved retention options allow it.
* Skipping retention means the fused cross-file LSP step will no-op for
* this file — defs/calls already extracted are unaffected. */
if (ec->retain_sources && source_len > 0 &&
(size_t)source_len <= ec->retain_per_file_max_bytes) {
int64_t prior = atomic_fetch_add_explicit(&ec->retained_bytes, (int64_t)source_len,
memory_order_relaxed);
if (prior + (int64_t)source_len <= PP_RETAIN_TOTAL_BUDGET_BYTES) {
if ((size_t)(prior + (int64_t)source_len) <= ec->retain_total_budget_bytes) {
char *copy = (char *)cbm_arena_alloc(&result->arena, (size_t)source_len + 1);
if (copy) {
memcpy(copy, source, (size_t)source_len);
Expand Down Expand Up @@ -719,15 +769,24 @@ static void log_extract_mem_stats(int worker_count) {
}
}

int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count,
CBMFileResult **result_cache, _Atomic int64_t *shared_ids,
int worker_count) {
int cbm_parallel_extract_ex(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count,
CBMFileResult **result_cache, _Atomic int64_t *shared_ids,
int worker_count, const cbm_parallel_extract_opts_t *opts) {
cbm_parallel_extract_opts_t resolved_opts = cbm_parallel_extract_resolve_opts(opts);

if (file_count == 0) {
return 0;
}

cbm_log_info("parallel.extract.start", "files", itoa_log(file_count), "workers",
itoa_log(worker_count));
{
size_t mb = (size_t)CBM_SZ_1K * CBM_SZ_1K;
cbm_log_info("parallel.extract.retention", "retain_sources",
resolved_opts.retain_sources ? "true" : "false", "total_mb",
itoa_log((int)(resolved_opts.retain_total_budget_bytes / mb)), "per_file_mb",
itoa_log((int)(resolved_opts.retain_per_file_max_bytes / mb)));
}

/* Log per-worker memory budget */
if (cbm_mem_budget() > 0) {
Expand All @@ -748,7 +807,10 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files,

/* Sub-phase: Sort files by descending size for tail-latency reduction */
CBM_PROF_START(t_sort);
file_sort_entry_t *sorted = malloc(file_count * sizeof(file_sort_entry_t));
file_sort_entry_t *sorted = malloc((size_t)file_count * sizeof(file_sort_entry_t));
if (!sorted) {
return CBM_NOT_FOUND;
}
for (int i = 0; i < file_count; i++) {
sorted[i].idx = i;
sorted[i].size = files[i].size;
Expand All @@ -766,8 +828,12 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files,
memset(workers, 0, (size_t)worker_count * sizeof(extract_worker_state_t));

/* Per-worker manifest entry arrays (separate from cache-line-aligned worker state) */
cbm_pkg_entries_t *pkg_entries = calloc(worker_count, sizeof(cbm_pkg_entries_t));

cbm_pkg_entries_t *pkg_entries = calloc((size_t)worker_count, sizeof(cbm_pkg_entries_t));
if (!pkg_entries) {
cbm_aligned_free(workers);
free(sorted);
return CBM_NOT_FOUND;
}
extract_ctx_t ec = {
.files = files,
.sorted = sorted,
Expand All @@ -780,15 +846,18 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files,
.shared_ids = shared_ids,
.cancelled = ctx->cancelled,
.pkg_entries = pkg_entries,
.retain_sources = resolved_opts.retain_sources,
.retain_total_budget_bytes = resolved_opts.retain_total_budget_bytes,
.retain_per_file_max_bytes = resolved_opts.retain_per_file_max_bytes,
};
atomic_init(&ec.next_worker_id, 0);
atomic_init(&ec.next_file_idx, 0);
atomic_init(&ec.retained_bytes, 0);

/* Sub-phase: Dispatch workers (parse + extract per file, PARALLEL) */
CBM_PROF_START(t_dispatch);
cbm_parallel_for_opts_t opts = {.max_workers = worker_count, .force_pthreads = false};
cbm_parallel_for(worker_count, extract_worker, &ec, opts);
cbm_parallel_for_opts_t parallel_opts = {.max_workers = worker_count, .force_pthreads = false};
cbm_parallel_for(worker_count, extract_worker, &ec, parallel_opts);
CBM_PROF_END_N("parallel_extract", "3_dispatch_workers_parallel", t_dispatch, file_count);

/* Sub-phase: Merge all local gbufs into main gbuf (SEQUENTIAL, gbuf not thread-safe) */
Expand Down Expand Up @@ -821,7 +890,12 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files,
return 0;
}

/* ── Phase 3B: Serial Registry Build ─────────────────────────────── */
/* Backward-compatible entry point: runs cbm_parallel_extract_ex with no
 * explicit retention options, so the default retention policy applies.
 * Returns whatever cbm_parallel_extract_ex returns. */
int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count,
                         CBMFileResult **result_cache, _Atomic int64_t *shared_ids,
                         int worker_count) {
    /* NULL asks cbm_parallel_extract_ex to resolve every option to its default. */
    const cbm_parallel_extract_opts_t *default_opts = NULL;
    int rc = cbm_parallel_extract_ex(ctx, files, file_count, result_cache, shared_ids,
                                     worker_count, default_opts);
    return rc;
}

/* Register one definition and create DEFINES + DEFINES_METHOD edges. Returns edge count. */
static int register_and_link_def(cbm_pipeline_ctx_t *ctx, const CBMDefinition *def, const char *rel,
Expand Down Expand Up @@ -2270,18 +2344,25 @@ static void resolve_worker(int worker_id, void *ctx_ptr) {
* Runs BEFORE resolve_file_calls so its additions to
* result->resolved_calls are picked up by
* cbm_pipeline_find_lsp_resolution when calls become CALLS
* edges. Requires result->source to have been retained in
* result->arena during extract (PP_RETAIN_*); files over the
* cap or past the budget have result->source==NULL and are
* counted as skipped_no_source — defs/calls already in the
* extract are unaffected.
* edges. Prefer source bytes retained in result->arena; if
* the low-RAM retention cap skipped this file, fall back to a
* bounded per-file read and free it immediately after the LSP
* call. This keeps project-wide retention bounded without
* dropping cross-file LSP correctness for unretained files.
*
* Slab reclaim afterward: the LSP re-parses via tree-sitter,
* which allocates through this worker's TLS slab. Reclaiming
* here keeps the slab high-water bounded as the resolve phase
* walks across thousands of files in a single worker thread. */
if (cross_lsp_eligible) {
if (result->source && result->source_len > 0) {
char *lsp_source_owned = NULL;
const char *lsp_source = result->source;
int lsp_source_len = result->source_len;
if ((!lsp_source || lsp_source_len <= 0) && rc->files[file_idx].path) {
lsp_source_owned = read_file(rc->files[file_idx].path, &lsp_source_len);
lsp_source = lsp_source_owned;
}
if (lsp_source && lsp_source_len > 0) {
const char *def_module = rc->def_modules ? rc->def_modules[file_idx] : module_qn;

uint64_t lsp_t0 = extract_now_ns();
Expand Down Expand Up @@ -2315,25 +2396,24 @@ static void resolve_worker(int worker_id, void *ctx_ptr) {
}
case CBM_LANG_PYTHON:
cbm_run_py_lsp_cross_with_registry(
&result->arena, result->source, result->source_len, def_module,
prebuilt, imp_keys, imp_vals, imp_count, result->cached_tree,
&result->arena, lsp_source, lsp_source_len, def_module, prebuilt,
imp_keys, imp_vals, imp_count, result->cached_tree,
&result->resolved_calls);
used_prebuilt = true;
break;
case CBM_LANG_C:
case CBM_LANG_CPP:
case CBM_LANG_CUDA:
cbm_run_c_lsp_cross_with_registry(
&result->arena, result->source, result->source_len, def_module,
&result->arena, lsp_source, lsp_source_len, def_module,
(lang != CBM_LANG_C), prebuilt, imp_keys, imp_vals, imp_count,
result->cached_tree, &result->resolved_calls);
used_prebuilt = true;
break;
case CBM_LANG_CSHARP:
cbm_run_cs_lsp_cross_with_registry(&result->arena, result->source,
result->source_len, def_module, prebuilt,
imp_vals, imp_count, result->cached_tree,
&result->resolved_calls);
cbm_run_cs_lsp_cross_with_registry(
&result->arena, lsp_source, lsp_source_len, def_module, prebuilt,
imp_vals, imp_count, result->cached_tree, &result->resolved_calls);
used_prebuilt = true;
break;
case CBM_LANG_JAVASCRIPT:
Expand Down Expand Up @@ -2361,8 +2441,8 @@ static void resolve_worker(int worker_id, void *ctx_ptr) {
}
}
cbm_run_ts_lsp_cross_with_registry(
&result->arena, result->source, result->source_len, def_module, js, jsx,
dts, prebuilt, ts_defs, ts_def_count, imp_keys, imp_vals, imp_count,
&result->arena, lsp_source, lsp_source_len, def_module, js, jsx, dts,
prebuilt, ts_defs, ts_def_count, imp_keys, imp_vals, imp_count,
result->cached_tree, &result->resolved_calls);
free(ts_filtered);
used_prebuilt = true;
Expand Down Expand Up @@ -2394,16 +2474,16 @@ static void resolve_worker(int worker_id, void *ctx_ptr) {
lang == CBM_LANG_TSX) {
bool js, jsx, dts;
cbm_pxc_ts_modes(lang, rel, &js, &jsx, &dts);
cbm_pxc_run_one_ts(result, result->source, result->source_len, def_module,
cbm_pxc_run_one_ts(result, lsp_source, lsp_source_len, def_module,
file_defs, file_def_count, imp_keys, imp_vals, imp_count,
js, jsx, dts);
} else {
cbm_pxc_run_one(lang, result, result->source, result->source_len,
def_module, file_defs, file_def_count, imp_keys, imp_vals,
imp_count);
cbm_pxc_run_one(lang, result, lsp_source, lsp_source_len, def_module,
file_defs, file_def_count, imp_keys, imp_vals, imp_count);
}
}
free(filtered);
free_source(lsp_source_owned);
/* Contract: cbm_slab_reclaim() requires the thread parser to be
* destroyed first; otherwise its lexer holds slab pointers
* (lexer.included_ranges) that get freed underneath it, causing
Expand Down
25 changes: 13 additions & 12 deletions src/pipeline/pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,20 @@ static int run_parallel_pipeline(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx,
cbm_log_error("pipeline.err", "phase", "cache_alloc");
return CBM_NOT_FOUND;
}
char cbm_lsp_cross_env[CBM_SZ_16];
const bool run_cross_lsp = cbm_safe_getenv("CBM_DISABLE_LSP_CROSS", cbm_lsp_cross_env,
sizeof(cbm_lsp_cross_env), NULL) == NULL;
if (!run_cross_lsp) {
cbm_log_info("lsp_cross.skipped", "reason", "CBM_DISABLE_LSP_CROSS env set");
}
cbm_parallel_extract_opts_t extract_opts = {
.retain_sources = run_cross_lsp,
.retain_sources_set = true,
};

cbm_clock_gettime(CLOCK_MONOTONIC, t);
int rc = cbm_parallel_extract(ctx, files, file_count, cache, &shared_ids, worker_count);
int rc = cbm_parallel_extract_ex(ctx, files, file_count, cache, &shared_ids, worker_count,
&extract_opts);
cbm_log_info("pass.timing", "pass", "parallel_extract", "elapsed_ms",
itoa_buf((int)elapsed_ms(*t)));
if (rc != 0 || check_cancel(p)) {
Expand Down Expand Up @@ -783,17 +795,6 @@ static int run_parallel_pipeline(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx,
* mean cross-file LSP no-ops; per-file LSP already ran during
* extract. */
cbm_clock_gettime(CLOCK_MONOTONIC, t);
/* Cross-file LSP (type-aware call/usage resolution across files) — the
* most expensive phase. CBM_DISABLE_LSP_CROSS=1 opts out (it can SIGSEGV
* on large TS projects — see #340/#344); with cross-LSP off, all_defs
* stays NULL and the fused resolver simply no-ops cross-file resolution
* (per-file LSP already ran during extract). */
char cbm_lsp_cross_env[CBM_SZ_16];
const bool run_cross_lsp = cbm_safe_getenv("CBM_DISABLE_LSP_CROSS", cbm_lsp_cross_env,
sizeof(cbm_lsp_cross_env), NULL) == NULL;
if (!run_cross_lsp) {
cbm_log_info("lsp_cross.skipped", "reason", "CBM_DISABLE_LSP_CROSS env set");
}
char **def_modules = NULL;
int def_count = 0;
CBMLSPDef *all_defs = NULL;
Expand Down
8 changes: 7 additions & 1 deletion src/pipeline/pipeline_incremental.c
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,13 @@ static void run_extract_resolve(cbm_pipeline_ctx_t *ctx, cbm_file_info_t *change
CBMFileResult **cache = (CBMFileResult **)calloc(ci, sizeof(CBMFileResult *));
if (cache) {
cbm_clock_gettime(CLOCK_MONOTONIC, &t);
cbm_parallel_extract(ctx, changed_files, ci, cache, &shared_ids, worker_count);
const cbm_parallel_extract_opts_t extract_opts = {
.retain_sources = false,
.retain_sources_set = true,
};

cbm_parallel_extract_ex(ctx, changed_files, ci, cache, &shared_ids, worker_count,
&extract_opts);
cbm_gbuf_set_next_id(ctx->gbuf, atomic_load(&shared_ids));
cbm_log_info("pass.timing", "pass", "incr_extract", "elapsed_ms",
itoa_buf((int)elapsed_ms(t)));
Expand Down
10 changes: 10 additions & 0 deletions src/pipeline/pipeline_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,16 @@ char *cbm_infra_qn(const char *project_name, const char *rel_path, const char *i
* Each worker creates nodes in a per-worker gbuf, then merges into ctx->gbuf.
* Caches CBMFileResult* in result_cache[file_idx] for reuse in Phase 3B/4.
* shared_ids provides globally unique node/edge IDs across workers. */
/* Source-retention options for cbm_parallel_extract_ex.
 * A zero-initialized struct (or a NULL pointer passed to
 * cbm_parallel_extract_ex) selects the defaults for every field. */
typedef struct {
/* When true, extract workers copy each file's source bytes into its result
 * arena (subject to the two limits below). Only honored when
 * retain_sources_set is true; the default policy is to retain. */
bool retain_sources;
bool retain_sources_set; /* false keeps the default retain_sources policy */
/* Project-wide cap on retained source bytes. 0 keeps the default; values
 * above 2 GiB are clamped to 2 GiB. */
size_t retain_total_budget_bytes;
/* Largest single file whose source may be retained. 0 keeps the default;
 * values are clamped to 100 MiB and to the total budget above. */
size_t retain_per_file_max_bytes;
} cbm_parallel_extract_opts_t;

/* Parallel extract with explicit source-retention options.
 * opts may be NULL, which selects the default retention policy;
 * cbm_parallel_extract is the wrapper that passes NULL. Returns 0 on
 * success (including file_count == 0) and a non-zero code on allocation
 * failure. */
int cbm_parallel_extract_ex(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count,
CBMFileResult **result_cache, _Atomic int64_t *shared_ids,
int worker_count, const cbm_parallel_extract_opts_t *opts);
int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count,
CBMFileResult **result_cache, _Atomic int64_t *shared_ids,
int worker_count);
Expand Down
Loading
Loading