Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ regex_expressions = ["regex"]
# enable string functions
string_expressions = ["uuid"]
# enable unicode functions
unicode_expressions = ["unicode-segmentation"]
unicode_expressions = []

[lib]
name = "datafusion_functions"
Expand Down Expand Up @@ -87,7 +87,6 @@ num-traits = { workspace = true }
rand = { workspace = true }
regex = { workspace = true, optional = true }
sha2 = { workspace = true, optional = true }
unicode-segmentation = { version = "^1.13.2", optional = true }
uuid = { workspace = true, features = ["v4"], optional = true }

[dev-dependencies]
Expand Down
41 changes: 36 additions & 5 deletions datafusion/functions/src/unicode/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,39 @@ impl LeftRightSlicer for RightSlicer {
}
}

/// Locates the byte index at which the codepoint with zero-based index `n`
/// begins in `string`.
///
/// When `string` holds `n` or fewer codepoints there is no such codepoint,
/// and `string.len()` is returned instead. In both cases the result equals
/// the byte length of the first `n` codepoints (or of the whole string when
/// it is shorter), so it can be used directly as the end of a prefix slice.
#[inline]
pub(crate) fn byte_offset_of_char(string: &str, n: usize) -> usize {
    match string.char_indices().nth(n) {
        // A codepoint exists at index `n`; it starts at `offset`.
        Some((offset, _)) => offset,
        // Ran off the end before reaching index `n`.
        None => string.len(),
    }
}

/// Scans `string` once, stopping as soon as it is known to hold more than
/// `n` codepoints.
///
/// * If a codepoint exists at zero-based index `n`, returns
///   [`StringCharLen::ByteOffset`] with the byte index where that codepoint
///   starts, i.e. the byte length of the first `n` codepoints.
/// * Otherwise the whole string has been walked, and
///   [`StringCharLen::CharCount`] carries its total number of codepoints
///   (which is at most `n`).
#[inline]
pub(crate) fn char_count_or_boundary(string: &str, n: usize) -> StringCharLen {
    let mut total = 0;
    for (position, (offset, _)) in string.char_indices().enumerate() {
        if position == n {
            return StringCharLen::ByteOffset(offset);
        }
        // `position` is zero-based, so this many codepoints have been seen.
        total = position + 1;
    }
    StringCharLen::CharCount(total)
}

/// Outcome of [`char_count_or_boundary`] for a string and a limit `n`.
pub(crate) enum StringCharLen {
    /// The string is longer than `n` codepoints; the payload is the byte
    /// offset at which the codepoint with zero-based index `n` starts.
    ByteOffset(usize),
    /// The string is `n` codepoints long or shorter; the payload is its
    /// exact codepoint count.
    CharCount(usize),
}

/// Calculate the byte length of the substring of `n` chars from string `string`
#[inline]
fn left_right_byte_length(string: &str, n: i64) -> usize {
Expand All @@ -88,11 +121,9 @@ fn left_right_byte_length(string: &str, n: i64) -> usize {
.map(|(index, _)| index)
.unwrap_or(0),
Ordering::Equal => 0,
Ordering::Greater => string
.char_indices()
.nth(n.unsigned_abs().min(usize::MAX as u64) as usize)
.map(|(index, _)| index)
.unwrap_or(string.len()),
Ordering::Greater => {
byte_offset_of_char(string, n.unsigned_abs().min(usize::MAX as u64) as usize)
}
}
}

Expand Down
88 changes: 38 additions & 50 deletions datafusion/functions/src/unicode/lpad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow::array::{
OffsetSizeTrait, StringArrayType, StringViewArray,
};
use arrow::datatypes::DataType;
use unicode_segmentation::UnicodeSegmentation;

use crate::utils::{make_scalar_function, utf8_to_str_type};
use datafusion_common::cast::as_int64_array;
Expand Down Expand Up @@ -178,7 +177,9 @@ impl ScalarUDFImpl for LPadFunc {
}
}

