Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dev = [
"coverage[toml]>=5.0",
"pytest-cov>=2.7.1",
"coveralls",
"tinybird",
]

[tool.hatch.version]
Expand Down
Empty file added tests/__init__.py
Empty file.
Empty file added tests/integration/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import time

import pytest

from verdin.client import Client
from verdin.test.cli import TinybirdCli
from verdin.test.container import TinybirdLocalContainer


@pytest.fixture(scope="session")
def client(tinybird_local_container) -> Client:
return tinybird_local_container.client()


@pytest.fixture(scope="session")
def cli(tinybird_local_container) -> TinybirdCli:
project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))

return TinybirdCli(
host=tinybird_local_container.url,
local=True,
cwd=project_dir,
)


@pytest.fixture(scope="session", autouse=True)
def tinybird_local_container():
"""
Starts a tinybird local container in the background and waits until it becomes available.
"""
project_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "project"))

container = TinybirdLocalContainer(cwd=project_dir)

container.start()
container.wait_is_up()

yield container

# cleanup
container.stop()


@pytest.fixture(scope="session", autouse=True)
def deployed_project(cli):
time.sleep(5)
cli.deploy(wait=True, auto=True)
yield
374 changes: 374 additions & 0 deletions tests/integration/project/.cursorrules

Large diffs are not rendered by default.

Empty file.
2 changes: 2 additions & 0 deletions tests/integration/project/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.tinyb
.terraform
373 changes: 373 additions & 0 deletions tests/integration/project/CLAUDE.md

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions tests/integration/project/datasources/simple.datasource
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DESCRIPTION >
Simple Key-Value Data Source

SCHEMA >
id UUID `json:$.Id`,
timestamp DateTime64(6) `json:$.Timestamp`,
key String `json:$.Key`,
value String `json:$.Value`

ENGINE "MergeTree"
14 changes: 14 additions & 0 deletions tests/integration/project/endpoints/simple_kv.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
VERSION 0

DESCRIPTION >
Endpoint to select unique key/value pairs from simple

NODE endpoint
SQL >
%
SELECT key, value
FROM simple
ORDER BY key, timestamp desc
LIMIT 1 by key

TYPE ENDPOINT
3 changes: 3 additions & 0 deletions tests/integration/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def test_client_has_token(client):
"""Makes sure the client fixture loaded the admin token correctly"""
assert client.token.startswith("p.e")
60 changes: 60 additions & 0 deletions tests/integration/test_datasource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging


LOG = logging.getLogger(__name__)


class TestDatasource:
def test_append_ndjson_query_truncate(self, client):
ds = client.datasource("simple")
ds.truncate()

ds.append_ndjson(
[
{
"Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
"Timestamp": "2024-01-23T10:30:00.123456",
"Key": "foo",
"Value": "bar",
},
{
"Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
"Timestamp": "2024-02-23T11:45:00.234567",
"Key": "baz",
"Value": "ed",
},
]
)

query = client.sql("SELECT * FROM simple")
response = query.json()
assert response.data == [
{
"id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
"timestamp": "2024-01-23 10:30:00.123456",
"key": "foo",
"value": "bar",
},
{
"id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
"timestamp": "2024-02-23 11:45:00.234567",
"key": "baz",
"value": "ed",
},
]

query = client.sql("SELECT count(*) as cnt FROM simple")
response = query.json()
assert response.data == [{"cnt": 2}]

# remove all records from the table
ds.truncate()

# check that the table is empty
query = client.sql("SELECT count(*) as cnt FROM simple")
response = query.json()
assert response.data == [{"cnt": 0}]

query = client.sql("SELECT * FROM simple")
response = query.json()
assert response.data == []
35 changes: 35 additions & 0 deletions tests/integration/test_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
class TestPipe:
def test_pipe_query(self, client):
ds = client.datasource("simple")
ds.truncate()

ds.append_ndjson(
[
{
"Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
"Timestamp": "2024-01-23T10:30:00.123456",
"Key": "foo",
"Value": "bar",
},
{
"Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
"Timestamp": "2024-02-23T11:45:00.234567",
"Key": "baz",
"Value": "ed",
},
{
"Id": "fc71d4d5-7e0c-492a-9e3f-8f1cde9bcfaf",
"Timestamp": "2024-03-23T11:45:00.234567",
"Key": "foo",
"Value": "bar2",
},
]
)

pipe = client.pipe("simple_kv")

response = pipe.query()
assert response.data == [
{"key": "baz", "value": "ed"},
{"key": "foo", "value": "bar2"},
]
15 changes: 15 additions & 0 deletions verdin/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ def append_ndjson(self, records: List[Dict]) -> requests.Response:
)
return requests.post(url=self.api, params=query, headers=headers, data=data)

