Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions cassandra/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,24 @@
import re
import struct
import time

# Regex to detect LWT (Lightweight Transaction) queries in CQL strings.
# Matches: INSERT ... IF NOT EXISTS, UPDATE/DELETE ... IF EXISTS,
# and conditional updates/deletes (e.g. UPDATE ... IF col = ...,
# UPDATE ... IF "col" = ...).
# Uses word boundaries and case-insensitive matching.
# This is a best-effort heuristic for SimpleStatement; PreparedStatement
# gets the authoritative is_lwt flag from the server.
_LWT_PATTERN = re.compile(
r'\bIF\s+(?:NOT\s+)?EXISTS\b' # IF [NOT] EXISTS
r'|\bIF\s+[a-zA-Z_"]', # IF <column_name> or IF "<column_name>" (conditional)
re.IGNORECASE
)

# DML verbs that can be LWT queries. Only these statement types can contain
# Paxos/LWT clauses. DDL statements (CREATE/ALTER/DROP) also use IF [NOT] EXISTS
# but those are not LWT operations.
_LWT_DML_VERBS = frozenset({'INSERT', 'UPDATE', 'DELETE', 'BEGIN'})
import warnings

from cassandra import ConsistencyLevel, OperationTimedOut
Expand Down Expand Up @@ -416,6 +434,32 @@ def __str__(self):
(self.query_string, consistency))
__repr__ = __str__

def is_lwt(self):
"""
Detect whether this query is a Lightweight Transaction (LWT) by
inspecting the query string for ``IF [NOT] EXISTS`` or ``IF <condition>``
clauses in DML statements (INSERT, UPDATE, DELETE, BEGIN BATCH).

DDL statements like ``CREATE TABLE IF NOT EXISTS`` are excluded.

This is a best-effort heuristic. For authoritative LWT detection,
use :class:`.PreparedStatement` which gets the ``is_lwt`` flag from
the server during PREPARE.

The result is cached after the first call.
"""
try:
return self._cached_is_lwt
except AttributeError:
# Quick check: only DML statements can be LWT
query = self._query_string.lstrip()
first_word = query.split(None, 1)[0].upper() if query else ''
if first_word not in _LWT_DML_VERBS:
self._cached_is_lwt = False
else:
self._cached_is_lwt = bool(_LWT_PATTERN.search(query))
return self._cached_is_lwt


