Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet};
use spacetimedb_datastore::error::DatastoreError;
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo, ViewInstanceArgs};
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
use spacetimedb_engine::sql::rls::RowLevelExpr;
Expand All @@ -60,7 +60,7 @@ use spacetimedb_lib::http::{Request as HttpRequest, Response as HttpResponse};
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp};
use spacetimedb_primitives::{ArgId, HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue};
Expand Down Expand Up @@ -1121,17 +1121,6 @@ pub(crate) fn resolve_view_for_refresh<'a>(
)
})?;

let is_anonymous = view_call.sender.is_none();

if st_view.is_anonymous != is_anonymous {
return Err(anyhow::anyhow!(
"found is_anonymous={} in st_view, but {} in readset when updating view `{}`",
st_view.is_anonymous,
is_anonymous,
view_name,
));
}

let is_anonymous = view_def.is_anonymous;

if st_view.is_anonymous != is_anonymous {
Expand Down Expand Up @@ -2867,11 +2856,10 @@ impl ModuleHost {
}

/// Materializes the views return by the `view_collector`, if not already materialized,
/// and updates `st_view_sub` accordingly.
/// and updates view lifecycle state accordingly.
///
/// Passing [`Workload::Sql`] will update `st_view_sub.last_called`.
/// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`,
/// in addition to updating `st_view_sub.last_called`.
/// Passing [`Workload::Sql`] will update the instance's last-used timestamp.
/// Passing [`Workload::Subscribe`] will also increment the subscriber's refcount.
pub fn materialize_views<I: WasmInstance>(
mut tx: MutTxId,
instance: &mut RefInstance<'_, I>,
Expand All @@ -2888,12 +2876,14 @@ impl ModuleHost {
let view_id = st_view_row.view_id;
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
let is_anonymous = st_view_row.is_anonymous;
let sender = if is_anonymous { None } else { Some(caller) };
let is_materialized = if is_anonymous {
tx.is_anonymous_view_materialized(view_id)?
let args = if is_anonymous {
ViewInstanceArgs::Anonymous
} else {
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
ViewInstanceArgs::Sender(caller)
};
let view_call = ViewCallInfo::from_args(view_id, args);
let sender = args.sender();
let is_materialized = tx.is_view_materialized(&view_call)?;
if !is_materialized {
let (res, trapped) =
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
Expand All @@ -2904,11 +2894,11 @@ impl ModuleHost {
}
// If this is a sql call, we only update this view's "last called" timestamp
if let Workload::Sql = workload {
tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?;
tx.update_view_timestamp(view_call.clone(), args)?;
}
// If this is a subscribe call, we also increment this view's subscriber count
if let Workload::Subscribe = workload {
tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?;
tx.subscribe_view(view_call, args, caller)?;
}
}
Ok((tx, false))
Expand Down Expand Up @@ -2944,14 +2934,23 @@ impl ModuleHost {
let mut abi_duration = Duration::ZERO;
let mut trapped = false;
for view_call in tx.views_for_refresh().cloned().collect::<Vec<_>>() {
let sender = view_call.sender;
let resolved = match resolve_view_for_refresh(&tx, module_def, &view_call) {
Ok(resolved) => resolved,
Err(err) => {
outcome = ViewOutcome::Failed(format!("failed to resolve view: {err}"));
break;
}
};
let sender = match tx.view_instance_args(&view_call) {
Some(args) => args.sender(),
None => {
outcome = ViewOutcome::Failed(format!(
"failed to look up materialized view args for view {}",
view_call.view_id
));
break;
}
};
let ResolvedViewForRefresh {
view_id,
table_id,
Expand Down
17 changes: 15 additions & 2 deletions crates/core/src/host/v8/syscall/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,22 @@ fn refresh_views(
let view_def = resolved.view_def;
let view_name = &view_def.name;
let fn_ptr = view_def.fn_ptr;
let sender = tx
.as_ref()
.expect("procedure tx missing while looking up refreshed view args")
.view_instance_args(&view_call)
.ok_or_else(|| {
TypeError(format!(
"failed to look up materialized view args for view {}",
view_call.view_id
))
.throw(scope)
})?
.sender();

let current_tx = tx.take().expect("procedure tx missing during view refresh");
let (next_tx, call_result) = tx_slot.set(current_tx, || {
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr)
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr, sender)
});
tx = Some(next_tx);
let return_data = call_result?;
Expand Down Expand Up @@ -851,14 +863,15 @@ fn call_view(
view_name: &Identifier,
table_id: TableId,
fn_ptr: ViewFnPtr,
sender: Option<Identity>,
) -> SysCallResult<ViewReturnData> {
let prev_func_type = get_env(scope)?
.instance_env
.swap_func_type(FuncCallType::View(view_call.clone()));

let result = {
let args = crate::host::ArgsTuple::nullary();
match view_call.sender {
match sender {
Some(sender) => call_call_view(
scope,
hooks,
Expand Down
41 changes: 22 additions & 19 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ impl InstanceCommon {
}
}

/// Re-evaluates all views which have entries in `st_view_subs`.
/// Re-evaluates all materialized view instances tracked in view lifecycle state.
fn evaluate_subscribed_views<I: WasmInstance>(
&mut self,
tx: MutTxId,
Expand Down Expand Up @@ -1367,7 +1367,10 @@ impl InstanceCommon {
(Ok(raw), sender) => {
// This is wrapped in a closure to simplify error handling.
let outcome: Result<ViewOutcome, anyhow::Error> = (|| {
let view_call = ViewCallInfo { view_id, sender };
let view_call = match sender {
Some(sender) => ViewCallInfo::sender(view_id, sender),
None => ViewCallInfo::anonymous(view_id),
};
let result = ViewResult::from_return_data(raw).context("Error parsing view result")?;
let typespace = self.info.module_def.typespace();
let row_product_type = typespace
Expand Down Expand Up @@ -1517,10 +1520,10 @@ fn collect_subscribed_view_calls(
let table_id = st_view
.table_id
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
let subs = tx.lookup_st_view_subs(view_id)?;
let view_instances = tx.materialized_view_instances_for_view(view_id);

if *is_anonymous {
if subs.is_empty() {
if view_instances.is_empty() {
continue;
}
view_calls.push(CallViewParams {
Expand All @@ -1537,14 +1540,17 @@ fn collect_subscribed_view_calls(
continue;
}

for sub in subs {
for args in view_instances {
let Some(sender) = args.sender() else {
continue;
};
view_calls.push(CallViewParams {
view_name: view_name.clone(),
view_id,
table_id,
fn_ptr: *fn_ptr,
caller: owner_identity,
sender: Some(sub.identity.into()),
sender: Some(sender),
args: ArgsTuple::nullary(),
row_type: *product_type_ref,
timestamp: Timestamp::now(),
Expand Down Expand Up @@ -1791,10 +1797,7 @@ impl InstanceOp for ViewOp<'_> {
}

fn call_type(&self) -> FuncCallType {
FuncCallType::View(ViewCallInfo {
view_id: self.view_id,
sender: Some(*self.sender),
})
FuncCallType::View(ViewCallInfo::sender(self.view_id, *self.sender))
}
}

Expand All @@ -1819,10 +1822,7 @@ impl InstanceOp for AnonymousViewOp<'_> {
}

fn call_type(&self) -> FuncCallType {
FuncCallType::View(ViewCallInfo {
view_id: self.view_id,
sender: None,
})
FuncCallType::View(ViewCallInfo::anonymous(self.view_id))
}
}

Expand Down Expand Up @@ -1920,9 +1920,9 @@ impl InstanceOp for HttpHandlerOp {
mod tests {
use super::collect_subscribed_view_calls;
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
use spacetimedb_datastore::locking_tx_datastore::{ViewCallInfo, ViewInstanceArgs};
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
use spacetimedb_primitives::ArgId;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_schema::def::ModuleDef;

Expand Down Expand Up @@ -1960,8 +1960,9 @@ mod tests {

let mut tx = begin_mut_tx(&stdb);
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
let view_call = ViewCallInfo::anonymous(view_id);
tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, Identity::ZERO)?;
tx.subscribe_view(view_call, ViewInstanceArgs::Anonymous, Identity::ONE)?;

// Two subscriber rows exist, but anonymous views should still be reevaluated once
// because they share a single materialization.
Expand Down Expand Up @@ -1989,8 +1990,10 @@ mod tests {

let mut tx = begin_mut_tx(&stdb);
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
let zero_args = ViewInstanceArgs::Sender(Identity::ZERO);
let one_args = ViewInstanceArgs::Sender(Identity::ONE);
tx.subscribe_view(ViewCallInfo::from_args(view_id, zero_args), zero_args, Identity::ZERO)?;
tx.subscribe_view(ViewCallInfo::from_args(view_id, one_args), one_args, Identity::ONE)?;

// Sender-backed views keep one materialization per sender, so reevaluation must
// preserve both callers.
Expand Down
19 changes: 16 additions & 3 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1777,10 +1777,22 @@ impl WasmInstanceEnv {
let view_def = resolved.view_def;
let view_name = &view_def.name;
let fn_ptr = view_def.fn_ptr;
let sender = tx
.as_ref()
.expect("procedure tx missing while looking up refreshed view args")
.view_instance_args(&view_call)
.ok_or_else(|| {
anyhow!(
"failed to look up materialized view args for view {}",
view_call.view_id
)
})?
.sender();

let current_tx = tx.take().expect("procedure tx missing during view refresh");
let (next_tx, call_result) =
tx_slot.set(current_tx, || Self::call_view(caller, &view_call, view_name, fn_ptr));
let (next_tx, call_result) = tx_slot.set(current_tx, || {
Self::call_view(caller, &view_call, view_name, fn_ptr, sender)
});
tx = Some(next_tx);
let return_data = call_result?;

Expand Down Expand Up @@ -1836,6 +1848,7 @@ impl WasmInstanceEnv {
view_call: &ViewCallInfo,
view_name: &Identifier,
fn_ptr: ViewFnPtr,
sender: Option<Identity>,
) -> anyhow::Result<ViewReturnData> {
let prev_func_type = caller
.data_mut()
Expand Down Expand Up @@ -1863,7 +1876,7 @@ impl WasmInstanceEnv {
call_view_anon,
view_name,
fn_ptr.0,
view_call.sender,
sender,
args_source.0,
result_sink,
true,
Expand Down
15 changes: 10 additions & 5 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, HashSet}
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo};
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
use spacetimedb_durability::TxOffset;
use spacetimedb_execution::ExecutionParams;
Expand All @@ -42,7 +42,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::Identity;
use spacetimedb_lib::{bsatn, identity::AuthCtx};
use spacetimedb_physical_plan::plan::ProjectPlan;
use spacetimedb_primitives::ArgId;
use spacetimedb_schema::def::RawModuleDefVersion;
use spacetimedb_table::static_assert_size;
use std::{
Expand Down Expand Up @@ -1855,7 +1854,7 @@ impl ModuleSubscriptions {
/// and subsequently downgrades to a read-only transaction.
///
/// Unlike [`Self::materialize_views_and_downgrade_tx`] which populates the views' backing tables,
/// this method just decrements the subscriber count in `st_view_sub`.
/// this method just decrements the subscriber count in view lifecycle state.
/// Views without any subscribers are cleaned up async.
fn unsubscribe_views_and_downgrade_tx(
&self,
Expand All @@ -1869,7 +1868,7 @@ impl ModuleSubscriptions {
Ok(self.guard_tx(tx, opts))
}

/// We unsubscribe from views by decrementing the subscriber count in `st_view_sub`.
/// We unsubscribe from views by decrementing the subscriber count in the view lifecycle state.
/// Views without any subscribers are cleaned up async.
fn _unsubscribe_views(
tx: &mut MutTxId,
Expand All @@ -1879,7 +1878,13 @@ impl ModuleSubscriptions {
let mut view_ids = HashSet::new();
view_collector.collect_views(&mut view_ids);
for view_id in view_ids {
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
let is_anonymous = tx.lookup_st_view(view_id)?.is_anonymous;
let view_call = if is_anonymous {
ViewCallInfo::anonymous(view_id)
} else {
ViewCallInfo::sender(view_id, sender)
};
tx.unsubscribe_view(view_call, sender)?;
}
Ok(())
}
Expand Down
Loading
Loading