def truncate(self):
"""
Truncate the datasource which removes all records in the table.
"""
headers = {}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"

url = f"{self.api}/{self.canonical_name}/truncate"
LOG.debug(
"truncating table %s",
self.canonical_name,
)
requests.post(url=url, headers=headers)

@staticmethod
def to_csv(records: List[List[Any]], **kwargs):
return to_csv(records, **kwargs)
Expand Down
Empty file added verdin/test/__init__.py
Empty file.
153 changes: 153 additions & 0 deletions verdin/test/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Wrapper around the Tinybird CLI to make available the main commands programmatically."""

import dataclasses
import logging
import os
import re
import subprocess

LOG = logging.getLogger(__name__)


@dataclasses.dataclass
class Token:
id: str
name: str
token: str


class CliError(Exception):
def __init__(self, output: str, orig: subprocess.SubprocessError) -> None:
super().__init__(output)
self.orig = orig


class TinybirdCli:
"""Interface around the Tinybird CLI"""

def __init__(self, host: str = None, token: str = None, cwd: str = None, local: bool = False):
self.host = host
self.token = token
self.cwd = cwd
self.local = local

def _env(self) -> dict:
"""
Returns a dictionary of environment variables to be used when calling tb CLI commands.
"""
_env = dict(os.environ)

if self.host:
_env["TB_HOST"] = self.host
if self.token:
_env["TB_TOKEN"] = self.token

return _env

def _get_base_args(self) -> list[str]:
args = ["tb"]
if self.local:
args.append("--local")
return args

def token_ls(self) -> list[Token]:
"""
List all tokens.

:return: List of Token instances
"""
args = [*self._get_base_args(), "token", "ls"]

output = subprocess.check_output(
args,
encoding="utf-8",
cwd=self.cwd,
env=self._env(),
)
"""
output looks like this (unfortunately --output=json doesn't work)

** Tokens:
--------------------------------------------------------------------------------
id: 63678691-7e28-4f2d-8ef7-243ab19ad7cb
name: workspace admin token
token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI2MzY3ODY5MS03ZTI4LTRmMmQtOGVmNy0yNDNhYjE5YWQ3Y2IiLCAiaG9zdCI6ICJsb2NhbCJ9.4gzsbiG1cnrIDUfHTxfQd0ZN57YkiOKEIyvuTlnLiaM
--------------------------------------------------------------------------------
id: 489c8ca1-195b-4383-a388-d84068ff1b2c
name: admin [email protected]
token: p.eyJ1IjogIjU2ZThhYmMzLWRjNmYtNDcyYi05Yzg1LTdkZjFiZmUyNjU5YyIsICJpZCI6ICI0ODljOGNhMS0xOTViLTQzODMtYTM4OC1kODQwNjhmZjFiMmMiLCAiaG9zdCI6ICJsb2NhbCJ9.MmcBjRTCg6dX53sWsZAv6QzHRHKxwu-pEWkqx8opLHA
--------------------------------------------------------------------------------
"""
tokens = []
current_token = {}

for line in output.splitlines():
# remove color codes
line = re.sub(r"\x1b\[[0-9;]*m", "", line)
line = line.strip()
if line.startswith("id: "):
current_token = {}
current_token["id"] = line[4:]
elif line.startswith("name: "):
current_token["name"] = line[6:]
elif line.startswith("token: "):
current_token["token"] = line[7:]
tokens.append(Token(**current_token))

return tokens

def local_start(
self, daemon: bool = False, skip_new_version: bool = False, volumes_path: str = None
) -> subprocess.Popen:
"""
Run ``tb local start`` and return the subprocess.
"""
args = ["tb", "local", "start"]
if daemon:
args.append("-d")
if skip_new_version:
args.append("--skip-new-version")
if volumes_path:
args.append("--volumes-path")
args.append(volumes_path)

return subprocess.Popen(args, cwd=self.cwd, env=self._env())

def local_stop(self):
"""
Run ``tb local stop``.
"""
subprocess.check_output(["tb", "local", "stop"])

def local_remove(self):
"""
Run ``tb local remove``.
"""
subprocess.check_output(
["tb", "local", "remove"],
input=b"y\n",
)

def deploy(
self, wait: bool = False, auto: bool = False, allow_destructive_operations: bool = False
):
args = [*self._get_base_args(), "deploy"]

if wait:
args.append("--wait")
if auto:
args.append("--auto")
if allow_destructive_operations:
args.append("--allow-destructive-operations")

try:
output = subprocess.check_output(
args,
encoding="utf-8",
cwd=self.cwd,
env=self._env(),
)
except subprocess.CalledProcessError as e:
raise CliError(f"Failed to deploy project:\n{e.output}", e) from e

return output
Loading