diff --git a/README.md b/README.md
index cd394fe..8699720 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ The following APIs are available:
 * `/v0/datasources`: `client.api.datasources`
 * `/v0/events`: `client.api.events`
 * `/v0/pipes`: `client.api.pipes`
+* `/v0/sql`: `client.api.query`
 * `/v0/tokens`: `client.api.tokens`
 * `/v0/variables`: `client.api.variables`
diff --git a/tests/integration/test_query.py b/tests/integration/test_query.py
new file mode 100644
index 0000000..97513c1
--- /dev/null
+++ b/tests/integration/test_query.py
@@ -0,0 +1,120 @@
+import pytest
+
+
+class TestQueryApi:
+    @pytest.fixture(autouse=True)
+    def _put_records(self, client):
+        client.api.events.send(
+            "simple",
+            wait=True,
+            records=[
+                {
+                    "Id": "e7f2af3e-99d1-4d4f-8a8c-d6aee4ab89b0",
+                    "Timestamp": "2024-01-23T10:30:00.123456",
+                    "Key": "foo",
+                    "Value": "bar",
+                },
+                {
+                    "Id": "d7792957-21d8-46e6-a4e0-188eb36e2758",
+                    "Timestamp": "2024-02-23T11:45:00.234567",
+                    "Key": "baz",
+                    "Value": "ed",
+                },
+            ],
+        )
+
+    def test_query_datasource_json(self, client):
+        response = client.api.query.query("SELECT key, value FROM simple ORDER BY `key` ASC")
+
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
+        assert response.meta == [
+            {"name": "key", "type": "String"},
+            {"name": "value", "type": "String"},
+        ]
+        assert response.rows == 2
+        assert response.statistics["rows_read"] == 2
+
+    def test_query_pipe(self, client):
+        response = client.api.query.query("SELECT * FROM simple_kv ORDER BY `key` ASC")
+
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
+        assert response.meta == [
+            {"name": "key", "type": "String"},
+            {"name": "value", "type": "String"},
+        ]
+        assert response.rows == 2
+        assert response.statistics["rows_read"] == 2
+
+    def test_query_pipe_parameters(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple_pipe", parameters={"key": "foo"}
+        )
+
+        assert response.data == [{"key": "foo", "value": "bar"}]
+        assert response.meta == [
+            {"name": "key", "type": "String"},
+            {"name": "value", "type": "String"},
+        ]
+        assert response.rows == 1
+        assert response.statistics["rows_read"] == 2
+
+    def test_query_pipeline_json(self, client):
+        response = client.api.query.query(
+            "SELECT * FROM _ ORDER BY `key` ASC", pipeline="simple_kv"
+        )
+
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
+        assert response.meta == [
+            {"name": "key", "type": "String"},
+            {"name": "value", "type": "String"},
+        ]
+        assert response.rows == 2
+        assert response.statistics["rows_read"] == 2
+
+    def test_query_csv(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple ORDER BY `key` ASC", format="CSV"
+        )
+
+        assert response.text == '"baz","ed"\n"foo","bar"\n'
+
+    def test_query_csv_with_names(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple ORDER BY `key` ASC", format="CSVWithNames"
+        )
+
+        assert (
+            response.text
+            == '"key","value"\n"baz","ed"\n"foo","bar"\n'
+            != '"baz","ed"\n"foo","bar"\n'
+        )
+        # CSV with names can be parsed as data!
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
+
+    def test_query_tsv(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple ORDER BY `key` ASC", format="TSV"
+        )
+
+        assert response.text == "baz\ted\nfoo\tbar\n"
+
+    def test_query_tsv_with_names(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple ORDER BY `key` ASC", format="TSVWithNames"
+        )
+
+        assert response.text == "key\tvalue\nbaz\ted\nfoo\tbar\n"
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
+
+    def test_query_ndjson(self, client):
+        response = client.api.query.query(
+            "SELECT key, value FROM simple ORDER BY `key` ASC", format="JSONEachRow"
+        )
+
+        assert (
+            response.text
+            == '{"key":"baz","value":"ed"}\n{"key":"foo","value":"bar"}\n'
+            != '"key","value"\n"baz","ed"\n"foo","bar"\n'
+        )
+        # NDJSON can also be parsed as data!
+        assert response.data == [{"key": "baz", "value": "ed"}, {"key": "foo", "value": "bar"}]
diff --git a/verdin/__init__.py b/verdin/__init__.py
index 9de3aac..14ea946 100644
--- a/verdin/__init__.py
+++ b/verdin/__init__.py
@@ -1,3 +1,3 @@
 name = "verdin"
 
-__version__ = "0.5.0"
+__version__ = "0.5.1"
diff --git a/verdin/api/apis.py b/verdin/api/apis.py
index c064bb5..1f2cea7 100644
--- a/verdin/api/apis.py
+++ b/verdin/api/apis.py
@@ -1,6 +1,7 @@
 from .datasources import DataSourcesApi
 from .events import EventsApi
 from .pipes import PipesApi
+from .query import QueryApi
 from .tokens import TokensApi
 from .variables import VariablesApi
@@ -29,6 +30,10 @@ def events(self) -> EventsApi:
     def pipes(self) -> PipesApi:
         return PipesApi(self._token, self._host)
 
+    @property
+    def query(self) -> QueryApi:
+        return QueryApi(self._token, self._host)
+
     @property
     def tokens(self) -> TokensApi:
         return TokensApi(self._token, self._host)
diff --git a/verdin/api/query.py b/verdin/api/query.py
new file mode 100644
index 0000000..fc63153
--- /dev/null
+++ b/verdin/api/query.py
@@ -0,0 +1,261 @@
+import csv
+import json
+from typing import Literal, Optional, TypedDict, Any
+
+import requests
+
+from verdin.api import ApiResponse
+from verdin.api.base import Api, ApiError
+
+QueryOutputFormat = Literal[
+    "CSV",
+    "CSVWithNames",
+    "JSON",
+    "TSV",
+    "TSVWithNames",
+    "PrettyCompact",
+    "JSONEachRow",
+    "Parquet",
+    "Prometheus",
+]
+"""See https://www.tinybird.co/docs/api-reference/query-api#id10
+
++---------------+--------------------------------------------------+
+| Format        | Description                                      |
++===============+==================================================+
+| CSV           | CSV without header                               |
++---------------+--------------------------------------------------+
+| CSVWithNames  | CSV with header                                  |
++---------------+--------------------------------------------------+
+| JSON          | JSON including data, statistics and schema info  |
++---------------+--------------------------------------------------+
+| TSV           | TSV without header                               |
++---------------+--------------------------------------------------+
+| TSVWithNames  | TSV with header                                  |
++---------------+--------------------------------------------------+
+| PrettyCompact | Formatted table                                  |
++---------------+--------------------------------------------------+
+| JSONEachRow   | Newline-delimited JSON values (NDJSON)           |
++---------------+--------------------------------------------------+
+| Parquet       | Apache Parquet                                   |
++---------------+--------------------------------------------------+
+| Prometheus    | Prometheus text-based format                     |
++---------------+--------------------------------------------------+
+"""
+
+
+class QueryStatistics(TypedDict):
+    bytes_read: int
+    elapsed: float
+    rows_read: int
+
+
+class QueryMetadata(TypedDict):
+    name: str
+    type: str
+
+
+QueryData = list[dict[str, Any]]
+
+
+class QueryResponse(ApiResponse):
+    @property
+    def data(self) -> QueryData:
+        raise NotImplementedError
+
+
+class QueryJsonResponse(QueryResponse):
+    @property
+    def data(self) -> QueryData:
+        """
+        Returns the data returned by the query: a list of dictionaries, one per row.
+
+        :return: List of records.
+        """
+        return self.json.get("data", [])
+
+    @property
+    def meta(self) -> list[QueryMetadata]:
+        """
+        Returns the QueryMetadata of the query result, which lists the column names and their types.
+
+        :return: The QueryMetadata
+        """
+        return self.json.get("meta", [])
+
+    @property
+    def rows(self) -> int:
+        """
+        Returns the number of rows returned by the query.
+
+        :return: The number of rows returned by the query.
+        """
+        return self.json.get("rows")
+
+    @property
+    def statistics(self) -> QueryStatistics:
+        """
+        Returns the query statistics, which include the number of bytes read, the number of rows read, and the
+        elapsed time.
+
+        :return: The QueryStatistics object.
+        """
+        return self.json.get("statistics", {})
+
+    @property
+    def empty(self) -> bool:
+        """
+        A property to check if the data in the result is empty.
+
+        This property evaluates whether the "data" field of the JSON response body is missing or empty.
+
+        :return: True if the "data" field is missing or empty, otherwise False.
+        """
+        return not self.json.get("data")
+
+
+class QueryNdjsonResponse(QueryResponse):
+    @property
+    def data(self) -> list[dict]:
+        """Parses the NDJSON response body into a list of dictionaries, one per line."""
+        return [json.loads(line) for line in self.text.strip().splitlines()]
+
+
+class QueryCsvResponse(QueryResponse):
+    def __init__(self, response: requests.Response, delimiter: str = ","):
+        super().__init__(response)
+        self.delimiter = delimiter
+
+    @property
+    def data(self) -> list[dict]:
+        """Parses the CSV response body into a list of dictionaries."""
+        reader = csv.DictReader(
+            self.text.splitlines(),
+            delimiter=self.delimiter,
+        )
+        return list(reader)
+
+
+class QueryApi(Api):
+    """
+    The Query API allows you to query your Pipes and Data Sources inside Tinybird as if you were running SQL statements
+    against a standard database.
+
+    See https://www.tinybird.co/docs/api-reference/query-api.
+    """
+
+    endpoint: str = "/v0/sql"
+
+    session: requests.Session
+
+    def __init__(self, token: str, host: str = None):
+        super().__init__(token, host)
+
+        self.session = requests.Session()
+        if self.token:
+            self.session.headers.update({"Authorization": f"Bearer {self.token}"})
+
+    def query(
+        self,
+        query: str,
+        pipeline: str = None,
+        parameters: dict[str, str] = None,
+        output_format_json_quote_64bit_integers: bool = False,
+        output_format_json_quote_denormals: bool = False,
+        output_format_parquet_string_as_string: bool = False,
+        format: QueryOutputFormat = "JSON",
+    ) -> QueryResponse | QueryJsonResponse | QueryNdjsonResponse | QueryCsvResponse:
+        """
+        Executes a SQL query using the engine. As a response, it gives you the query metadata, the resulting data and
+        some performance statistics.
+
+        The return type will depend on the desired ``format``.
+        For the following formats, we return special response objects that contain the parsed data:
+
+        * ``JSON``: ``QueryJsonResponse`` (default)
+        * ``CSVWithNames``: ``QueryCsvResponse``
+        * ``TSVWithNames``: ``QueryCsvResponse``
+        * ``JSONEachRow``: ``QueryNdjsonResponse``
+
+        For all other formats, we return a generic ``QueryResponse`` object that allows you to access the raw response
+        body via ``response.text`` (str) or ``response.content`` (bytes).
+
+        :param query: The SQL query
+        :param pipeline: (Optional) The name of the pipe. It allows writing a query like 'SELECT * FROM _' where '_' is
+            a placeholder for the 'pipeline' parameter
+        :param parameters: Additional query parameters, e.g. values for Pipe template parameters
+        :param output_format_json_quote_64bit_integers: (Optional) Controls quoting of 64-bit or bigger integers (like
+            UInt64 or Int128) when they are output in a JSON format. Such integers are enclosed in quotes by default.
+            This behavior is compatible with most JavaScript implementations. Possible values: False (integers are
+            output without quotes), True (integers are enclosed in quotes). Default value is False
+        :param output_format_json_quote_denormals: (Optional) Controls the representation of inf and nan: when enabled
+            they are output as such instead of null, e.g. inf when dividing by 0, or nan when a number has no
+            JavaScript representation. Default value is False
+        :param output_format_parquet_string_as_string: (Optional) Use Parquet String type instead of Binary for String
+            columns. Possible values: False (disabled), True (enabled). Default value is False
+        :param format: Output format of the query results (defaults to JSON)
+        :return: A format-specific QueryResponse object containing the query results
+        """
+
+        query = _sql_with_format(query, format)
+
+        data: dict[str, str | int] = dict(parameters) if parameters else {}
+        if query:
+            data["q"] = query
+        if pipeline:
+            data["pipeline"] = pipeline
+        if output_format_json_quote_64bit_integers:
+            data["output_format_json_quote_64bit_integers"] = 1
+        if output_format_json_quote_denormals:
+            data["output_format_json_quote_denormals"] = 1
+        if output_format_parquet_string_as_string:
+            data["output_format_parquet_string_as_string"] = 1
+
+        # If the query string is too large, the web server (nginx) responds with "414 Request-URI Too Large".
+        # That limit appears to be around 8 KB, so we fall back to a POST request in that case.
+        qsize = 1  # include the "?" character
+        for k, v in data.items():
+            qsize += len(k) + len(str(v)) + 2  # include the ``&`` and ``=`` characters; values may be ints
+
+        if qsize > 8192 or parameters:
+            response = self.session.request(
+                method="POST",
+                url=f"{self.host}{self.endpoint}",
+                data=data,
+            )
+        else:
+            response = self.session.request(
+                method="GET",
+                url=f"{self.host}{self.endpoint}",
+                params=data,
+            )
+
+        if not response.ok:
+            raise ApiError(response)
+
+        # format-specific response objects
+        if format == "JSON":
+            return QueryJsonResponse(response)
+        if format == "CSVWithNames":
+            return QueryCsvResponse(response)
+        if format == "TSVWithNames":
+            return QueryCsvResponse(response, delimiter="\t")
+        if format == "JSONEachRow":
+            return QueryNdjsonResponse(response)
+
+        return QueryResponse(response)
+
+
+def _sql_with_format(sql: str, output_format: Optional[QueryOutputFormat] = None) -> str:
+    """
+    Returns a formatted SQL query with the given output format. If no output format is specified, the query is
+    returned as is.
+
+    :param sql: The SQL query to format
+    :param output_format: The output format to use (suffixes ``FORMAT `` to the query)
+    :return: An SQL string
+    """
+    # TODO: handle a potentially already existing FORMAT clause in the query
+    if not output_format:
+        return sql
+    return sql + f" FORMAT {output_format}"
diff --git a/verdin/query.py b/verdin/query.py
index 4d1f5c5..7651cbc 100644
--- a/verdin/query.py
+++ b/verdin/query.py
@@ -5,6 +5,8 @@
 import requests
 
 from . import config
+from .api import ApiError
+from .api.query import QueryApi
 
 LOG = logging.getLogger(__name__)
@@ -111,7 +113,9 @@ def __init__(
         self.sql = sql
         self.format = format or OutputFormat.JSON
         self.token = token
-        self.api = (api or config.API_URL).rstrip("/") + self.endpoint
+        host = (api or config.API_URL).rstrip("/")
+        self.api = host + self.endpoint
+        self._query_api = QueryApi(token=token, host=host)
 
     def get(self, format: Optional[OutputFormat] = None) -> requests.Response:
         """
@@ -122,23 +126,21 @@ def get(self, format: Optional[OutputFormat] = None) -> requests.Response:
         :param format: Overwrite the default output format set in the constructor.
         :return: the HTTP response
         """
-        query = {"q": self._sql_with_format(format or self.format)}
-
-        headers = {"Content-Type": "text/html; charset=utf-8"}
-        if self.token:
-            headers["Authorization"] = f"Bearer {self.token}"
 
         LOG.debug(
            "querying %s with query: %s",
            self.api,
-            query,
+            self.sql,
         )
 
-        response = requests.get(url=self.api, params=query, headers=headers)
-
-        if not response.ok:
-            raise QueryError(response)
-
-        return response
+        try:
+            response = self._query_api.query(
+                self.sql,
+                format=(format or self.format).value,
+            )
+            return response._response
+        except ApiError as e:
+            raise QueryError(response=e._response) from e
 
     def json(self) -> QueryJsonResult:
         """
@@ -149,16 +151,3 @@
         response = self.get(OutputFormat.JSON)
 
         return QueryJsonResult(response)
-
-    def _sql_with_format(self, output_format: Optional[OutputFormat] = None) -> str:
-        """
-        Returns a formatted SQL query with the given output format. If no output format is specified, the query is
-        returned as is.
-
-        :param output_format: The output format to use (suffixes ``FORMAT `` to the query)
-        :return: An SQL string
-        """
-        # TODO: handle potentially already existing FORMAT string
-        if not output_format:
-            return self.sql
-        return self.sql + f" FORMAT {output_format.value}"
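A minimal usage sketch for the new Query API, mirroring the integration tests above. It assumes a verdin `client` configured as in the existing README examples, plus a Data Source named `simple` and a Pipe named `simple_kv`; adjust these names to your workspace.

```python
# Usage sketch only: `client` is assumed to be an already-configured verdin client,
# and the "simple" Data Source / "simple_kv" Pipe mirror the integration tests above.

# Default JSON format: returns a QueryJsonResponse with parsed rows, schema and statistics.
response = client.api.query.query("SELECT key, value FROM simple ORDER BY `key` ASC")
for row in response.data:  # list of dicts, one per row
    print(row["key"], row["value"])
print(response.meta, response.rows, response.statistics["rows_read"])

# CSVWithNames / TSVWithNames: returns a QueryCsvResponse; the header row lets .data
# parse the body into dictionaries, while .text exposes the raw CSV.
csv_response = client.api.query.query(
    "SELECT key, value FROM simple ORDER BY `key` ASC", format="CSVWithNames"
)
print(csv_response.text)
print(csv_response.data)

# Pipes can be queried through the `pipeline` placeholder, optionally with template parameters.
pipe_response = client.api.query.query("SELECT * FROM _", pipeline="simple_kv")
print(pipe_response.data)
```

Oversized queries and any call that passes `parameters` are sent as POST requests automatically, so callers do not need to handle the request-URI size limit themselves.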