Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ cachew git restore https://github.com/org/repo ./repo

```hcl
git {
snapshot-interval = "1h"
repack-interval = "1h"
snapshot-interval = "1h"
repack-interval = "1h"
# Full repack is expensive (re-deltas every object) and only needs to run
# occasionally; the frequent geometric repack absorbs day-to-day churn.
full-repack-interval = "168h"
}
```

Expand Down Expand Up @@ -275,8 +278,9 @@ github-app {
git-clone {}

git {
snapshot-interval = "1h"
repack-interval = "1h"
snapshot-interval = "1h"
repack-interval = "1h"
full-repack-interval = "168h"
}

github-releases {
Expand Down
62 changes: 55 additions & 7 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,17 @@ type Config struct {
LsRemoteTimeout time.Duration `hcl:"ls-remote-timeout,optional" help:"Upper bound for 'git ls-remote' so a slow upstream cannot block the request path indefinitely." default:"1m"`
RepackTimeout time.Duration `hcl:"repack-timeout,optional" help:"Upper bound for 'git repack' so a slow repack on a large repository cannot block the scheduler queue indefinitely." default:"10m"`
RepackThreads int `hcl:"repack-threads,optional" help:"Threads for git repack operations. Limits memory since windowMemory and deltaCacheSize are per-thread. 0 = pack-threads." default:"4"`

FullRepackTimeout time.Duration `hcl:"full-repack-timeout,optional" help:"Upper bound for the full (delta-recomputing) repack, which is far slower than the geometric repack on large repositories. Size it to complete the largest mirror, since a timeout kills the job and wastes the work; it also holds a scheduler slot for its whole duration, so pair it with a slow full-repack-interval. 0 falls back to repack-timeout." default:"6h"`
}

// Delta search window and chain depth for the full repack. A wider window finds
// tighter deltas (smaller packs) at the cost of more CPU.
const (
fullRepackWindow = 100
fullRepackDepth = 50
)

// CredentialProvider provides credentials for git operations.
type CredentialProvider interface {
GetTokenForURL(ctx context.Context, url string) (string, error)
Expand Down Expand Up @@ -807,8 +816,46 @@ func (r *Repository) GetUpstreamRefs(ctx context.Context) (map[string]string, er
func (r *Repository) Repack(ctx context.Context) error {
logger := logging.FromContext(ctx)
logger.InfoContext(ctx, "Geometric repack started", "upstream", r.upstreamURL)
if err := r.runRepack(ctx, r.config.RepackTimeout,
"-d", "--geometric=2", "--write-midx", "--write-bitmap-index"); err != nil {
return err
}
logger.InfoContext(ctx, "Geometric repack completed", "upstream", r.upstreamURL)
return nil
}

// RepackFull runs a full repack that re-selects deltas across all objects with
// a wide search window (-a -d -f --window --depth). Unlike the geometric
// Repack, which reuses existing deltas and only consolidates packs, this
// recovers the cross-pack redundancy that accumulates from incremental fetches
// — materially shrinking the mirror, and therefore the snapshots derived from
// it, at the cost of significant one-time CPU. It is meant to run on a slow
// cadence in between the frequent geometric repacks.
func (r *Repository) RepackFull(ctx context.Context) error {
logger := logging.FromContext(ctx)

timeout := r.config.FullRepackTimeout
if timeout <= 0 {
timeout = r.config.RepackTimeout
}

repackCtx, cancel := context.WithTimeout(ctx, r.config.RepackTimeout)
logger.InfoContext(ctx, "Full repack started", "upstream", r.upstreamURL, "window", fullRepackWindow, "depth", fullRepackDepth)
if err := r.runRepack(ctx, timeout,
"-a", "-d", "-f",
"--window="+strconv.Itoa(fullRepackWindow), "--depth="+strconv.Itoa(fullRepackDepth),
"--write-midx", "--write-bitmap-index"); err != nil {
return err
}
logger.InfoContext(ctx, "Full repack completed", "upstream", r.upstreamURL)
return nil
}

// runRepack executes "git repack <args>" with bounded threads/memory and a
// timeout, cleaning up a stale multi-pack-index.lock on failure.
func (r *Repository) runRepack(ctx context.Context, timeout time.Duration, repackArgs ...string) error {
logger := logging.FromContext(ctx)

repackCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

threads := r.config.RepackThreads
Expand All @@ -820,12 +867,15 @@ func (r *Repository) Repack(ctx context.Context) error {
// config uses high values (512m deltaCacheSize, 1g windowMemory) tuned for
// serving performance with many threads. Repack is a background task that
// can afford to be slower in exchange for bounded memory.
// #nosec G204 - r.path is controlled by us
cmd := exec.CommandContext(repackCtx, "git", "-C", r.path,
"-c", "pack.threads="+strconv.Itoa(threads),
args := []string{"-C", r.path,
"-c", "pack.threads=" + strconv.Itoa(threads),
"-c", "pack.windowMemory=256m",
"-c", "pack.deltaCacheSize=128m",
"repack", "-d", "--geometric=2", "--write-midx", "--write-bitmap-index")
"repack"}
args = append(args, repackArgs...)

// #nosec G204 - r.path is controlled by us
cmd := exec.CommandContext(repackCtx, "git", args...)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Cancel = func() error {
return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
Expand All @@ -842,8 +892,6 @@ func (r *Repository) Repack(ctx context.Context) error {
}
return errors.Wrapf(err, "git repack: %s", string(output))
}

logger.InfoContext(ctx, "Geometric repack completed", "upstream", r.upstreamURL)
return nil
}

Expand Down
29 changes: 29 additions & 0 deletions internal/gitclone/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,35 @@ func TestRepository_Repack(t *testing.T) {
assert.NoError(t, err)
}

func TestRepository_RepackFull(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError})
tmpDir := t.TempDir()
upstreamPath := createBareRepo(t, tmpDir)

clonePath := filepath.Join(tmpDir, "mirror")
cmd := exec.Command("git", "clone", "--mirror", upstreamPath, clonePath)
assert.NoError(t, cmd.Run())

repo := &Repository{
state: StateReady,
config: testRepoConfig(),
path: clonePath,
upstreamURL: upstreamPath,
fetchSem: make(chan struct{}, 1),
}
repo.fetchSem <- struct{}{}

// Unset full-repack-timeout exercises the fallback to repack-timeout.
assert.NoError(t, repo.RepackFull(ctx))

packs, err := filepath.Glob(filepath.Join(clonePath, "objects", "pack", "*.pack"))
assert.NoError(t, err)
assert.True(t, len(packs) > 0, "expected at least one pack file after full repack")

_, err = os.Stat(filepath.Join(clonePath, "objects", "pack", "multi-pack-index"))
assert.NoError(t, err)
}

func TestRepository_Repack_CleansUpStaleLockOnFailure(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelError})
tmpDir := t.TempDir()
Expand Down
9 changes: 5 additions & 4 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManage
type Config struct {
SnapshotInterval time.Duration `hcl:"snapshot-interval,optional" help:"How often to generate tar.zstd workstation snapshots. 0 disables snapshots." default:"0"`
MirrorSnapshotInterval time.Duration `hcl:"mirror-snapshot-interval,optional" help:"How often to generate mirror snapshots for pod bootstrap. 0 uses snapshot-interval. Defaults to 2h." default:"2h"`
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run the geometric repack (consolidates packs, reuses deltas). 0 disables." default:"0"`
FullRepackInterval time.Duration `hcl:"full-repack-interval,optional" help:"How often to run the full repack (re-selects deltas across all objects, shrinking the mirror). Far more expensive than the geometric repack, so use a slow cadence. 0 disables." default:"0"`
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"`
BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"`
}
Expand Down Expand Up @@ -247,7 +248,7 @@ func (s *Strategy) warmExistingRepos(ctx context.Context) error {
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
if s.repackEnabled() {
s.scheduleRepackJobs(repo)
}
}
Expand Down Expand Up @@ -618,7 +619,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) (r
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
if s.repackEnabled() {
s.scheduleRepackJobs(repo)
}
return nil
Expand Down Expand Up @@ -648,7 +649,7 @@ func (s *Strategy) startClone(ctx context.Context, repo *gitclone.Repository) (r
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
if s.repackEnabled() {
s.scheduleRepackJobs(repo)
}
return nil
Expand Down
35 changes: 26 additions & 9 deletions internal/strategy/git/repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@ import (
"github.com/block/cachew/internal/gitclone"
)

// repackEnabled reports whether any repack variant is configured to run.
func (s *Strategy) repackEnabled() bool {
return s.config.RepackInterval > 0 || s.config.FullRepackInterval > 0
}

func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), "repack-periodic", s.config.RepackInterval, func(ctx context.Context) (returnErr error) {
if s.config.RepackInterval > 0 {
s.schedulePeriodicRepack(repo, "repack-periodic", "repack", s.config.RepackInterval, repo.Repack)
}
if s.config.FullRepackInterval > 0 {
s.schedulePeriodicRepack(repo, "repack-full-periodic", "repack_full", s.config.FullRepackInterval, repo.RepackFull)
}
}

// schedulePeriodicRepack runs repack on the given interval, recording the
// before/after pack count, duration, and outcome. operation distinguishes the
// geometric ("repack") and full ("repack_full") variants in metrics and traces.
func (s *Strategy) schedulePeriodicRepack(repo *gitclone.Repository, jobID, operation string, interval time.Duration, repack func(context.Context) error) {
s.scheduler.SubmitPeriodicJob(repo.UpstreamURL(), jobID, interval, func(ctx context.Context) (returnErr error) {
upstream := repo.UpstreamURL()
ctx, span := tracer.Start(ctx, "git.repack",
ctx, span := tracer.Start(ctx, "git."+operation,
trace.WithAttributes(
attribute.String("cachew.operation", "repack"),
attribute.String("cachew.operation", operation),
attribute.String("cachew.upstream", upstream),
),
)
Expand All @@ -32,28 +49,28 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
span.End()
}()

// Pack count before and after gives us a direct view of how much
// the geometric repack actually consolidated. A flat before/after
// ratio over time means fragmentation is outpacing the schedule.
// Pack count before and after gives us a direct view of how much the
// repack consolidated. A flat before/after ratio over time means
// fragmentation is outpacing the schedule.
if before, err := countPackFiles(repo.Path()); err == nil {
s.metrics.recordRepackPackCount(ctx, upstream, "before", before)
span.SetAttributes(attribute.Int("cachew.pack_count_before", before))
}

start := time.Now()
err := repo.Repack(ctx)
err := repack(ctx)
status := "success"
if err != nil {
status = "error"
}
s.metrics.recordOperation(ctx, "repack", status, time.Since(start))
s.metrics.recordOperation(ctx, operation, status, time.Since(start))

if after, countErr := countPackFiles(repo.Path()); countErr == nil {
s.metrics.recordRepackPackCount(ctx, upstream, "after", after)
span.SetAttributes(attribute.Int("cachew.pack_count_after", after))
}

return errors.Wrap(err, "repack")
return errors.Wrap(err, operation)
})
}

Expand Down
22 changes: 16 additions & 6 deletions internal/strategy/git/repack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,25 @@ func TestRepackInterval(t *testing.T) {
tmpDir := t.TempDir()

tests := []struct {
name string
repackInterval time.Duration
name string
repackInterval time.Duration
fullRepackInterval time.Duration
}{
{
name: "Enabled",
name: "GeometricEnabled",
repackInterval: 24 * time.Hour,
},
{
name: "Disabled",
repackInterval: 0,
name: "FullEnabled",
fullRepackInterval: 7 * 24 * time.Hour,
},
{
name: "BothEnabled",
repackInterval: 24 * time.Hour,
fullRepackInterval: 7 * 24 * time.Hour,
},
{
name: "Disabled",
},
}

Expand All @@ -40,7 +49,8 @@ func TestRepackInterval(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, tt.name),
}, nil)
s, err := git.New(ctx, git.Config{
RepackInterval: tt.repackInterval,
RepackInterval: tt.repackInterval,
FullRepackInterval: tt.fullRepackInterval,
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)
assert.True(t, s != nil)
Expand Down
2 changes: 1 addition & 1 deletion internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func (s *Strategy) scheduleDeferredMirrorRestore(ctx context.Context, repo *gitc
if s.config.SnapshotInterval > 0 {
s.scheduleSnapshotJobs(repo)
}
if s.config.RepackInterval > 0 {
if s.repackEnabled() {
s.scheduleRepackJobs(repo)
}
return nil
Expand Down