class PreparedStatement(object):
"""
Expand Down
177 changes: 168 additions & 9 deletions tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,174 @@ def test_is_lwt_propagates_from_statements(self):
batch_with_bound.add(bound_lwt)
assert batch_with_bound.is_lwt() is True

class LwtSimpleStatement(SimpleStatement):
def __init__(self):
super(LwtSimpleStatement, self).__init__(
"INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS"
)

def is_lwt(self):
return True
# SimpleStatement now detects LWT from query string (no subclass needed)
lwt_simple = SimpleStatement(
"INSERT INTO test.table (id) VALUES (2) IF NOT EXISTS"
)
assert lwt_simple.is_lwt() is True

batch_with_simple = BatchStatement()
batch_with_simple.add(LwtSimpleStatement())
batch_with_simple.add(lwt_simple)
assert batch_with_simple.is_lwt() is True

class SimpleStatementIsLwtTest(unittest.TestCase):
"""Tests for SimpleStatement.is_lwt() CQL-based LWT detection."""

# --- INSERT IF NOT EXISTS ---

def test_insert_if_not_exists(self):
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
assert s.is_lwt() is True

def test_insert_if_not_exists_lowercase(self):
s = SimpleStatement("insert into ks.t (a) values (1) if not exists")
assert s.is_lwt() is True

def test_insert_if_not_exists_mixed_case(self):
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) If Not Exists")
assert s.is_lwt() is True

# --- UPDATE IF EXISTS ---

def test_update_if_exists(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS")
assert s.is_lwt() is True

# --- DELETE IF EXISTS ---

def test_delete_if_exists(self):
s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF EXISTS")
assert s.is_lwt() is True

# --- Conditional UPDATE (IF <column> = <value>) ---

def test_conditional_update_equals(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a = 2")
assert s.is_lwt() is True

def test_conditional_update_not_equals(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a != 2")
assert s.is_lwt() is True

def test_conditional_update_greater_than(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF a > 2")
assert s.is_lwt() is True

def test_conditional_update_multiple_conditions(self):
s = SimpleStatement(
"UPDATE ks.t SET a=1 WHERE k=1 IF a = 2 AND b = 3")
assert s.is_lwt() is True

# --- Conditional DELETE ---

def test_conditional_delete(self):
s = SimpleStatement("DELETE FROM ks.t WHERE k=1 IF a = 2")
assert s.is_lwt() is True

# --- Non-LWT queries (should return False) ---

def test_select_not_lwt(self):
s = SimpleStatement("SELECT * FROM ks.t WHERE k=1")
assert s.is_lwt() is False

def test_insert_without_if(self):
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1)")
assert s.is_lwt() is False

def test_update_without_if(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1")
assert s.is_lwt() is False

def test_delete_without_if(self):
s = SimpleStatement("DELETE FROM ks.t WHERE k=1")
assert s.is_lwt() is False

def test_create_table_with_if_not_exists(self):
"""DDL IF NOT EXISTS is correctly excluded — only DML can be LWT."""
s = SimpleStatement("CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)")
assert s.is_lwt() is False

def test_create_index_if_not_exists(self):
s = SimpleStatement("CREATE INDEX IF NOT EXISTS idx ON ks.t (a)")
assert s.is_lwt() is False

def test_create_keyspace_if_not_exists(self):
s = SimpleStatement(
"CREATE KEYSPACE IF NOT EXISTS ks WITH replication = "
"{'class': 'SimpleStrategy', 'replication_factor': 1}")
assert s.is_lwt() is False

def test_drop_table_if_exists(self):
s = SimpleStatement("DROP TABLE IF EXISTS ks.t")
assert s.is_lwt() is False

def test_alter_table_not_lwt(self):
s = SimpleStatement("ALTER TABLE ks.t ADD col int")
assert s.is_lwt() is False

# --- Caching ---

def test_result_is_cached(self):
s = SimpleStatement("INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
assert s.is_lwt() is True
assert s.is_lwt() is True # should use cache
assert s._cached_is_lwt is True

def test_non_lwt_result_is_cached(self):
s = SimpleStatement("SELECT * FROM ks.t")
assert s.is_lwt() is False
assert s._cached_is_lwt is False

# --- Edge cases ---

def test_multiline_query(self):
s = SimpleStatement("""
INSERT INTO ks.t (a, b)
VALUES (1, 2)
IF NOT EXISTS
""")
assert s.is_lwt() is True

def test_extra_whitespace(self):
s = SimpleStatement("UPDATE ks.t SET a=1 WHERE k=1 IF EXISTS")
assert s.is_lwt() is True

def test_tab_separated(self):
s = SimpleStatement("DELETE FROM ks.t WHERE k=1\tIF\tEXISTS")
assert s.is_lwt() is True

# --- Quoted identifiers ---

def test_conditional_with_quoted_identifier(self):
s = SimpleStatement('UPDATE ks.t SET a=1 WHERE k=1 IF "my_col" = 2')
assert s.is_lwt() is True

def test_conditional_delete_quoted_identifier(self):
s = SimpleStatement('DELETE FROM ks.t WHERE k=1 IF "Col" = 2')
assert s.is_lwt() is True

# --- BEGIN BATCH ---

def test_begin_batch_with_lwt(self):
s = SimpleStatement(
"BEGIN BATCH "
"INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS "
"APPLY BATCH")
assert s.is_lwt() is True

def test_begin_batch_without_lwt(self):
s = SimpleStatement(
"BEGIN BATCH "
"INSERT INTO ks.t (a) VALUES (1) "
"APPLY BATCH")
assert s.is_lwt() is False

# --- Leading whitespace ---

def test_leading_whitespace(self):
s = SimpleStatement(" \n INSERT INTO ks.t (a) VALUES (1) IF NOT EXISTS")
assert s.is_lwt() is True

def test_leading_whitespace_ddl(self):
s = SimpleStatement(" \n CREATE TABLE IF NOT EXISTS ks.t (a int PRIMARY KEY)")
assert s.is_lwt() is False
Loading