Merged

19 commits
bcdc8a5
Fix #729 and #731: Telemetry lifecycle management
msrathore-db Jan 29, 2026
471a551
Address review comments: revert timeout and telemetry_enabled changes
msrathore-db Feb 5, 2026
2a1e6c9
Fix Black formatting violations
msrathore-db Feb 5, 2026
4b2da91
Fix CI test failure: Prevent parallel execution of telemetry tests
msrathore-db Feb 5, 2026
8b5a402
Fix telemetry test fixtures: Clean up state before AND after tests
msrathore-db Feb 6, 2026
93c4004
Fix CI test failure: Clear _flush_event between tests
msrathore-db Feb 6, 2026
69f4882
Add debug workflow and output to diagnose CI test failure
msrathore-db Feb 6, 2026
c558fae
Fix workflow: Add krb5 system dependency
msrathore-db Feb 6, 2026
a62073f
Fix xdist_group: Add --dist=loadgroup to pytest commands
msrathore-db Feb 6, 2026
df99e7f
Add aggressive flush before test to prevent event interference
msrathore-db Feb 6, 2026
fa3cbd2
Split workflow: Isolate telemetry tests in separate job
msrathore-db Feb 6, 2026
54fab16
Fix workflows: Add krb5 deps and cleanup debug code
msrathore-db Feb 6, 2026
489ffd4
Fix publish-test.yml: Update Python 3.9 -> 3.10
msrathore-db Feb 6, 2026
1800ad1
Fix integration workflow: Remove --dist=loadgroup from non-telemetry …
msrathore-db Feb 6, 2026
a514ca2
Fix code-coverage workflow: Remove test_telemetry_e2e.py from coverag…
msrathore-db Feb 6, 2026
0c01ba9
Fix publish-test workflow: Remove cache conditional
msrathore-db Feb 6, 2026
74ea9cf
Merge remote-tracking branch 'origin/main' into fix/telemetry-lifecyc…
msrathore-db Feb 6, 2026
649a41d
Fix publish-test.yml: Remove duplicate krb5 install, restore cache co…
msrathore-db Feb 6, 2026
162302e
Fix code-coverage: Remove serial tests step
msrathore-db Feb 6, 2026
15 changes: 11 additions & 4 deletions .github/workflows/code-coverage.yml
@@ -31,6 +31,13 @@ jobs:
with:
python-version: "3.10"
#----------------------------------------------
# ----- install system dependencies -----
#----------------------------------------------
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libkrb5-dev
#----------------------------------------------
# ----- install & configure poetry -----
#----------------------------------------------
- name: Install Poetry
@@ -80,13 +87,13 @@ jobs:
-v

#----------------------------------------------
- # run serial tests with coverage
+ # run telemetry tests with coverage (isolated)
#----------------------------------------------
- - name: Run serial tests with coverage
+ - name: Run telemetry tests with coverage (isolated)
continue-on-error: false
run: |
- poetry run pytest tests/e2e \
-   -m "serial" \
+ # Run test_concurrent_telemetry.py separately for isolation
+ poetry run pytest tests/e2e/test_concurrent_telemetry.py \
--cov=src \
--cov-append \
--cov-report=xml \
52 changes: 48 additions & 4 deletions .github/workflows/integration.yml
@@ -7,7 +7,7 @@ on:
pull_request:

jobs:
- run-e2e-tests:
+ run-non-telemetry-tests:
runs-on: ubuntu-latest
environment: azure-prod
env:
@@ -59,9 +59,53 @@ jobs:
#----------------------------------------------
# run test suite
#----------------------------------------------
- - name: Run e2e tests (excluding daily-only tests)
+ - name: Run non-telemetry e2e tests
run: |
- # Exclude telemetry E2E tests from PR runs (run daily instead)
+ # Exclude all telemetry tests - they run in separate job for isolation
poetry run python -m pytest tests/e2e \
--ignore=tests/e2e/test_telemetry_e2e.py \
- -n auto
+ --ignore=tests/e2e/test_concurrent_telemetry.py \
+ -n auto

run-telemetry-tests:
runs-on: ubuntu-latest
needs: run-non-telemetry-tests # Run after non-telemetry tests complete
environment: azure-prod
env:
DATABRICKS_SERVER_HOSTNAME: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_CATALOG: peco
DATABRICKS_USER: ${{ secrets.TEST_PECO_SP_ID }}
steps:
- name: Check out repository
uses: actions/checkout@v4
- name: Set up python
id: setup-python
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libkrb5-dev
- name: Install Poetry
uses: snok/install-poetry@v1
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true
- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v4
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
- name: Install dependencies
run: poetry install --no-interaction --all-extras
- name: Run telemetry tests in isolation
run: |
# Run test_concurrent_telemetry.py in isolation with complete process separation
# Use --dist=loadgroup to respect @pytest.mark.xdist_group markers
poetry run python -m pytest tests/e2e/test_concurrent_telemetry.py \
-n auto --dist=loadgroup -v
2 changes: 1 addition & 1 deletion .github/workflows/publish-test.yml
@@ -14,7 +14,7 @@ jobs:
id: setup-python
uses: actions/setup-python@v5
with:
- python-version: 3.9
+ python-version: "3.10"
#----------------------------------------------
# ----- install & configure poetry -----
#----------------------------------------------
3 changes: 3 additions & 0 deletions src/databricks/sql/client.py
@@ -306,6 +306,8 @@ def read(self) -> Optional[OAuthToken]:
)
self.session.open()
except Exception as e:
# Respect user's telemetry preference even during connection failure
enable_telemetry = kwargs.get("enable_telemetry", True)
TelemetryClientFactory.connection_failure_log(
error_name="Exception",
error_message=str(e),
@@ -316,6 +318,7 @@
user_agent=self.session.useragent_header
user_agent=self.session.useragent_header
if hasattr(self, "session")
else None,
enable_telemetry=enable_telemetry,
)
raise e

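The client.py change above makes the failure path read the caller's enable_telemetry preference before emitting a connection-failure log. A minimal sketch of that guard, assuming a toy open_connection() and a hypothetical log_connection_failure() standing in for TelemetryClientFactory.connection_failure_log:

    # Illustrative sketch, not the connector's actual code.
    from typing import Any


    def log_connection_failure(**fields: Any) -> None:
        """Hypothetical stand-in for TelemetryClientFactory.connection_failure_log."""
        print("telemetry:", fields)


    def open_connection(**kwargs: Any) -> None:
        try:
            raise ConnectionError("handshake failed")  # simulates session.open() failing
        except Exception as e:
            # Default True matches the connector's opt-out model, but the guard
            # never force-enables telemetry that the caller switched off.
            if kwargs.get("enable_telemetry", True):
                log_connection_failure(error_name=type(e).__name__, error_message=str(e))
            raise


    try:
        open_connection(enable_telemetry=False)  # raises, emits no telemetry
    except ConnectionError:
        pass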
12 changes: 10 additions & 2 deletions src/databricks/sql/common/unified_http_client.py
@@ -217,15 +217,15 @@ def _should_use_proxy(self, target_host: str) -> bool:
logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
return True

- def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
+ def _get_pool_manager_for_url(self, url: str) -> Optional[urllib3.PoolManager]:
"""
Get the appropriate pool manager for the given URL.

Args:
url: The target URL

Returns:
- PoolManager instance (either direct or proxy)
+ PoolManager instance (either direct or proxy), or None if client is closed
"""
parsed_url = urllib.parse.urlparse(url)
target_host = parsed_url.hostname
@@ -291,6 +291,14 @@ def request_context(
# Select appropriate pool manager based on target URL
pool_manager = self._get_pool_manager_for_url(url)

# DEFENSIVE: Check if pool_manager is None (client closing/closed)
# This prevents AttributeError race condition when telemetry cleanup happens
if pool_manager is None:
logger.debug(
"HTTP client closing or closed, cannot make request to %s", url
)
raise RequestError("HTTP client is closing or has been closed")

response = None

try:
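The defensive None check added to request_context() above closes a race between telemetry cleanup and in-flight requests. A self-contained sketch of the pattern, using a toy TinyHttpClient rather than the driver's UnifiedHttpClient:

    # Illustrative sketch: once close() drops the pool manager, reject new
    # requests with a clean RequestError instead of an AttributeError.
    import threading
    from typing import Optional


    class RequestError(Exception):
        pass


    class TinyHttpClient:
        def __init__(self) -> None:
            self._lock = threading.Lock()
            self._pool: Optional[object] = object()  # stands in for urllib3.PoolManager

        def _get_pool(self) -> Optional[object]:
            with self._lock:
                return self._pool  # None once the client is closed

        def request(self, url: str) -> str:
            pool = self._get_pool()
            if pool is None:
                # Telemetry teardown may close the client while a late request
                # is still in flight; fail loudly but cleanly.
                raise RequestError("HTTP client is closing or has been closed")
            return f"GET {url}"

        def close(self) -> None:
            with self._lock:
                self._pool = None


    client = TinyHttpClient()
    print(client.request("https://example.com"))
    client.close()
    try:
        client.request("https://example.com")
    except RequestError as e:
        print("rejected cleanly:", e)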
40 changes: 38 additions & 2 deletions src/databricks/sql/telemetry/telemetry_client.py
@@ -42,6 +42,7 @@
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
from databricks.sql.common.unified_http_client import UnifiedHttpClient
from databricks.sql.common.http import HttpMethod
from databricks.sql.exc import RequestError
from databricks.sql.telemetry.telemetry_push_client import (
ITelemetryPushClient,
TelemetryPushClient,
@@ -417,10 +418,38 @@ def export_latency_log(
)

def close(self):
"""Flush remaining events before closing"""
"""Flush remaining events before closing

IMPORTANT: This method does NOT close self._http_client.

Rationale:
- _flush() submits async work to the executor that uses _http_client
- If we closed _http_client here, async callbacks would fail with AttributeError
- Instead, we let _http_client live as long as needed:
* Pending futures hold references to self (via bound methods)
* This keeps self alive, which keeps self._http_client alive
* When all futures complete, Python GC will clean up naturally
- The __del__ method ensures eventual cleanup during garbage collection

This design prevents race conditions while keeping telemetry truly async.
"""
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
self._flush()

def __del__(self):
"""Cleanup when TelemetryClient is garbage collected

This ensures _http_client is eventually closed when the TelemetryClient
object is destroyed. By this point, all async work should be complete
(since the futures held references keeping us alive), so it's safe to
close the http client.
"""
try:
if hasattr(self, "_http_client") and self._http_client:
self._http_client.close()
except Exception:
pass


class _TelemetryClientHolder:
"""
@@ -674,7 +703,8 @@ def close(host_url):
)
try:
TelemetryClientFactory._stop_flush_thread()
- TelemetryClientFactory._executor.shutdown(wait=True)
+ # Use wait=False to allow process to exit immediately
+ TelemetryClientFactory._executor.shutdown(wait=False)
except Exception as e:
logger.debug("Failed to shutdown thread pool executor: %s", e)
TelemetryClientFactory._executor = None
@@ -689,9 +719,15 @@ def connection_failure_log(
port: int,
client_context,
user_agent: Optional[str] = None,
enable_telemetry: bool = True,
):
"""Send error telemetry when connection creation fails, using provided client context"""

# Respect user's telemetry preference - don't force-enable
if not enable_telemetry:
logger.debug("Telemetry disabled, skipping connection failure log")
return

UNAUTH_DUMMY_SESSION_ID = "unauth_session_id"

TelemetryClientFactory.initialize_telemetry_client(
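The close() docstring above leans on a CPython lifecycle property: a future created from a bound method holds a reference to the instance, so the instance (and the HTTP client it owns) cannot be collected until the async work drains, after which __del__ performs the final cleanup. A minimal sketch of that argument, with illustrative Resource/Client names and CPython refcounting semantics assumed:

    import time
    from concurrent.futures import ThreadPoolExecutor


    class Resource:
        def close(self) -> None:
            print("resource closed")


    class Client:
        def __init__(self, executor: ThreadPoolExecutor) -> None:
            self._resource = Resource()
            self._executor = executor

        def _send(self) -> None:
            time.sleep(0.2)  # async work that still needs self._resource
            print("event sent")

        def flush(self) -> None:
            # Submitting a *bound method* stores a reference to self in the
            # executor's work queue, keeping self alive until the call runs.
            self._executor.submit(self._send)

        def __del__(self) -> None:
            self._resource.close()  # runs only after the futures release self


    executor = ThreadPoolExecutor(max_workers=1)
    client = Client(executor)
    client.flush()
    del client           # pending work still references client: no __del__ yet
    time.sleep(0.5)      # worker finishes and drops its reference...
    executor.shutdown(wait=True)  # ...after which __del__ closes the resource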
26 changes: 25 additions & 1 deletion tests/e2e/test_concurrent_telemetry.py
@@ -27,6 +27,7 @@ def run_in_threads(target, num_threads, pass_index=False):


@pytest.mark.serial
@pytest.mark.xdist_group(name="serial_telemetry")
class TestE2ETelemetry(PySQLPytestTestCase):
@pytest.fixture(autouse=True)
def telemetry_setup_teardown(self):
@@ -35,13 +36,27 @@ def telemetry_setup_teardown(self):
before each test and shuts it down afterward. Using a fixture makes
this robust and automatic.
"""
# Clean up BEFORE test starts to ensure no leftover state from previous tests
# Use wait=True to ensure all pending telemetry from previous tests completes
# This prevents those events from being captured by this test's mock
if TelemetryClientFactory._executor:
TelemetryClientFactory._executor.shutdown(wait=True) # WAIT for pending telemetry
TelemetryClientFactory._executor = None
TelemetryClientFactory._stop_flush_thread()
TelemetryClientFactory._flush_event.clear() # Clear the event flag
TelemetryClientFactory._clients.clear()
TelemetryClientFactory._initialized = False

try:
yield
finally:
# Clean up AFTER test ends
# Use wait=True to ensure this test's telemetry completes before next test starts
if TelemetryClientFactory._executor:
- TelemetryClientFactory._executor.shutdown(wait=True)
+ TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for this test's telemetry
TelemetryClientFactory._executor = None
TelemetryClientFactory._stop_flush_thread()
TelemetryClientFactory._flush_event.clear() # Clear the event flag
TelemetryClientFactory._clients.clear()
TelemetryClientFactory._initialized = False

@@ -50,6 +65,14 @@ def test_concurrent_queries_sends_telemetry(self):
An E2E test where concurrent threads execute real queries against
the staging endpoint, while we capture and verify the generated telemetry.
"""
# Extra flush right before test starts to clear any events that accumulated
# between fixture cleanup and now (e.g., from other tests on same worker)
if TelemetryClientFactory._executor:
TelemetryClientFactory._executor.shutdown(wait=True)
TelemetryClientFactory._executor = None
TelemetryClientFactory._clients.clear()
TelemetryClientFactory._initialized = False

num_threads = 30
capture_lock = threading.Lock()
captured_telemetry = []
@@ -139,6 +162,7 @@ def execute_query_worker(thread_id):
assert "errors" not in response or not response["errors"]
if "numProtoSuccess" in response:
total_successful_events += response["numProtoSuccess"]

assert total_successful_events == num_threads * 2

assert (
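Several commits in this PR pair @pytest.mark.xdist_group with --dist=loadgroup. Under pytest-xdist, that combination schedules every test sharing a group name onto a single worker, so the serial_telemetry tests never overlap even with -n auto. A minimal sketch:

    import pytest


    @pytest.mark.xdist_group(name="serial_telemetry")
    class TestTelemetryIsolation:
        # Both tests land on the same xdist worker and run sequentially.
        def test_first(self) -> None:
            assert True

        def test_second(self) -> None:
            assert True


    # Invocation mirroring the workflow:
    #   pytest -n auto --dist=loadgroup tests/e2e/test_concurrent_telemetry.py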
28 changes: 25 additions & 3 deletions tests/e2e/test_telemetry_e2e.py
@@ -44,23 +44,45 @@ def connection(self, extra_params=()):


@pytest.mark.serial
@pytest.mark.xdist_group(name="serial_telemetry")
class TestTelemetryE2E(TelemetryTestBase):
"""E2E tests for telemetry scenarios - must run serially due to shared host-level telemetry client"""

@pytest.fixture(autouse=True)
def telemetry_setup_teardown(self):
"""Clean up telemetry client state before and after each test"""
# Clean up BEFORE test starts
# Use wait=True to ensure all pending telemetry from previous tests completes
if TelemetryClientFactory._executor:
TelemetryClientFactory._executor.shutdown(wait=True) # WAIT for pending telemetry
TelemetryClientFactory._executor = None
TelemetryClientFactory._stop_flush_thread()
TelemetryClientFactory._flush_event.clear() # Clear the event flag
TelemetryClientFactory._clients.clear()
TelemetryClientFactory._initialized = False

# Clear feature flags cache before test starts
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
with FeatureFlagsContextFactory._lock:
FeatureFlagsContextFactory._context_map.clear()
if FeatureFlagsContextFactory._executor:
FeatureFlagsContextFactory._executor.shutdown(wait=False)
FeatureFlagsContextFactory._executor = None

try:
yield
finally:
# Clean up AFTER test ends
# Use wait=True to ensure this test's telemetry completes
if TelemetryClientFactory._executor:
- TelemetryClientFactory._executor.shutdown(wait=True)
+ TelemetryClientFactory._executor.shutdown(wait=True)  # WAIT for this test's telemetry
TelemetryClientFactory._executor = None
TelemetryClientFactory._stop_flush_thread()
TelemetryClientFactory._flush_event.clear() # Clear the event flag
TelemetryClientFactory._clients.clear()
TelemetryClientFactory._initialized = False

- # Clear feature flags cache to prevent state leakage between tests
- from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
+ # Clear feature flags cache after test ends
with FeatureFlagsContextFactory._lock:
FeatureFlagsContextFactory._context_map.clear()
if FeatureFlagsContextFactory._executor:
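Both test files apply the same fixture discipline: drain and reset the shared factory state before the test (so stale events never reach this test's mock) and again after it (so this test's pending work never leaks into the next). A generic sketch of that pattern, with FakeFactory as an illustrative stand-in for TelemetryClientFactory:

    from concurrent.futures import ThreadPoolExecutor
    from typing import Optional

    import pytest


    class FakeFactory:
        """Illustrative singleton-style state, like TelemetryClientFactory's."""

        _executor: Optional[ThreadPoolExecutor] = None
        _initialized: bool = False


    def _reset_factory() -> None:
        if FakeFactory._executor:
            # wait=True: let pending events finish rather than orphaning them
            FakeFactory._executor.shutdown(wait=True)
            FakeFactory._executor = None
        FakeFactory._initialized = False


    @pytest.fixture(autouse=True)
    def clean_singleton_state():
        _reset_factory()      # before: no leftover state from earlier tests
        try:
            yield
        finally:
            _reset_factory()  # after: this test's telemetry fully drained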