diff --git a/README.md b/README.md index a75289c..e5f7495 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,11 @@ cachew git restore https://github.com/org/repo ./repo ```hcl git { - snapshot-interval = "1h" - repack-interval = "1h" + snapshot-interval = "1h" + repack-interval = "1h" + # Full repack is expensive (re-deltas every object) and only needs to run + # occasionally; the frequent geometric repack absorbs day-to-day churn. + full-repack-interval = "168h" } ``` @@ -275,8 +278,9 @@ github-app { git-clone {} git { - snapshot-interval = "1h" - repack-interval = "1h" + snapshot-interval = "1h" + repack-interval = "1h" + full-repack-interval = "168h" } github-releases { diff --git a/internal/gitclone/manager.go b/internal/gitclone/manager.go index 90a2bb3..afc5812 100644 --- a/internal/gitclone/manager.go +++ b/internal/gitclone/manager.go @@ -65,8 +65,17 @@ type Config struct { LsRemoteTimeout time.Duration `hcl:"ls-remote-timeout,optional" help:"Upper bound for 'git ls-remote' so a slow upstream cannot block the request path indefinitely." default:"1m"` RepackTimeout time.Duration `hcl:"repack-timeout,optional" help:"Upper bound for 'git repack' so a slow repack on a large repository cannot block the scheduler queue indefinitely." default:"10m"` RepackThreads int `hcl:"repack-threads,optional" help:"Threads for git repack operations. Limits memory since windowMemory and deltaCacheSize are per-thread. 0 = pack-threads." default:"4"` + + FullRepackTimeout time.Duration `hcl:"full-repack-timeout,optional" help:"Upper bound for the full (delta-recomputing) repack, which is far slower than the geometric repack on large repositories. Size it to complete the largest mirror, since a timeout kills the job and wastes the work; it also holds a scheduler slot for its whole duration, so pair it with a slow full-repack-interval. 0 falls back to repack-timeout." default:"6h"` } +// Delta search window and chain depth for the full repack. A wider window finds +// tighter deltas (smaller packs) at the cost of more CPU. +const ( + fullRepackWindow = 100 + fullRepackDepth = 50 +) + // CredentialProvider provides credentials for git operations. type CredentialProvider interface { GetTokenForURL(ctx context.Context, url string) (string, error) @@ -807,8 +816,46 @@ func (r *Repository) GetUpstreamRefs(ctx context.Context) (map[string]string, er func (r *Repository) Repack(ctx context.Context) error { logger := logging.FromContext(ctx) logger.InfoContext(ctx, "Geometric repack started", "upstream", r.upstreamURL) + if err := r.runRepack(ctx, r.config.RepackTimeout, + "-d", "--geometric=2", "--write-midx", "--write-bitmap-index"); err != nil { + return err + } + logger.InfoContext(ctx, "Geometric repack completed", "upstream", r.upstreamURL) + return nil +} + +// RepackFull runs a full repack that re-selects deltas across all objects with +// a wide search window (-a -d -f --window --depth). Unlike the geometric +// Repack, which reuses existing deltas and only consolidates packs, this +// recovers the cross-pack redundancy that accumulates from incremental fetches +// — materially shrinking the mirror, and therefore the snapshots derived from +// it, at the cost of significant one-time CPU. It is meant to run on a slow +// cadence in between the frequent geometric repacks. +func (r *Repository) RepackFull(ctx context.Context) error { + logger := logging.FromContext(ctx) + + timeout := r.config.FullRepackTimeout + if timeout <= 0 { + timeout = r.config.RepackTimeout + } - repackCtx, cancel := context.WithTimeout(ctx, r.config.RepackTimeout) + logger.InfoContext(ctx, "Full repack started", "upstream", r.upstreamURL, "window", fullRepackWindow, "depth", fullRepackDepth) + if err := r.runRepack(ctx, timeout, + "-a", "-d", "-f", + "--window="+strconv.Itoa(fullRepackWindow), "--depth="+strconv.Itoa(fullRepackDepth), + "--write-midx", "--write-bitmap-index"); err != nil { + return err + } + logger.InfoContext(ctx, "Full repack completed", "upstream", r.upstreamURL) + return nil +} + +// runRepack executes "git repack " with bounded threads/memory and a +// timeout, cleaning up a stale multi-pack-index.lock on failure. +func (r *Repository) runRepack(ctx context.Context, timeout time.Duration, repackArgs ...string) error { + logger := logging.FromContext(ctx) + + repackCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() threads := r.config.RepackThreads @@ -820,12 +867,15 @@ func (r *Repository) Repack(ctx context.Context) error { // config uses high values (512m deltaCacheSize, 1g windowMemory) tuned for // serving performance with many threads. Repack is a background task that // can afford to be slower in exchange for bounded memory. - // #nosec G204 - r.path is controlled by us - cmd := exec.CommandContext(repackCtx, "git", "-C", r.path, - "-c", "pack.threads="+strconv.Itoa(threads), + args := []string{"-C", r.path, + "-c", "pack.threads=" + strconv.Itoa(threads), "-c", "pack.windowMemory=256m", "-c", "pack.deltaCacheSize=128m", - "repack", "-d", "--geometric=2", "--write-midx", "--write-bitmap-index") + "repack"} + args = append(args, repackArgs...) + + // #nosec G204 - r.path is controlled by us + cmd := exec.CommandContext(repackCtx, "git", args...) cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} cmd.Cancel = func() error { return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) @@ -842,8 +892,6 @@ func (r *Repository) Repack(ctx context.Context) error { } return errors.Wrapf(err, "git repack: %s", string(output)) } - - logger.InfoContext(ctx, "Geometric repack completed", "upstream", r.upstreamURL) return nil } diff --git a/internal/gitclone/manager_test.go b/internal/gitclone/manager_test.go index 40f8b21..88df264 100644 --- a/internal/gitclone/manager_test.go +++ b/internal/gitclone/manager_test.go @@ -418,6 +418,35 @@ func TestRepository_Repack(t *testing.T) { assert.NoError(t, err) } +func TestRepository_RepackFull(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) + tmpDir := t.TempDir() + upstreamPath := createBareRepo(t, tmpDir) + + clonePath := filepath.Join(tmpDir, "mirror") + cmd := exec.Command("git", "clone", "--mirror", upstreamPath, clonePath) + assert.NoError(t, cmd.Run()) + + repo := &Repository{ + state: StateReady, + config: testRepoConfig(), + path: clonePath, + upstreamURL: upstreamPath, + fetchSem: make(chan struct{}, 1), + } + repo.fetchSem <- struct{}{} + + // Unset full-repack-timeout exercises the fallback to repack-timeout. + assert.NoError(t, repo.RepackFull(ctx)) + + packs, err := filepath.Glob(filepath.Join(clonePath, "objects", "pack", "*.pack")) + assert.NoError(t, err) + assert.True(t, len(packs) > 0, "expected at least one pack file after full repack") + + _, err = os.Stat(filepath.Join(clonePath, "objects", "pack", "multi-pack-index")) + assert.NoError(t, err) +} + func TestRepository_Repack_CleansUpStaleLockOnFailure(t *testing.T) { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError}) tmpDir := t.TempDir() diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 261b3d3..de57a8e 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -46,7 +46,8 @@ func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManage type Config struct { SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd workstation snapshots. 0 disables snapshots." default:"0"` MirrorSnapshotInterval time.Duration `hcl:"mirror-snapshot-interval,optional" help:"How often to generate mirror snapshots for pod bootstrap. 0 uses snapshot-interval. Defaults to 2h." default:"2h"` - RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"` + RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run the geometric repack (consolidates packs, reuses deltas). 0 disables." default:"0"` + FullRepackInterval time.Duration `hcl:"full-repack-interval,optional" help:"How often to run the full repack (re-selects deltas across all objects, shrinking the mirror). Far more expensive than the geometric repack, so use a slow cadence. 0 disables." default:"0"` ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"` BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"` } @@ -247,7 +248,7 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) error { if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) } - if s.config.RepackInterval > 0 { + if s.repackEnabled() { s.scheduleRepackJobs(repo) } } @@ -618,7 +619,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) (r if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) } - if s.config.RepackInterval > 0 { + if s.repackEnabled() { s.scheduleRepackJobs(repo) } return nil @@ -648,7 +649,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) (r if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) } - if s.config.RepackInterval > 0 { + if s.repackEnabled() { s.scheduleRepackJobs(repo) } return nil diff --git a/internal/strategy/git/repack.go b/internal/strategy/git/repack.go index 8a42853..1114b0b 100644 --- a/internal/strategy/git/repack.go +++ b/internal/strategy/git/repack.go @@ -15,12 +15,29 @@ import ( "github.com/block/cachew/internal/gitclone" ) +// repackEnabled reports whether any repack variant is configured to run. +func (s *Strategy) repackEnabled() bool { + return s.config.RepackInterval > 0 || s.config.FullRepackInterval > 0 +} + func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) { - s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, func(ctx context.Context) (returnErr error) { + if s.config.RepackInterval > 0 { + s.schedulePeriodicRepack(repo, "repack-periodic", "repack", s.config.RepackInterval, repo.Repack) + } + if s.config.FullRepackInterval > 0 { + s.schedulePeriodicRepack(repo, "repack-full-periodic", "repack_full", s.config.FullRepackInterval, repo.RepackFull) + } +} + +// schedulePeriodicRepack runs repack on the given interval, recording the +// before/after pack count, duration, and outcome. operation distinguishes the +// geometric ("repack") and full ("repack_full") variants in metrics and traces. +func (s *Strategy) schedulePeriodicRepack(repo *gitclone.Repository, jobID, operation string, interval time.Duration, repack func(context.Context) error) { + s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), jobID, interval, func(ctx context.Context) (returnErr error) { upstream := repo.UpstreamURL() - ctx, span := tracer.Start(ctx, "git.repack", + ctx, span := tracer.Start(ctx, "git."+operation, trace.WithAttributes( - attribute.String("cachew.operation", "repack"), + attribute.String("cachew.operation", operation), attribute.String("cachew.upstream", upstream), ), ) @@ -32,28 +49,28 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) { span.End() }() - // Pack count before and after gives us a direct view of how much - // the geometric repack actually consolidated. A flat before/after - // ratio over time means fragmentation is outpacing the schedule. + // Pack count before and after gives us a direct view of how much the + // repack consolidated. A flat before/after ratio over time means + // fragmentation is outpacing the schedule. if before, err := countPackFiles(repo.Path()); err == nil { s.metrics.recordRepackPackCount(ctx, upstream, "before", before) span.SetAttributes(attribute.Int("cachew.pack_count_before", before)) } start := time.Now() - err := repo.Repack(ctx) + err := repack(ctx) status := "success" if err != nil { status = "error" } - s.metrics.recordOperation(ctx, "repack", status, time.Since(start)) + s.metrics.recordOperation(ctx, operation, status, time.Since(start)) if after, countErr := countPackFiles(repo.Path()); countErr == nil { s.metrics.recordRepackPackCount(ctx, upstream, "after", after) span.SetAttributes(attribute.Int("cachew.pack_count_after", after)) } - return errors.Wrap(err, "repack") + return errors.Wrap(err, operation) }) } diff --git a/internal/strategy/git/repack_test.go b/internal/strategy/git/repack_test.go index 4ab7111..24ff6b9 100644 --- a/internal/strategy/git/repack_test.go +++ b/internal/strategy/git/repack_test.go @@ -20,16 +20,25 @@ func TestRepackInterval(t *testing.T) { tmpDir := t.TempDir() tests := []struct { - name string - repackInterval time.Duration + name string + repackInterval time.Duration + fullRepackInterval time.Duration }{ { - name: "Enabled", + name: "GeometricEnabled", repackInterval: 24 * time.Hour, }, { - name: "Disabled", - repackInterval: 0, + name: "FullEnabled", + fullRepackInterval: 7 * 24 * time.Hour, + }, + { + name: "BothEnabled", + repackInterval: 24 * time.Hour, + fullRepackInterval: 7 * 24 * time.Hour, + }, + { + name: "Disabled", }, } @@ -40,7 +49,8 @@ func TestRepackInterval(t *testing.T) { MirrorRoot: filepath.Join(tmpDir, tt.name), }, nil) s, err := git.New(ctx, git.Config{ - RepackInterval: tt.repackInterval, + RepackInterval: tt.repackInterval, + FullRepackInterval: tt.fullRepackInterval, }, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil assert.NoError(t, err) assert.True(t, s != nil) diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index 28b1389..772f0f7 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -815,7 +815,7 @@ func (s *Strategy) scheduleDeferredMirrorRestore(ctx context.Context, repo *gitc if s.config.SnapshotInterval > 0 { s.scheduleSnapshotJobs(repo) } - if s.config.RepackInterval > 0 { + if s.repackEnabled() { s.scheduleRepackJobs(repo) } return nil