diff --git a/lib/connect_client.go b/lib/connect_client.go index 42d56c0..f0ff36a 100644 --- a/lib/connect_client.go +++ b/lib/connect_client.go @@ -152,9 +152,13 @@ func (p connectClient) Read(ctx context.Context, logger DatabaseLogger, ps Plane } if err != nil { if s, ok := status.FromError(err); ok { - // if the error is anything other than server timeout, keep going + // codes.DeadlineExceeded is a server-side timeout and is safe to retry + // from the last checkpointed cursor. codes.Canceled means the client's + // own context was canceled (e.g., Fivetran requesting shutdown) — the + // parent context is already done, so retrying would just burn through + // backoff sleeps without making progress. Treat it as non-retryable. if s.Code() != codes.DeadlineExceeded { - logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-timeout error", preamble, s.Code(), err, currentPosition)) + logger.Warning(fmt.Sprintf("%vGot error [%v] with message [%q], Returning with cursor :[%v] after non-retryable error", preamble, s.Code(), err, currentPosition)) // Check for binlog expiration error and reset cursor for historical sync if IsBinlogsExpirationError(err) { @@ -291,13 +295,34 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam // stop when we've reached the well known stop position for this sync session. watchForVgGtidChange := false + // lastSafeTC tracks the last cursor position where rows were confirmed written to + // the destination. VStream delivers rows and their VGTID in the same SyncResponse, + // so tc and lastSafeTC only diverge on cursor-only responses (heartbeats, DDL events, + // or transactions for unrelated tables that carry no rows for this table). On any + // mid-stream error, returning lastSafeTC avoids checkpointing a cursor that has no + // corresponding rows in the destination, forcing a safe re-stream from the last + // confirmed write position. + lastSafeTC := tc for { res, err := c.Recv() if err != nil { - return tc, err + if errors.Is(err, io.EOF) { + // Natural end of stream: all rows up to tc have been delivered. + return tc, err + } + // Mid-stream error (e.g. context canceled): return the last cursor + // where rows were confirmed written, not the potentially advanced tc. + return lastSafeTC, err } + // Determine whether this response carries any row data before processing. + // Cursor-only responses (heartbeats, DDL events, unrelated-table transactions) + // carry no rows for this table. lastSafeTC is only advanced when rows have been + // delivered, so a mid-stream error after a cursor-only response re-streams from + // the last position where rows were confirmed written. + rowsInResponse := len(res.Result) > 0 || len(res.Deletes) > 0 || len(res.Updates) > 0 + if onResult != nil { for _, insertedRow := range res.Result { qr := sqltypes.Proto3ToResult(insertedRow) @@ -307,7 +332,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } sqlResult.Rows = append(sqlResult.Rows, row) if err := onResult(sqlResult, OpType_Insert); err != nil { - return tc, status.Error(codes.Internal, "unable to serialize row") + return lastSafeTC, status.Error(codes.Internal, "unable to serialize row") } } } @@ -320,7 +345,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } sqlResult.Rows = append(sqlResult.Rows, row) if err := onResult(sqlResult, OpType_Delete); err != nil { - return nil, status.Error(codes.Internal, "unable to serialize row") + return lastSafeTC, status.Error(codes.Internal, "unable to serialize row") } } } @@ -333,13 +358,20 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam After: serializeQueryResult(update.After), } if err := onUpdate(updatedRow); err != nil { - return nil, status.Error(codes.Internal, "unable to serialize update") + return lastSafeTC, status.Error(codes.Internal, "unable to serialize update") } } } if res.Cursor != nil { tc = res.Cursor + // Only advance lastSafeTC when rows have been written at this cursor. + // Cursor-only responses (heartbeats, DDL, unrelated-table events) leave + // lastSafeTC unchanged so a mid-stream error replays from the last + // confirmed write rather than an intermediate cursor with no row data. + if rowsInResponse { + lastSafeTC = tc + } } // Because of the ordering of events in a vstream