Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> {
return Ok(true);
}

assert!(new_ordering_values.len() == self.ordering_req.len());
debug_assert!(new_ordering_values.len() == self.ordering_req.len());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these changes affect the safety of this code (these asserts aren't guarding invariants that any unsafe code relies on).

let current_ordering = &self.orderings[group_idx];
compare_rows(current_ordering, new_ordering_values, &self.sort_options).map(|x| {
if self.pick_first_in_group {
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<S: ValueState> FirstLastGroupsAccumulator<S> {
self.state.update(group_idx, array, idx)?;
self.is_sets.set_bit(group_idx, true);

assert!(orderings.len() == self.ordering_req.len());
debug_assert!(orderings.len() == self.ordering_req.len());
let old_size = ScalarValue::size_of_vec(&self.orderings[group_idx]);
self.orderings[group_idx].clear();
self.orderings[group_idx].extend_from_slice(orderings);
Expand Down Expand Up @@ -650,7 +650,7 @@ impl<S: ValueState + 'static> GroupsAccumulator for FirstLastGroupsAccumulator<S
ordering_cols.push(Vec::with_capacity(self.orderings.len()));
}
for row in orderings.into_iter() {
assert_eq!(row.len(), self.ordering_req.len());
debug_assert!(row.len() == self.ordering_req.len());
for (col_idx, ordering) in row.into_iter().enumerate() {
ordering_cols[col_idx].push(ordering);
}
Expand Down Expand Up @@ -784,8 +784,7 @@ impl Accumulator for TrivialFirstValueAccumulator {
first_idx = Some(0);
}
if let Some(first_idx) = first_idx {
let mut row = get_row_at_idx(values, first_idx)?;
self.first = row.swap_remove(0);
self.first = ScalarValue::try_from_array(&values[0], first_idx)?;
self.first.compact();
self.is_set = true;
}
Expand Down Expand Up @@ -831,6 +830,8 @@ pub struct FirstValueAccumulator {
orderings: Vec<ScalarValue>,
// Stores the applicable ordering requirement.
ordering_req: LexOrdering,
// Stores the sort options derived from `ordering_req`, computed once at construction.
sort_options: Vec<SortOptions>,
Comment on lines +833 to +834
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these changes are for performance optimization, should we add some benchmarks for FirstValueAccumulator to quantify the improvement and prevent future regressions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced a bench only for FirstLastGroupsAccumulator since it has a non-trivial buffer improvement in take_state.

Extracting the construction of sort_options out of the hot loop can only improve performance. We don't currently benchmark the mutating update_batch and merge_batch paths, which are the ones that use sort_options (I have a patch for that, though). Anyway, I added benches for FirstValueAccumulator and LastValueAccumulator for our future benefit.

// Stores whether incoming data already satisfies the ordering requirement.
is_input_pre_ordered: bool,
// Ignore null values.
Expand All @@ -850,11 +851,13 @@ impl FirstValueAccumulator {
.iter()
.map(ScalarValue::try_from)
.collect::<Result<_>>()?;
let sort_options = get_sort_options(&ordering_req);
ScalarValue::try_from(data_type).map(|first| Self {
first,
is_set: false,
orderings,
ordering_req,
sort_options,
is_input_pre_ordered,
ignore_nulls,
})
Expand Down Expand Up @@ -927,12 +930,8 @@ impl Accumulator for FirstValueAccumulator {
let row = get_row_at_idx(values, first_idx)?;
if !self.is_set
|| (!self.is_input_pre_ordered
&& compare_rows(
&self.orderings,
&row[1..],
&get_sort_options(&self.ordering_req),
)?
.is_gt())
&& compare_rows(&self.orderings, &row[1..], &self.sort_options)?
.is_gt())
{
self.update_with_new_row(row);
}
Expand Down Expand Up @@ -960,10 +959,10 @@ impl Accumulator for FirstValueAccumulator {
let mut first_row = get_row_at_idx(&filtered_states, first_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let first_ordering = &first_row[1..is_set_idx];
let sort_options = get_sort_options(&self.ordering_req);
// Either there is no existing value, or there is an earlier version in new data.
if !self.is_set
|| compare_rows(&self.orderings, first_ordering, &sort_options)?.is_gt()
|| compare_rows(&self.orderings, first_ordering, &self.sort_options)?
.is_gt()
{
// Update with first value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
Expand Down Expand Up @@ -1176,8 +1175,7 @@ impl Accumulator for TrivialLastValueAccumulator {
last_idx = Some(value.len() - 1);
}
if let Some(last_idx) = last_idx {
let mut row = get_row_at_idx(values, last_idx)?;
self.last = row.swap_remove(0);
self.last = ScalarValue::try_from_array(&values[0], last_idx)?;
self.last.compact();
self.is_set = true;
}
Expand Down Expand Up @@ -1221,6 +1219,8 @@ struct LastValueAccumulator {
orderings: Vec<ScalarValue>,
// Stores the applicable ordering requirement.
ordering_req: LexOrdering,
// Stores the sort options derived from `ordering_req`, computed once at construction.
sort_options: Vec<SortOptions>,
// Stores whether incoming data already satisfies the ordering requirement.
is_input_pre_ordered: bool,
// Ignore null values.
Expand All @@ -1240,11 +1240,13 @@ impl LastValueAccumulator {
.iter()
.map(ScalarValue::try_from)
.collect::<Result<_>>()?;
let sort_options = get_sort_options(&ordering_req);
ScalarValue::try_from(data_type).map(|last| Self {
last,
is_set: false,
orderings,
ordering_req,
sort_options,
is_input_pre_ordered,
ignore_nulls,
})
Expand Down Expand Up @@ -1317,12 +1319,7 @@ impl Accumulator for LastValueAccumulator {
// Update when there is a more recent entry
if !self.is_set
|| self.is_input_pre_ordered
|| compare_rows(
&self.orderings,
orderings,
&get_sort_options(&self.ordering_req),
)?
.is_lt()
|| compare_rows(&self.orderings, orderings, &self.sort_options)?.is_lt()
{
self.update_with_new_row(row);
}
Expand Down Expand Up @@ -1350,12 +1347,12 @@ impl Accumulator for LastValueAccumulator {
let mut last_row = get_row_at_idx(&filtered_states, last_idx)?;
// When collecting orderings, we exclude the is_set flag from the state.
let last_ordering = &last_row[1..is_set_idx];
let sort_options = get_sort_options(&self.ordering_req);
// Either there is no existing value, or there is a newer (latest)
// version in the new data:
if !self.is_set
|| self.is_input_pre_ordered
|| compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
|| compare_rows(&self.orderings, last_ordering, &self.sort_options)?
.is_lt()
{
// Update with last value in the state. Note that we should exclude the
// is_set flag from the state. Otherwise, we will end up with a state
Expand Down
8 changes: 2 additions & 6 deletions datafusion/functions-aggregate/src/first_last/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,13 +300,9 @@ pub(crate) fn take_need(
EmitTo::First(n) => {
// split off the first N values in seen_values
//
// TODO make this more efficient rather than two
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

// copies and bitwise manipulation
let first_n: BooleanBuffer = bool_buf.iter().take(n).collect();
let first_n: BooleanBuffer = bool_buf.slice(0, n);
// reset the existing buffer
for b in bool_buf.iter().skip(n) {
bool_buf_builder.append(b);
}
bool_buf_builder.append_buffer(&bool_buf.slice(n, bool_buf.len() - n));
first_n
}
}
Expand Down
Loading