use super::common::{try_as_scalar_i64, try_as_scalar_str};
use super::common::{
StringCharLen, char_count_or_boundary, try_as_scalar_i64, try_as_scalar_str,
};

/// Optimized lpad for constant target_len and fill arguments.
fn lpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
Expand Down Expand Up @@ -270,27 +271,22 @@ fn lpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
let data_capacity = string_array.len().saturating_mul(target_len * 4);
let mut builder =
GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
let mut graphemes_buf = Vec::new();

for maybe_string in string_array.iter() {
match maybe_string {
Some(string) => {
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else if fill_chars.is_empty() {
builder.append_value(string);
} else {
let pad_chars = target_len - graphemes_buf.len();
let pad_bytes = char_byte_offsets[pad_chars];
builder.write_str(&padding_buf[..pad_bytes])?;
Some(string) => match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
if !fill_chars.is_empty() {
let pad_chars = target_len - char_count;
let pad_bytes = char_byte_offsets[pad_chars];
builder.write_str(&padding_buf[..pad_bytes])?;
}
builder.append_value(string);
}
}
},
None => builder.append_null(),
}
}
Expand Down Expand Up @@ -378,7 +374,6 @@ where
{
let array = if let Some(fill_array) = fill_array {
let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
let mut graphemes_buf = Vec::new();
let mut fill_chars_buf = Vec::new();

for ((string, target_len), fill) in string_array
Expand Down Expand Up @@ -407,8 +402,7 @@ where
}

if string.is_ascii() && fill.is_ascii() {
// ASCII fast path: byte length == character length,
// so we skip expensive grapheme segmentation.
// ASCII fast path: byte length == character length.
let str_len = string.len();
if target_len < str_len {
builder.append_value(&string[..target_len]);
Expand All @@ -428,26 +422,24 @@ where
builder.append_value(string);
}
} else {
// Reuse buffers by clearing and refilling
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

fill_chars_buf.clear();
fill_chars_buf.extend(fill.chars());

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else if fill_chars_buf.is_empty() {
builder.append_value(string);
} else {
for l in 0..target_len - graphemes_buf.len() {
let c =
*fill_chars_buf.get(l % fill_chars_buf.len()).unwrap();
builder.write_char(c)?;
match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
if !fill_chars_buf.is_empty() {
for l in 0..target_len - char_count {
let c = *fill_chars_buf
.get(l % fill_chars_buf.len())
.unwrap();
builder.write_char(c)?;
}
}
builder.append_value(string);
}
builder.append_value(string);
}
}
} else {
Expand All @@ -458,7 +450,6 @@ where
builder.finish()
} else {
let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
let mut graphemes_buf = Vec::new();

for (string, target_len) in string_array.iter().zip(length_array.iter()) {
if let (Some(string), Some(target_len)) = (string, target_len) {
Expand Down Expand Up @@ -491,19 +482,16 @@ where
builder.append_value(string);
}
} else {
// Reuse buffer by clearing and refilling
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else {
for _ in 0..(target_len - graphemes_buf.len()) {
builder.write_str(" ")?;
match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
for _ in 0..(target_len - char_count) {
builder.write_str(" ")?;
}
builder.append_value(string);
}
builder.append_value(string);
}
}
} else {
Expand Down
90 changes: 40 additions & 50 deletions datafusion/functions/src/unicode/rpad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow::array::{
OffsetSizeTrait, StringArrayType, StringViewArray,
};
use arrow::datatypes::DataType;
use unicode_segmentation::UnicodeSegmentation;

use crate::utils::{make_scalar_function, utf8_to_str_type};
use datafusion_common::cast::as_int64_array;
Expand Down Expand Up @@ -178,7 +177,9 @@ impl ScalarUDFImpl for RPadFunc {
}
}

