Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from instana.options import StandardOptions
from instana.util import to_json
from instana.util.runtime import get_py_source, log_runtime_env_info
from instana.util.span_utils import get_operation_specifiers
from instana.util.span_utils import matches_rule
from instana.version import VERSION

if TYPE_CHECKING:
Expand Down Expand Up @@ -357,51 +357,58 @@ def report_spans(self, payload: Dict[str, Any]) -> Optional[Response]:

def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Filters given span list using ignore-endpoint variable and returns the list of filtered spans.
Filters span list using new hierarchical filtering rules.
"""
filtered_spans = []
endpoint = ""

for span in spans:
if (hasattr(span, "n") or hasattr(span, "name")) and hasattr(span, "data"):
service = span.n
operation_specifier_key, service_specifier_key = (
get_operation_specifiers(service)
)
if service == "kafka":
endpoint = span.data[service][service_specifier_key]
method = span.data[service][operation_specifier_key]
if isinstance(method, str) and self.__is_endpoint_ignored(
service, method, endpoint
):
continue
else:
filtered_spans.append(span)
else:
if not (hasattr(span, "n") or hasattr(span, "name")) or not hasattr(
span, "data"
):
filtered_spans.append(span)
continue

service_name = ""

# Set the service name
for span_value in span.data.keys():
if isinstance(span.data[span_value], dict):
service_name = span_value

# Set span attributes for filtering
attributes_to_check = {
"type": service_name,
"kind": span.k,
}

# Add operation specifiers to the attributes
for key, value in span.data[service_name].items():
attributes_to_check[f"{service_name}.{key}"] = value

# Check if the span need to be ignored
if self.__is_endpoint_ignored(attributes_to_check):
continue

filtered_spans.append(span)

return filtered_spans

def __is_endpoint_ignored(
self,
service: str,
method: str = "",
endpoint: str = "",
) -> bool:
"""Check if the given service and endpoint combination should be ignored."""
service = service.lower()
method = method.lower()
endpoint = endpoint.lower()
filter_rules = [
f"{service}.{method}", # service.method
f"{service}.*", # service.*
]

if service == "kafka" and endpoint:
filter_rules += [
f"{service}.{method}.{endpoint}", # service.method.endpoint
f"{service}.*.{endpoint}", # service.*.endpoint
f"{service}.{method}.*", # service.method.*
]
return any(rule in self.options.ignore_endpoints for rule in filter_rules)
def __is_endpoint_ignored(self, span_attributes: dict) -> bool:
filters = self.options.span_filters
if not filters:
return False

# Check include rules
for rule in filters.get("include", [{}]):
if matches_rule(rule.get("attributes", []), span_attributes):
return False

# Check exclude rules
for rule in filters.get("exclude", [{}]):
if matches_rule(rule.get("attributes", []), span_attributes):
return True

return False

def handle_agent_tasks(self, task: Dict[str, Any]) -> None:
"""
Expand Down
21 changes: 15 additions & 6 deletions src/instana/instrumentation/kafka/confluent_kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,15 @@ def trace_kafka_produce(
# Get the topic from either args or kwargs
topic = args[0] if args else kwargs.get("topic", "")

attributes_to_check = {
"type": "kafka",
"kind": "exit",
"kafka.service": topic,
"kafka.access": "produce",
}

is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
"produce",
topic,
attributes_to_check
)

with tracer.start_as_current_span(
Expand Down Expand Up @@ -137,10 +142,14 @@ def create_span(
is_suppressed = False

if topic:
attributes_to_check = {
"type": "kafka",
"kind": "entry",
"kafka.service": topic,
"kafka.access": span_type,
}
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
span_type,
topic,
attributes_to_check
)

if not is_suppressed and headers:
Expand Down
25 changes: 18 additions & 7 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,26 @@ def trace_kafka_send(

# Get the topic from either args or kwargs
topic = args[0] if args else kwargs.get("topic", "")
attributes_to_check = {
"type": "kafka",
"kind": "exit",
"kafka.service": topic,
"kafka.access": "send",
}

is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
"send",
topic,
attributes_to_check
)

with tracer.start_as_current_span(
"kafka-producer", span_context=parent_context, kind=SpanKind.PRODUCER
) as span:
span.set_attribute("kafka.service", topic)
span.set_attribute("kafka.access", "send")

# context propagation
# Context propagation
headers = kwargs.get("headers", [])

if not is_suppressed and ("x_instana_l_s", b"0") in headers:
is_suppressed = True

Expand All @@ -70,6 +76,7 @@ def trace_kafka_send(

if tracer.exporter.options.kafka_trace_correlation:
kwargs["headers"] = headers

try:
res = wrapped(*args, **kwargs)
return res
Expand All @@ -94,10 +101,14 @@ def create_span(

is_suppressed = False
if topic:
attributes_to_check = {
"type": "kafka",
"kind": "entry",
"kafka.service": topic,
"kafka.access": span_type,
}
is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored(
"kafka",
span_type,
topic,
attributes_to_check
)

if not is_suppressed and headers:
Expand Down
Loading
Loading