diff --git a/src/pipeline/pass_parallel.c b/src/pipeline/pass_parallel.c index fefcf736a..7b23def16 100644 --- a/src/pipeline/pass_parallel.c +++ b/src/pipeline/pass_parallel.c @@ -40,15 +40,6 @@ enum { #define PP_FIELD_HINT_CONF 0.85 enum { PP_CSHARP_M_PREFIX_LEN = 2 }; -/* Source-retention caps for the parallel pipeline. The extract worker - * copies source bytes into result->arena so the fused cross-file LSP - * step in resolve_worker can run without re-reading from disk. Bound - * peak RSS with a per-file cap (skip retention for pathological huge - * generated files) and a total project-wide cap (skip when budget - * exhausted — cross-file LSP becomes a no-op for those late files, - * defs/calls already extracted are unaffected). */ -#define PP_RETAIN_PER_FILE_MAX_BYTES (100LL * 1024 * 1024) -#define PP_RETAIN_TOTAL_BUDGET_BYTES (2LL * 1024 * 1024 * 1024) #include "pipeline/pipeline.h" #include "pipeline/pipeline_internal.h" #include "pipeline/pass_lsp_cross.h" /* cbm_pxc_* helpers for fused cross-file LSP */ @@ -78,6 +69,63 @@ enum { PP_CSHARP_M_PREFIX_LEN = 2 }; #include #include +static size_t cbm_parallel_extract_default_total_cap(void) { + const size_t hard_max = 2ULL * 1024 * 1024 * 1024; + size_t cap = cbm_mem_budget(); + if (cap > 0) { + cap /= 4; + } else { + cap = hard_max; + } + if (cap > hard_max) { + cap = hard_max; + } + return cap; +} + +static size_t cbm_parallel_extract_default_per_file_cap(size_t total_cap) { + const size_t hard_max = 100ULL * 1024 * 1024; + size_t cap = hard_max; + if (cap > total_cap) { + cap = total_cap; + } + return cap; +} + +static cbm_parallel_extract_opts_t cbm_parallel_extract_resolve_opts( + const cbm_parallel_extract_opts_t *opts) { + const size_t hard_total_max = 2ULL * 1024 * 1024 * 1024; + const size_t hard_per_file_max = 100ULL * 1024 * 1024; + cbm_parallel_extract_opts_t resolved = { + .retain_sources = true, + .retain_total_budget_bytes = cbm_parallel_extract_default_total_cap(), + .retain_per_file_max_bytes = 0, + }; + resolved.retain_per_file_max_bytes = + cbm_parallel_extract_default_per_file_cap(resolved.retain_total_budget_bytes); + if (opts) { + if (opts->retain_sources_set) { + resolved.retain_sources = opts->retain_sources; + } + if (opts->retain_total_budget_bytes > 0) { + resolved.retain_total_budget_bytes = opts->retain_total_budget_bytes; + } + if (opts->retain_per_file_max_bytes > 0) { + resolved.retain_per_file_max_bytes = opts->retain_per_file_max_bytes; + } + } + if (resolved.retain_total_budget_bytes > hard_total_max) { + resolved.retain_total_budget_bytes = hard_total_max; + } + if (resolved.retain_per_file_max_bytes > hard_per_file_max) { + resolved.retain_per_file_max_bytes = hard_per_file_max; + } + if (resolved.retain_per_file_max_bytes > resolved.retain_total_budget_bytes) { + resolved.retain_per_file_max_bytes = resolved.retain_total_budget_bytes; + } + return resolved; +} + static uint64_t extract_now_ns(void) { struct timespec ts; cbm_clock_gettime(CLOCK_MONOTONIC, &ts); @@ -494,6 +542,10 @@ typedef struct { _Atomic int next_file_idx; cbm_pkg_entries_t *pkg_entries; /* per-worker manifest arrays (separate allocation) */ + bool retain_sources; + size_t retain_total_budget_bytes; + size_t retain_per_file_max_bytes; + _Atomic int64_t retained_bytes; /* total source bytes copied into result arenas */ } extract_ctx_t; @@ -631,16 +683,14 @@ static void extract_worker(int worker_id, void *ctx_ptr) { source_len, &ec->pkg_entries[worker_id]); } - /* Retain source bytes in result->arena so the fused cross-file - * LSP step in resolve_worker can run without re-reading from - * disk. Capped per-file (PP_RETAIN_PER_FILE_MAX_BYTES) and - * globally (PP_RETAIN_TOTAL_BUDGET_BYTES) to bound peak RSS. - * Skipping retention just means cross-file LSP no-ops for this - * file — defs/calls already extracted are unaffected. */ - if (source_len > 0 && (int64_t)source_len <= PP_RETAIN_PER_FILE_MAX_BYTES) { + /* Retain source bytes only when the resolved retention options allow it. + * Skipping retention means the fused cross-file LSP step will no-op for + * this file — defs/calls already extracted are unaffected. */ + if (ec->retain_sources && source_len > 0 && + (size_t)source_len <= ec->retain_per_file_max_bytes) { int64_t prior = atomic_fetch_add_explicit(&ec->retained_bytes, (int64_t)source_len, memory_order_relaxed); - if (prior + (int64_t)source_len <= PP_RETAIN_TOTAL_BUDGET_BYTES) { + if ((size_t)(prior + (int64_t)source_len) <= ec->retain_total_budget_bytes) { char *copy = (char *)cbm_arena_alloc(&result->arena, (size_t)source_len + 1); if (copy) { memcpy(copy, source, (size_t)source_len); @@ -719,15 +769,24 @@ static void log_extract_mem_stats(int worker_count) { } } -int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count, - CBMFileResult **result_cache, _Atomic int64_t *shared_ids, - int worker_count) { +int cbm_parallel_extract_ex(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count, + CBMFileResult **result_cache, _Atomic int64_t *shared_ids, + int worker_count, const cbm_parallel_extract_opts_t *opts) { + cbm_parallel_extract_opts_t resolved_opts = cbm_parallel_extract_resolve_opts(opts); + if (file_count == 0) { return 0; } cbm_log_info("parallel.extract.start", "files", itoa_log(file_count), "workers", itoa_log(worker_count)); + { + size_t mb = (size_t)CBM_SZ_1K * CBM_SZ_1K; + cbm_log_info("parallel.extract.retention", "retain_sources", + resolved_opts.retain_sources ? "true" : "false", "total_mb", + itoa_log((int)(resolved_opts.retain_total_budget_bytes / mb)), "per_file_mb", + itoa_log((int)(resolved_opts.retain_per_file_max_bytes / mb))); + } /* Log per-worker memory budget */ if (cbm_mem_budget() > 0) { @@ -748,7 +807,10 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, /* Sub-phase: Sort files by descending size for tail-latency reduction */ CBM_PROF_START(t_sort); - file_sort_entry_t *sorted = malloc(file_count * sizeof(file_sort_entry_t)); + file_sort_entry_t *sorted = malloc((size_t)file_count * sizeof(file_sort_entry_t)); + if (!sorted) { + return CBM_NOT_FOUND; + } for (int i = 0; i < file_count; i++) { sorted[i].idx = i; sorted[i].size = files[i].size; @@ -766,8 +828,12 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, memset(workers, 0, (size_t)worker_count * sizeof(extract_worker_state_t)); /* Per-worker manifest entry arrays (separate from cache-line-aligned worker state) */ - cbm_pkg_entries_t *pkg_entries = calloc(worker_count, sizeof(cbm_pkg_entries_t)); - + cbm_pkg_entries_t *pkg_entries = calloc((size_t)worker_count, sizeof(cbm_pkg_entries_t)); + if (!pkg_entries) { + cbm_aligned_free(workers); + free(sorted); + return CBM_NOT_FOUND; + } extract_ctx_t ec = { .files = files, .sorted = sorted, @@ -780,6 +846,9 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, .shared_ids = shared_ids, .cancelled = ctx->cancelled, .pkg_entries = pkg_entries, + .retain_sources = resolved_opts.retain_sources, + .retain_total_budget_bytes = resolved_opts.retain_total_budget_bytes, + .retain_per_file_max_bytes = resolved_opts.retain_per_file_max_bytes, }; atomic_init(&ec.next_worker_id, 0); atomic_init(&ec.next_file_idx, 0); @@ -787,8 +856,8 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, /* Sub-phase: Dispatch workers (parse + extract per file, PARALLEL) */ CBM_PROF_START(t_dispatch); - cbm_parallel_for_opts_t opts = {.max_workers = worker_count, .force_pthreads = false}; - cbm_parallel_for(worker_count, extract_worker, &ec, opts); + cbm_parallel_for_opts_t parallel_opts = {.max_workers = worker_count, .force_pthreads = false}; + cbm_parallel_for(worker_count, extract_worker, &ec, parallel_opts); CBM_PROF_END_N("parallel_extract", "3_dispatch_workers_parallel", t_dispatch, file_count); /* Sub-phase: Merge all local gbufs into main gbuf (SEQUENTIAL, gbuf not thread-safe) */ @@ -821,7 +890,12 @@ int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, return 0; } -/* ── Phase 3B: Serial Registry Build ─────────────────────────────── */ +int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count, + CBMFileResult **result_cache, _Atomic int64_t *shared_ids, + int worker_count) { + return cbm_parallel_extract_ex(ctx, files, file_count, result_cache, shared_ids, worker_count, + NULL); +} /* Register one definition and create DEFINES + DEFINES_METHOD edges. Returns edge count. */ static int register_and_link_def(cbm_pipeline_ctx_t *ctx, const CBMDefinition *def, const char *rel, @@ -2270,18 +2344,25 @@ static void resolve_worker(int worker_id, void *ctx_ptr) { * Runs BEFORE resolve_file_calls so its additions to * result->resolved_calls are picked up by * cbm_pipeline_find_lsp_resolution when calls become CALLS - * edges. Requires result->source to have been retained in - * result->arena during extract (PP_RETAIN_*); files over the - * cap or past the budget have result->source==NULL and are - * counted as skipped_no_source — defs/calls already in the - * extract are unaffected. + * edges. Prefer source bytes retained in result->arena; if + * the low-RAM retention cap skipped this file, fall back to a + * bounded per-file read and free it immediately after the LSP + * call. This keeps project-wide retention bounded without + * dropping cross-file LSP correctness for unretained files. * * Slab reclaim afterward: the LSP re-parses via tree-sitter, * which allocates through this worker's TLS slab. Reclaiming * here keeps the slab high-water bounded as the resolve phase * walks across thousands of files in a single worker thread. */ if (cross_lsp_eligible) { - if (result->source && result->source_len > 0) { + char *lsp_source_owned = NULL; + const char *lsp_source = result->source; + int lsp_source_len = result->source_len; + if ((!lsp_source || lsp_source_len <= 0) && rc->files[file_idx].path) { + lsp_source_owned = read_file(rc->files[file_idx].path, &lsp_source_len); + lsp_source = lsp_source_owned; + } + if (lsp_source && lsp_source_len > 0) { const char *def_module = rc->def_modules ? rc->def_modules[file_idx] : module_qn; uint64_t lsp_t0 = extract_now_ns(); @@ -2315,8 +2396,8 @@ static void resolve_worker(int worker_id, void *ctx_ptr) { } case CBM_LANG_PYTHON: cbm_run_py_lsp_cross_with_registry( - &result->arena, result->source, result->source_len, def_module, - prebuilt, imp_keys, imp_vals, imp_count, result->cached_tree, + &result->arena, lsp_source, lsp_source_len, def_module, prebuilt, + imp_keys, imp_vals, imp_count, result->cached_tree, &result->resolved_calls); used_prebuilt = true; break; @@ -2324,16 +2405,15 @@ static void resolve_worker(int worker_id, void *ctx_ptr) { case CBM_LANG_CPP: case CBM_LANG_CUDA: cbm_run_c_lsp_cross_with_registry( - &result->arena, result->source, result->source_len, def_module, + &result->arena, lsp_source, lsp_source_len, def_module, (lang != CBM_LANG_C), prebuilt, imp_keys, imp_vals, imp_count, result->cached_tree, &result->resolved_calls); used_prebuilt = true; break; case CBM_LANG_CSHARP: - cbm_run_cs_lsp_cross_with_registry(&result->arena, result->source, - result->source_len, def_module, prebuilt, - imp_vals, imp_count, result->cached_tree, - &result->resolved_calls); + cbm_run_cs_lsp_cross_with_registry( + &result->arena, lsp_source, lsp_source_len, def_module, prebuilt, + imp_vals, imp_count, result->cached_tree, &result->resolved_calls); used_prebuilt = true; break; case CBM_LANG_JAVASCRIPT: @@ -2361,8 +2441,8 @@ static void resolve_worker(int worker_id, void *ctx_ptr) { } } cbm_run_ts_lsp_cross_with_registry( - &result->arena, result->source, result->source_len, def_module, js, jsx, - dts, prebuilt, ts_defs, ts_def_count, imp_keys, imp_vals, imp_count, + &result->arena, lsp_source, lsp_source_len, def_module, js, jsx, dts, + prebuilt, ts_defs, ts_def_count, imp_keys, imp_vals, imp_count, result->cached_tree, &result->resolved_calls); free(ts_filtered); used_prebuilt = true; @@ -2394,16 +2474,16 @@ static void resolve_worker(int worker_id, void *ctx_ptr) { lang == CBM_LANG_TSX) { bool js, jsx, dts; cbm_pxc_ts_modes(lang, rel, &js, &jsx, &dts); - cbm_pxc_run_one_ts(result, result->source, result->source_len, def_module, + cbm_pxc_run_one_ts(result, lsp_source, lsp_source_len, def_module, file_defs, file_def_count, imp_keys, imp_vals, imp_count, js, jsx, dts); } else { - cbm_pxc_run_one(lang, result, result->source, result->source_len, - def_module, file_defs, file_def_count, imp_keys, imp_vals, - imp_count); + cbm_pxc_run_one(lang, result, lsp_source, lsp_source_len, def_module, + file_defs, file_def_count, imp_keys, imp_vals, imp_count); } } free(filtered); + free_source(lsp_source_owned); /* Contract: cbm_slab_reclaim() requires the thread parser to be * destroyed first; otherwise its lexer holds slab pointers * (lexer.included_ranges) that get freed underneath it, causing diff --git a/src/pipeline/pipeline.c b/src/pipeline/pipeline.c index 1cdfda02b..385038a74 100644 --- a/src/pipeline/pipeline.c +++ b/src/pipeline/pipeline.c @@ -741,8 +741,20 @@ static int run_parallel_pipeline(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx, cbm_log_error("pipeline.err", "phase", "cache_alloc"); return CBM_NOT_FOUND; } + char cbm_lsp_cross_env[CBM_SZ_16]; + const bool run_cross_lsp = cbm_safe_getenv("CBM_DISABLE_LSP_CROSS", cbm_lsp_cross_env, + sizeof(cbm_lsp_cross_env), NULL) == NULL; + if (!run_cross_lsp) { + cbm_log_info("lsp_cross.skipped", "reason", "CBM_DISABLE_LSP_CROSS env set"); + } + cbm_parallel_extract_opts_t extract_opts = { + .retain_sources = run_cross_lsp, + .retain_sources_set = true, + }; + cbm_clock_gettime(CLOCK_MONOTONIC, t); - int rc = cbm_parallel_extract(ctx, files, file_count, cache, &shared_ids, worker_count); + int rc = cbm_parallel_extract_ex(ctx, files, file_count, cache, &shared_ids, worker_count, + &extract_opts); cbm_log_info("pass.timing", "pass", "parallel_extract", "elapsed_ms", itoa_buf((int)elapsed_ms(*t))); if (rc != 0 || check_cancel(p)) { @@ -783,17 +795,6 @@ static int run_parallel_pipeline(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx, * mean cross-file LSP no-ops; per-file LSP already ran during * extract. */ cbm_clock_gettime(CLOCK_MONOTONIC, t); - /* Cross-file LSP (type-aware call/usage resolution across files) — the - * most expensive phase. CBM_DISABLE_LSP_CROSS=1 opts out (it can SIGSEGV - * on large TS projects — see #340/#344); with cross-LSP off, all_defs - * stays NULL and the fused resolver simply no-ops cross-file resolution - * (per-file LSP already ran during extract). */ - char cbm_lsp_cross_env[CBM_SZ_16]; - const bool run_cross_lsp = cbm_safe_getenv("CBM_DISABLE_LSP_CROSS", cbm_lsp_cross_env, - sizeof(cbm_lsp_cross_env), NULL) == NULL; - if (!run_cross_lsp) { - cbm_log_info("lsp_cross.skipped", "reason", "CBM_DISABLE_LSP_CROSS env set"); - } char **def_modules = NULL; int def_count = 0; CBMLSPDef *all_defs = NULL; diff --git a/src/pipeline/pipeline_incremental.c b/src/pipeline/pipeline_incremental.c index e5d1b4c9f..7cf09dcd2 100644 --- a/src/pipeline/pipeline_incremental.c +++ b/src/pipeline/pipeline_incremental.c @@ -552,7 +552,13 @@ static void run_extract_resolve(cbm_pipeline_ctx_t *ctx, cbm_file_info_t *change CBMFileResult **cache = (CBMFileResult **)calloc(ci, sizeof(CBMFileResult *)); if (cache) { cbm_clock_gettime(CLOCK_MONOTONIC, &t); - cbm_parallel_extract(ctx, changed_files, ci, cache, &shared_ids, worker_count); + const cbm_parallel_extract_opts_t extract_opts = { + .retain_sources = false, + .retain_sources_set = true, + }; + + cbm_parallel_extract_ex(ctx, changed_files, ci, cache, &shared_ids, worker_count, + &extract_opts); cbm_gbuf_set_next_id(ctx->gbuf, atomic_load(&shared_ids)); cbm_log_info("pass.timing", "pass", "incr_extract", "elapsed_ms", itoa_buf((int)elapsed_ms(t))); diff --git a/src/pipeline/pipeline_internal.h b/src/pipeline/pipeline_internal.h index af1a8de12..499c615e8 100644 --- a/src/pipeline/pipeline_internal.h +++ b/src/pipeline/pipeline_internal.h @@ -402,6 +402,16 @@ char *cbm_infra_qn(const char *project_name, const char *rel_path, const char *i * Each worker creates nodes in a per-worker gbuf, then merges into ctx->gbuf. * Caches CBMFileResult* in result_cache[file_idx] for reuse in Phase 3B/4. * shared_ids provides globally unique node/edge IDs across workers. */ +typedef struct { + bool retain_sources; + bool retain_sources_set; /* false keeps the default retain_sources policy */ + size_t retain_total_budget_bytes; + size_t retain_per_file_max_bytes; +} cbm_parallel_extract_opts_t; + +int cbm_parallel_extract_ex(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count, + CBMFileResult **result_cache, _Atomic int64_t *shared_ids, + int worker_count, const cbm_parallel_extract_opts_t *opts); int cbm_parallel_extract(cbm_pipeline_ctx_t *ctx, const cbm_file_info_t *files, int file_count, CBMFileResult **result_cache, _Atomic int64_t *shared_ids, int worker_count); diff --git a/tests/test_mem.c b/tests/test_mem.c index debb9b505..99dc13b59 100644 --- a/tests/test_mem.c +++ b/tests/test_mem.c @@ -565,6 +565,160 @@ static void teardown_mem_test_repo(void) { } } +static size_t count_retained_source_bytes(CBMFileResult **result_cache, int file_count, + int *retained_count) { + size_t retained_bytes = 0; + int count = 0; + + for (int i = 0; i < file_count; i++) { + CBMFileResult *result = result_cache[i]; + if (result && result->source) { + retained_bytes += (size_t)result->source_len; + count++; + } + } + + if (retained_count) { + *retained_count = count; + } + return retained_bytes; +} + +TEST(parallel_extract_without_source_retention) { + if (setup_mem_test_repo() != 0) { + FAIL("tmpdir setup failed"); + } + + cbm_discover_opts_t opts = {.mode = CBM_MODE_FULL}; + cbm_file_info_t *files = NULL; + int file_count = 0; + if (cbm_discover(g_mem_tmpdir, &opts, &files, &file_count) != 0) { + teardown_mem_test_repo(); + FAIL("discover failed"); + } + + cbm_gbuf_t *gbuf = cbm_gbuf_new("mem-test", g_mem_tmpdir); + cbm_registry_t *reg = cbm_registry_new(); + atomic_int cancelled; + atomic_init(&cancelled, 0); + + cbm_pipeline_ctx_t ctx = { + .project_name = "mem-test", + .repo_path = g_mem_tmpdir, + .gbuf = gbuf, + .registry = reg, + .cancelled = &cancelled, + }; + + _Atomic int64_t shared_ids; + atomic_init(&shared_ids, cbm_gbuf_next_id(gbuf)); + + CBMFileResult **result_cache = calloc((size_t)file_count, sizeof(CBMFileResult *)); + ASSERT_NOT_NULL(result_cache); + + cbm_parallel_extract_opts_t extract_opts = { + .retain_sources = false, + .retain_sources_set = true, + .retain_total_budget_bytes = 0, + .retain_per_file_max_bytes = 0, + }; + int rc = cbm_parallel_extract_ex(&ctx, files, file_count, result_cache, &shared_ids, 2, + &extract_opts); + ASSERT_EQ(rc, 0); + + int defs_seen = 0; + for (int i = 0; i < file_count; i++) { + if (result_cache[i]) { + ASSERT_EQ(result_cache[i]->source, NULL); + defs_seen += result_cache[i]->defs.count; + } + } + ASSERT_GT(defs_seen, 0); + ASSERT_GT(cbm_gbuf_node_count(gbuf), 0); + + for (int i = 0; i < file_count; i++) { + if (result_cache[i]) { + cbm_free_result(result_cache[i]); + } + } + free(result_cache); + cbm_registry_free(reg); + cbm_gbuf_free(gbuf); + cbm_discover_free(files, file_count); + teardown_mem_test_repo(); + PASS(); +} + +TEST(parallel_extract_tiny_source_retention_budget) { + if (setup_mem_test_repo() != 0) { + FAIL("tmpdir setup failed"); + } + + cbm_discover_opts_t opts = {.mode = CBM_MODE_FULL}; + cbm_file_info_t *files = NULL; + int file_count = 0; + if (cbm_discover(g_mem_tmpdir, &opts, &files, &file_count) != 0) { + teardown_mem_test_repo(); + FAIL("discover failed"); + } + + cbm_gbuf_t *gbuf = cbm_gbuf_new("mem-test", g_mem_tmpdir); + cbm_registry_t *reg = cbm_registry_new(); + atomic_int cancelled; + atomic_init(&cancelled, 0); + + cbm_pipeline_ctx_t ctx = { + .project_name = "mem-test", + .repo_path = g_mem_tmpdir, + .gbuf = gbuf, + .registry = reg, + .cancelled = &cancelled, + }; + + _Atomic int64_t shared_ids; + atomic_init(&shared_ids, cbm_gbuf_next_id(gbuf)); + + CBMFileResult **result_cache = calloc((size_t)file_count, sizeof(CBMFileResult *)); + ASSERT_NOT_NULL(result_cache); + + const size_t retain_total_budget_bytes = 256; + cbm_parallel_extract_opts_t extract_opts = { + .retain_sources = true, + .retain_sources_set = true, + .retain_total_budget_bytes = retain_total_budget_bytes, + .retain_per_file_max_bytes = 100U * 1024U * 1024U, + }; + int rc = cbm_parallel_extract_ex(&ctx, files, file_count, result_cache, &shared_ids, 2, + &extract_opts); + ASSERT_EQ(rc, 0); + + int retained_count = 0; + size_t retained_bytes = count_retained_source_bytes(result_cache, file_count, &retained_count); + int defs_seen = 0; + for (int i = 0; i < file_count; i++) { + if (result_cache[i]) { + defs_seen += result_cache[i]->defs.count; + } + } + + ASSERT_GT(defs_seen, 0); + ASSERT_GT(retained_count, 0); + ASSERT_LTE(retained_bytes, retain_total_budget_bytes); + ASSERT_GT(cbm_gbuf_node_count(gbuf), 0); + + for (int i = 0; i < file_count; i++) { + if (result_cache[i]) { + cbm_free_result(result_cache[i]); + } + } + free(result_cache); + cbm_registry_free(reg); + cbm_gbuf_free(gbuf); + cbm_discover_free(files, file_count); + teardown_mem_test_repo(); + PASS(); +} + TEST(parallel_extract_with_slab) { cbm_mem_init(0.5); @@ -666,5 +820,7 @@ SUITE(mem) { RUN_TEST(slab_calloc_zeroed); RUN_TEST(slab_mixed_alloc_free_stress); /* Integration */ + RUN_TEST(parallel_extract_without_source_retention); + RUN_TEST(parallel_extract_tiny_source_retention_budget); RUN_TEST(parallel_extract_with_slab); } diff --git a/tests/test_parallel.c b/tests/test_parallel.c index 746e1f2c3..1c54d4043 100644 --- a/tests/test_parallel.c +++ b/tests/test_parallel.c @@ -111,8 +111,10 @@ static cbm_gbuf_t *run_sequential(const char *project, const char *repo_path, /* ── Run parallel pipeline on files, returning gbuf ───────────────── */ -static cbm_gbuf_t *run_parallel(const char *project, const char *repo_path, cbm_file_info_t *files, - int file_count, int worker_count) { +static cbm_gbuf_t *run_parallel_with_extract_opts(const char *project, const char *repo_path, + cbm_file_info_t *files, int file_count, + int worker_count, + const cbm_parallel_extract_opts_t *extract_opts) { cbm_gbuf_t *gbuf = cbm_gbuf_new(project, repo_path); cbm_registry_t *reg = cbm_registry_new(); atomic_int cancelled; @@ -130,10 +132,15 @@ static cbm_gbuf_t *run_parallel(const char *project, const char *repo_path, cbm_ int64_t gbuf_next = cbm_gbuf_next_id(gbuf); atomic_init(&shared_ids, gbuf_next); - CBMFileResult **result_cache = calloc(file_count, sizeof(CBMFileResult *)); + CBMFileResult **result_cache = calloc((size_t)file_count, sizeof(CBMFileResult *)); cbm_init(); - cbm_parallel_extract(&ctx, files, file_count, result_cache, &shared_ids, worker_count); + if (extract_opts) { + cbm_parallel_extract_ex(&ctx, files, file_count, result_cache, &shared_ids, worker_count, + extract_opts); + } else { + cbm_parallel_extract(&ctx, files, file_count, result_cache, &shared_ids, worker_count); + } cbm_gbuf_set_next_id(gbuf, atomic_load(&shared_ids)); cbm_build_registry_from_cache(&ctx, files, file_count, result_cache); @@ -174,6 +181,12 @@ static cbm_gbuf_t *run_parallel(const char *project, const char *repo_path, cbm_ return gbuf; } +static cbm_gbuf_t *run_parallel(const char *project, const char *repo_path, cbm_file_info_t *files, + int file_count, int worker_count) { + return run_parallel_with_extract_opts(project, repo_path, files, file_count, worker_count, + NULL); +} + /* ── Parity Tests ─────────────────────────────────────────────────── */ static cbm_gbuf_t *g_seq_gbuf = NULL; @@ -662,6 +675,69 @@ TEST(parallel_python_lsp_override_cross_file_emits_lsp_strategy_edges) { PASS(); } +TEST(parallel_python_lsp_cross_file_reads_unretained_source) { + char tmpdir[256]; + snprintf(tmpdir, sizeof(tmpdir), "/tmp/cbm_par_pylsp_xf_nosrc_XXXXXX"); + if (!cbm_mkdtemp(tmpdir)) { + FAIL("mkdtemp failed"); + } + + char gpath[512]; + snprintf(gpath, sizeof(gpath), "%s/greeter.py", tmpdir); + FILE *gf = fopen(gpath, "w"); + if (!gf) { + FAIL("fopen greeter.py failed"); + } + fprintf(gf, "class Greeter:\n" + " def hello(self):\n" + " return 'hi'\n"); + fclose(gf); + + char apath[512]; + snprintf(apath, sizeof(apath), "%s/app.py", tmpdir); + FILE *af = fopen(apath, "w"); + if (!af) { + unlink(gpath); + rmdir(tmpdir); + FAIL("fopen app.py failed"); + } + fprintf(af, "from greeter import Greeter\n" + "\n" + "def main():\n" + " g = Greeter()\n" + " g.hello()\n"); + fclose(af); + + cbm_file_info_t files[2] = {0}; + files[0].path = gpath; + files[0].rel_path = (char *)"greeter.py"; + files[0].language = CBM_LANG_PYTHON; + files[1].path = apath; + files[1].rel_path = (char *)"app.py"; + files[1].language = CBM_LANG_PYTHON; + + cbm_parallel_extract_opts_t extract_opts = { + .retain_sources = false, + .retain_sources_set = true, + }; + cbm_gbuf_t *gbuf = run_parallel_with_extract_opts("cbm_par_pylsp_xf_nosrc", tmpdir, files, 2, 2, + &extract_opts); + ASSERT_NOT_NULL(gbuf); + + lsp_edge_count_ctx_t c = {0}; + cbm_gbuf_foreach_edge(gbuf, count_lsp_call_edges, &c); + + ASSERT_GT(c.total_calls, 0); + ASSERT_GT(c.lsp_strategy_count, 0); + + cbm_gbuf_free(gbuf); + + unlink(apath); + unlink(gpath); + rmdir(tmpdir); + PASS(); +} + /* issue #294: gRPC service-name extraction must (a) preserve the canonical * proto service name (FooServiceClient → FooService, not Foo) and (b) only * match real stub/client types — ordinary receiver vars must NOT produce @@ -719,6 +795,7 @@ SUITE(parallel) { RUN_TEST(parallel_node_count); RUN_TEST(parallel_python_lsp_override_emits_lsp_strategy_edges); RUN_TEST(parallel_python_lsp_override_cross_file_emits_lsp_strategy_edges); + RUN_TEST(parallel_python_lsp_cross_file_reads_unretained_source); RUN_TEST(parallel_calls_parity); RUN_TEST(parallel_defines_parity); RUN_TEST(parallel_defines_method_parity);