use super::common::{try_as_scalar_i64, try_as_scalar_str};
use super::common::{
StringCharLen, char_count_or_boundary, try_as_scalar_i64, try_as_scalar_str,
};

/// Optimized rpad for constant target_len and fill arguments.
fn rpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
Expand Down Expand Up @@ -271,28 +272,23 @@ fn rpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>(
let data_capacity = string_array.len().saturating_mul(target_len * 4);
let mut builder =
GenericStringBuilder::<T>::with_capacity(string_array.len(), data_capacity);
let mut graphemes_buf = Vec::new();

for maybe_string in string_array.iter() {
match maybe_string {
Some(string) => {
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else if fill_chars.is_empty() {
builder.append_value(string);
} else {
let pad_chars = target_len - graphemes_buf.len();
let pad_bytes = char_byte_offsets[pad_chars];
Some(string) => match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
builder.write_str(string)?;
builder.write_str(&padding_buf[..pad_bytes])?;
if !fill_chars.is_empty() {
let pad_chars = target_len - char_count;
let pad_bytes = char_byte_offsets[pad_chars];
builder.write_str(&padding_buf[..pad_bytes])?;
}
builder.append_value("");
}
}
},
None => builder.append_null(),
}
}
Expand Down Expand Up @@ -377,7 +373,6 @@ where
{
let array = if let Some(fill_array) = fill_array {
let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
let mut graphemes_buf = Vec::new();
let mut fill_chars_buf = Vec::new();

for ((string, target_len), fill) in string_array
Expand Down Expand Up @@ -406,8 +401,7 @@ where
}

if string.is_ascii() && fill.is_ascii() {
// ASCII fast path: byte length == character length,
// so we skip expensive grapheme segmentation.
// ASCII fast path: byte length == character length.
let str_len = string.len();
if target_len < str_len {
builder.append_value(&string[..target_len]);
Expand All @@ -428,26 +422,25 @@ where
builder.append_value("");
}
} else {
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

fill_chars_buf.clear();
fill_chars_buf.extend(fill.chars());

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else if fill_chars_buf.is_empty() {
builder.append_value(string);
} else {
builder.write_str(string)?;
for l in 0..target_len - graphemes_buf.len() {
let c =
*fill_chars_buf.get(l % fill_chars_buf.len()).unwrap();
builder.write_char(c)?;
match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
builder.write_str(string)?;
if !fill_chars_buf.is_empty() {
for l in 0..target_len - char_count {
let c = *fill_chars_buf
.get(l % fill_chars_buf.len())
.unwrap();
builder.write_char(c)?;
}
}
builder.append_value("");
}
builder.append_value("");
}
}
} else {
Expand All @@ -458,7 +451,6 @@ where
builder.finish()
} else {
let mut builder: GenericStringBuilder<T> = GenericStringBuilder::new();
let mut graphemes_buf = Vec::new();

for (string, target_len) in string_array.iter().zip(length_array.iter()) {
if let (Some(string), Some(target_len)) = (string, target_len) {
Expand Down Expand Up @@ -492,19 +484,17 @@ where
builder.append_value("");
}
} else {
graphemes_buf.clear();
graphemes_buf.extend(string.graphemes(true));

if target_len < graphemes_buf.len() {
let end: usize =
graphemes_buf[..target_len].iter().map(|g| g.len()).sum();
builder.append_value(&string[..end]);
} else {
builder.write_str(string)?;
for _ in 0..(target_len - graphemes_buf.len()) {
builder.write_str(" ")?;
match char_count_or_boundary(string, target_len) {
StringCharLen::ByteOffset(offset) => {
builder.append_value(&string[..offset]);
}
StringCharLen::CharCount(char_count) => {
builder.write_str(string)?;
for _ in 0..(target_len - char_count) {
builder.write_str(" ")?;
}
builder.append_value("");
}
builder.append_value("");
}
}
} else {
Expand Down
Loading
Loading