Skip to content

Commit 1c7b2a1

Browse files
committed
Add close() method to ReorgAwareStream and remove unnecessary hasattr checks
- Add explicit close() method to ReorgAwareStream for proper stream cleanup on reorg restart (fixes resource leak) - Remove hasattr check in client.py since both StreamingResultIterator and ReorgAwareStream have close()
1 parent 1a2a6f7 commit 1c7b2a1

File tree

2 files changed

+5
-4
lines changed

2 files changed

+5
-4
lines changed

src/amp/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -838,9 +838,7 @@ def query_and_load_streaming(
838838

839839
# Check if we need to restart due to reorg
840840
if reorg_result:
841-
# Close the old stream before restarting
842-
if hasattr(stream_iterator, 'close'):
843-
stream_iterator.close()
841+
stream_iterator.close()
844842
self.logger.info('Reorg detected, restarting stream with new resume position...')
845843
resume_watermark = loader_instance.state_store.get_resume_position(connection_name, destination)
846844
continue

src/amp/streaming/reorg.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,15 @@ def _is_duplicate_batch(self, current_ranges: List[BlockRange]) -> bool:
176176
# All ranges are exact duplicates
177177
return True
178178

179+
def close(self) -> None:
180+
"""Close the underlying stream"""
181+
self.stream_iterator.close()
182+
179183
def __enter__(self) -> 'ReorgAwareStream':
180184
"""Context manager entry"""
181185
return self
182186

183187
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
184188
"""Context manager exit"""
185-
# Delegate to underlying stream
186189
if hasattr(self.stream_iterator, 'close'):
187190
self.stream_iterator.close()

0 commit comments

Comments
 (0)