-
Notifications
You must be signed in to change notification settings - Fork 2k
perf: optimise first_value, last_value aggregate function
#21383
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9171bf8
627887a
14929f0
f0554ce
1272caf
576abb0
27b02d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -426,7 +426,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> { | |
| return Ok(true); | ||
| } | ||
|
|
||
| assert!(new_ordering_values.len() == self.ordering_req.len()); | ||
| debug_assert!(new_ordering_values.len() == self.ordering_req.len()); | ||
| let current_ordering = &self.orderings[group_idx]; | ||
| compare_rows(current_ordering, new_ordering_values, &self.sort_options).map(|x| { | ||
| if self.pick_first_in_group { | ||
|
|
@@ -485,7 +485,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> { | |
| self.state.update(group_idx, array, idx)?; | ||
| self.is_sets.set_bit(group_idx, true); | ||
|
|
||
| assert!(orderings.len() == self.ordering_req.len()); | ||
| debug_assert!(orderings.len() == self.ordering_req.len()); | ||
| let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]); | ||
| self.orderings[group_idx].clear(); | ||
| self.orderings[group_idx].extend_from_slice(orderings); | ||
|
|
@@ -650,7 +650,7 @@ impl<S: ValueState + 'static> GroupsAccumulator for FirstLastGroupsAccumulator<S | |
| ordering_cols.push(Vec::with_capacity(self.orderings.len())); | ||
| } | ||
| for row in orderings.into_iter() { | ||
| assert_eq!(row.len(), self.ordering_req.len()); | ||
| debug_assert!(row.len() == self.ordering_req.len()); | ||
| for (col_idx, ordering) in row.into_iter().enumerate() { | ||
| ordering_cols[col_idx].push(ordering); | ||
| } | ||
|
|
@@ -784,8 +784,7 @@ impl Accumulator for TrivialFirstValueAccumulator { | |
| first_idx = Some(0); | ||
| } | ||
| if let Some(first_idx) = first_idx { | ||
| let mut row = get_row_at_idx(values, first_idx)?; | ||
| self.first = row.swap_remove(0); | ||
| self.first = ScalarValue::try_from_array(&values[0], first_idx)?; | ||
| self.first.compact(); | ||
| self.is_set = true; | ||
| } | ||
|
|
@@ -831,6 +830,8 @@ pub struct FirstValueAccumulator { | |
| orderings: Vec<ScalarValue>, | ||
| // Stores the applicable ordering requirement. | ||
| ordering_req: LexOrdering, | ||
| // derived from `ordering_req`. | ||
| sort_options: Vec<SortOptions>, | ||
|
Comment on lines
+833
to
+834
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since these changes are for performance optimization, should we add some benchmarks for […]?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I introduced a bench only for […]. Extracting the construction of […] |
||
| // Stores whether incoming data already satisfies the ordering requirement. | ||
| is_input_pre_ordered: bool, | ||
| // Ignore null values. | ||
|
|
@@ -850,11 +851,13 @@ impl FirstValueAccumulator { | |
| .iter() | ||
| .map(ScalarValue::try_from) | ||
| .collect::<Result<_>>()?; | ||
| let sort_options = get_sort_options(&ordering_req); | ||
| ScalarValue::try_from(data_type).map(|first| Self { | ||
| first, | ||
| is_set: false, | ||
| orderings, | ||
| ordering_req, | ||
| sort_options, | ||
| is_input_pre_ordered, | ||
| ignore_nulls, | ||
| }) | ||
|
|
@@ -927,12 +930,8 @@ impl Accumulator for FirstValueAccumulator { | |
| let row = get_row_at_idx(values, first_idx)?; | ||
| if !self.is_set | ||
| || (!self.is_input_pre_ordered | ||
| && compare_rows( | ||
| &self.orderings, | ||
| &row[1..], | ||
| &get_sort_options(&self.ordering_req), | ||
| )? | ||
| .is_gt()) | ||
| && compare_rows(&self.orderings, &row[1..], &self.sort_options)? | ||
| .is_gt()) | ||
| { | ||
| self.update_with_new_row(row); | ||
| } | ||
|
|
@@ -960,10 +959,10 @@ impl Accumulator for FirstValueAccumulator { | |
| let mut first_row = get_row_at_idx(&filtered_states, first_idx)?; | ||
| // When collecting orderings, we exclude the is_set flag from the state. | ||
| let first_ordering = &first_row[1..is_set_idx]; | ||
| let sort_options = get_sort_options(&self.ordering_req); | ||
| // Either there is no existing value, or there is an earlier version in new data. | ||
| if !self.is_set | ||
| || compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt() | ||
| || compare_rows(&self.orderings, first_ordering, &self.sort_options)? | ||
| .is_gt() | ||
| { | ||
| // Update with first value in the state. Note that we should exclude the | ||
| // is_set flag from the state. Otherwise, we will end up with a state | ||
|
|
@@ -1176,8 +1175,7 @@ impl Accumulator for TrivialLastValueAccumulator { | |
| last_idx = Some(value.len() - 1); | ||
| } | ||
| if let Some(last_idx) = last_idx { | ||
| let mut row = get_row_at_idx(values, last_idx)?; | ||
| self.last = row.swap_remove(0); | ||
| self.last = ScalarValue::try_from_array(&values[0], last_idx)?; | ||
| self.last.compact(); | ||
| self.is_set = true; | ||
| } | ||
|
|
@@ -1221,6 +1219,8 @@ struct LastValueAccumulator { | |
| orderings: Vec<ScalarValue>, | ||
| // Stores the applicable ordering requirement. | ||
| ordering_req: LexOrdering, | ||
| // derived from `ordering_req`. | ||
| sort_options: Vec<SortOptions>, | ||
| // Stores whether incoming data already satisfies the ordering requirement. | ||
| is_input_pre_ordered: bool, | ||
| // Ignore null values. | ||
|
|
@@ -1240,11 +1240,13 @@ impl LastValueAccumulator { | |
| .iter() | ||
| .map(ScalarValue::try_from) | ||
| .collect::<Result<_>>()?; | ||
| let sort_options = get_sort_options(&ordering_req); | ||
| ScalarValue::try_from(data_type).map(|last| Self { | ||
| last, | ||
| is_set: false, | ||
| orderings, | ||
| ordering_req, | ||
| sort_options, | ||
| is_input_pre_ordered, | ||
| ignore_nulls, | ||
| }) | ||
|
|
@@ -1317,12 +1319,7 @@ impl Accumulator for LastValueAccumulator { | |
| // Update when there is a more recent entry | ||
| if !self.is_set | ||
| || self.is_input_pre_ordered | ||
| || compare_rows( | ||
| &self.orderings, | ||
| orderings, | ||
| &get_sort_options(&self.ordering_req), | ||
| )? | ||
| .is_lt() | ||
| || compare_rows(&self.orderings, orderings, &self.sort_options)?.is_lt() | ||
| { | ||
| self.update_with_new_row(row); | ||
| } | ||
|
|
@@ -1350,12 +1347,12 @@ impl Accumulator for LastValueAccumulator { | |
| let mut last_row = get_row_at_idx(&filtered_states, last_idx)?; | ||
| // When collecting orderings, we exclude the is_set flag from the state. | ||
| let last_ordering = &last_row[1..is_set_idx]; | ||
| let sort_options = get_sort_options(&self.ordering_req); | ||
| // Either there is no existing value, or there is a newer (latest) | ||
| // version in the new data: | ||
| if !self.is_set | ||
| || self.is_input_pre_ordered | ||
| || compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt() | ||
| || compare_rows(&self.orderings, last_ordering, &self.sort_options)? | ||
| .is_lt() | ||
| { | ||
| // Update with last value in the state. Note that we should exclude the | ||
| // is_set flag from the state. Otherwise, we will end up with a state | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -300,13 +300,9 @@ pub(crate) fn take_need( | |
| EmitTo::First(n) => { | ||
| // split off the first N values in seen_values | ||
| // | ||
| // TODO make this more efficient rather than two | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nice |
||
| // copies and bitwise manipulation | ||
| let first_n: BooleanBuffer = bool_buf.iter().take(n).collect(); | ||
| let first_n: BooleanBuffer = bool_buf.slice(0, n); | ||
| // reset the existing buffer | ||
| for b in bool_buf.iter().skip(n) { | ||
| bool_buf_builder.append(b); | ||
| } | ||
| bool_buf_builder.append_buffer(&bool_buf.slice(n, bool_buf.len() - n)); | ||
| first_n | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these changes affect the safety of this code (as in, these asserts aren't being used to ensure the soundness of `unsafe` code)