Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions cpp/src/parquet/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "parquet/bloom_filter.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/thrift_internal.h"
#include "parquet/xxhasher.h"
Expand Down Expand Up @@ -345,6 +346,48 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}

void BlockSplitBloomFilter::WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor,
                                           int16_t row_group_ordinal,
                                           int16_t column_ordinal) const {
  DCHECK(sink != nullptr);
  if (encryptor == nullptr) {
    throw ParquetException("Bloom filter encryptor must be provided");
  }

  // Reject any configuration the Parquet format does not allow for serialized
  // bloom filters before building the Thrift header.
  if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) {
    throw ParquetException("BloomFilter does not support Algorithm other than BLOCK");
  }
  if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) {
    throw ParquetException("BloomFilter does not support Hash other than XXHASH");
  }
  if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) {
    throw ParquetException(
        "BloomFilter does not support Compression other than UNCOMPRESSED");
  }

  format::BloomFilterHeader thrift_header;
  thrift_header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm());
  thrift_header.hash.__set_XXHASH(format::XxHash());
  thrift_header.compression.__set_UNCOMPRESSED(format::Uncompressed());
  thrift_header.__set_numBytes(num_bytes_);

  // The header and the bitset are encrypted as two separate modules, each with
  // its own AAD, so a reader can authenticate them independently.
  encryptor->UpdateAad(
      encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterHeader,
                                  row_group_ordinal, column_ordinal, -1));
  ThriftSerializer thrift_serializer;
  thrift_serializer.Serialize(&thrift_header, sink, encryptor);

  encryptor->UpdateAad(
      encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterBitset,
                                  row_group_ordinal, column_ordinal, -1));
  std::span<const uint8_t> plaintext(data_->data(), num_bytes_);
  auto ciphertext =
      AllocateBuffer(encryptor->pool(), encryptor->CiphertextLength(num_bytes_));
  int32_t ciphertext_len =
      encryptor->Encrypt(plaintext, ciphertext->mutable_span_as<uint8_t>());
  PARQUET_THROW_NOT_OK(sink->Write(ciphertext->data(), ciphertext_len));
}

bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
const uint32_t bucket_index =
static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32);
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,20 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
void InsertHash(uint64_t hash) override;
void InsertHashes(const uint64_t* hashes, int num_values) override;
void WriteTo(ArrowOutputStream* sink) const override;

/// Serialize this Bloom filter as two encrypted modules (header and bitset)
/// using the supplied metadata encryptor.
///
/// The same encryptor is used for both modules, switching the AAD between
/// kBloomFilterHeader and kBloomFilterBitset before each encryption.
///
/// @param sink The output stream to write to.
/// @param encryptor Metadata encryptor for this column. Must not be null.
/// @param row_group_ordinal Ordinal of the row group containing this Bloom filter.
/// @param column_ordinal Ordinal of the column containing this Bloom filter.
void WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor,
int16_t row_group_ordinal, int16_t column_ordinal) const;

/// Size of the bloom filter bitset in bytes.
uint32_t GetBitsetSize() const override { return num_bytes_; }

/// Hash an int32 value by delegating to the configured hasher.
uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
Expand Down
51 changes: 45 additions & 6 deletions cpp/src/parquet/bloom_filter_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/checked_cast.h"

#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -151,12 +152,15 @@ namespace {

/// \brief A concrete implementation of BloomFilterBuilder.
///
/// \note Column encryption for bloom filter is not implemented yet.
When `file_encryptor` is provided, bloom filters of encrypted columns are
serialized using the column's metadata encryptor, while bloom filters of
unencrypted columns are serialized in plaintext.
class BloomFilterBuilderImpl : public BloomFilterBuilder {
public:
BloomFilterBuilderImpl(const SchemaDescriptor* schema,
const WriterProperties* properties)
: schema_(schema), properties_(properties) {}
const WriterProperties* properties,
InternalFileEncryptor* file_encryptor)
: schema_(schema), properties_(properties), file_encryptor_(file_encryptor) {}

void AppendRowGroup() override;

Expand All @@ -183,6 +187,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {

const SchemaDescriptor* schema_;
const WriterProperties* properties_;
InternalFileEncryptor* file_encryptor_;
bool finished_ = false;

using RowGroupBloomFilters =
Expand Down Expand Up @@ -225,14 +230,47 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
}
finished_ = true;

// Bloom filter ordinals are encoded as int16 in the AAD when encryption is enabled.
constexpr size_t kEncryptedOrdinalLimit = std::numeric_limits<int16_t>::max(); // 32767

IndexLocations locations;

for (size_t i = 0; i != bloom_filters_.size(); ++i) {
auto& row_group_bloom_filters = bloom_filters_[i];
for (const auto& [column_id, filter] : row_group_bloom_filters) {
// TODO(GH-43138): Determine the quality of bloom filter before writing it.
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
filter->WriteTo(sink);

const auto column_path = schema_->Column(column_id)->path()->ToDotString();
std::shared_ptr<Encryptor> meta_encryptor =
file_encryptor_ != nullptr
? file_encryptor_->GetColumnMetaEncryptor(column_path)
: nullptr;
if (meta_encryptor != nullptr) {
const auto& column_props = properties_->column_encryption_properties(column_path);
if (column_props != nullptr && column_props->is_encrypted() &&
!column_props->is_encrypted_with_footer_key()) {
ParquetException::NYI("Bloom filter writing with a dedicated column key");
}
if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) {
throw ParquetException(
"Encrypted files cannot contain more than 32767 row groups");
}
if (ARROW_PREDICT_FALSE(static_cast<size_t>(column_id) >
kEncryptedOrdinalLimit)) {
throw ParquetException(
"Encrypted files cannot contain more than 32767 columns");
}
auto* block_filter = dynamic_cast<BlockSplitBloomFilter*>(filter.get());
if (block_filter == nullptr) {
throw ParquetException(
"Only BlockSplitBloomFilter is supported for encrypted bloom filters");
}
block_filter->WriteEncrypted(sink, meta_encryptor.get(), static_cast<int16_t>(i),
static_cast<int16_t>(column_id));
} else {
filter->WriteTo(sink);
}
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());

if (pos - offset > std::numeric_limits<int32_t>::max()) {
Expand All @@ -253,8 +291,9 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
} // namespace

std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make(
const SchemaDescriptor* schema, const WriterProperties* properties) {
return std::make_unique<BloomFilterBuilderImpl>(schema, properties);
const SchemaDescriptor* schema, const WriterProperties* properties,
InternalFileEncryptor* file_encryptor) {
return std::make_unique<BloomFilterBuilderImpl>(schema, properties, file_encryptor);
}

} // namespace parquet
9 changes: 7 additions & 2 deletions cpp/src/parquet/bloom_filter_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "arrow/type_fwd.h"

#include "parquet/bloom_filter.h"
#include "parquet/encryption/type_fwd.h"
#include "parquet/index_location.h"
#include "parquet/type_fwd.h"

Expand Down Expand Up @@ -71,8 +72,12 @@ class PARQUET_EXPORT BloomFilterBuilder {
/// \param schema The schema of the file and it must outlive the created builder.
/// \param properties Properties to get bloom filter options. It must outlive the
/// created builder.
static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema,
const WriterProperties* properties);
/// \param file_encryptor File level encryptor used to encrypt bloom filters of
/// encrypted columns. May be null for unencrypted files. Must outlive the created
/// builder.
static std::unique_ptr<BloomFilterBuilder> Make(
const SchemaDescriptor* schema, const WriterProperties* properties,
InternalFileEncryptor* file_encryptor = NULLPTR);

/// \brief Start a new row group to write bloom filters, meaning that next calls
/// to `CreateBloomFilter` will create bloom filters for the new row group.
Expand Down
103 changes: 103 additions & 0 deletions cpp/src/parquet/encryption/bloom_filter_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
#include <string>

#include "arrow/io/file.h"
#include "arrow/io/memory.h"

#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/encryption/test_encryption_util.h"
#include "parquet/file_reader.h"
#include "parquet/file_writer.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

namespace parquet::encryption::test {
namespace {
Expand Down Expand Up @@ -91,4 +94,104 @@ TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) {
}
}

namespace {

// Build a flat schema containing a single required INT64 column named
// `field_name`.
std::shared_ptr<schema::GroupNode> SingleInt64Schema(const std::string& field_name) {
  auto root = schema::GroupNode::Make(
      "schema", Repetition::REQUIRED,
      {schema::PrimitiveNode::Make(field_name, Repetition::REQUIRED, Type::INT64,
                                   ConvertedType::NONE)});
  return std::static_pointer_cast<schema::GroupNode>(root);
}

std::shared_ptr<FileEncryptionProperties> BuildEncryptionProperties(
const std::string& /*field_name*/) {
FileEncryptionProperties::Builder builder(kFooterEncryptionKey);
return builder.build();
}

std::shared_ptr<FileDecryptionProperties> BuildDecryptionPropertiesWithExplicitKeys(
const std::string& /*field_name*/) {
FileDecryptionProperties::Builder builder;
return builder.footer_key(kFooterEncryptionKey)->build();
}

} // namespace

// Round trip, write a small encrypted file with a Bloom filter on the encrypted
// column, then read it back and verify the Bloom filter contains the inserted
// values and rejects values that were never inserted.
// Round trip, write a small encrypted file with a Bloom filter on the encrypted
// column, then read it back and verify the Bloom filter contains the inserted
// values and rejects values that were never inserted.
TEST(EncryptedBloomFilterWriter, RoundTripEncryptedBloomFilter) {
  const std::string field_name = "id";
  constexpr int kNumValues = 64;

  auto schema = SingleInt64Schema(field_name);

  // Writer side: uniform (footer-key) encryption with a bloom filter enabled
  // on the single column.
  WriterProperties::Builder writer_prop_builder;
  writer_prop_builder.compression(Compression::UNCOMPRESSED);
  writer_prop_builder.enable_bloom_filter(field_name, {});
  writer_prop_builder.encryption(BuildEncryptionProperties(field_name));
  auto writer_props = writer_prop_builder.build();

  PARQUET_ASSIGN_OR_THROW(auto out_stream, ::arrow::io::BufferOutputStream::Create());
  auto writer = ParquetFileWriter::Open(out_stream, schema, writer_props);
  auto* rg_writer = writer->AppendRowGroup();
  auto* col_writer = static_cast<Int64Writer*>(rg_writer->NextColumn());
  std::vector<int64_t> inserted(kNumValues);
  for (int i = 0; i < kNumValues; ++i) {
    inserted[i] = static_cast<int64_t>(i) * 7 + 13;
  }
  col_writer->WriteBatch(static_cast<int64_t>(inserted.size()), nullptr, nullptr,
                         inserted.data());
  writer->Close();
  PARQUET_ASSIGN_OR_THROW(auto file_buffer, out_stream->Finish());

  // Reader side: decrypt with the footer key only.
  ReaderProperties read_props = default_reader_properties();
  read_props.file_decryption_properties(
      BuildDecryptionPropertiesWithExplicitKeys(field_name));

  auto in_stream = std::make_shared<::arrow::io::BufferReader>(file_buffer);
  auto reader = ParquetFileReader::Open(in_stream, read_props);
  auto& bf_reader = reader->GetBloomFilterReader();
  auto rg0 = bf_reader.RowGroup(0);
  ASSERT_NE(nullptr, rg0);
  auto bloom = rg0->GetColumnBloomFilter(0);
  ASSERT_NE(nullptr, bloom);

  // Every inserted value must be found (bloom filters have no false negatives).
  for (int64_t value : inserted) {
    EXPECT_TRUE(bloom->FindHash(bloom->Hash(value)))
        << "missing inserted value " << value;
  }

  // Values that were never inserted should be rejected.
  const int64_t absent[] = {int64_t{-1}, int64_t{1'000'000}, int64_t{1'000'001}};
  for (int64_t miss : absent) {
    EXPECT_FALSE(bloom->FindHash(bloom->Hash(miss)))
        << "unexpected hit for non-inserted value " << miss;
  }
}

// Bloom filters on columns encrypted with a dedicated column key are not yet
// implemented; closing the writer must raise ParquetException.
TEST(EncryptedBloomFilterWriter, ColumnKeyEncryptedBloomFilterIsNotYetImplemented) {
  const std::string field_name = "id";
  auto schema = SingleInt64Schema(field_name);

  // Encrypt the column with its own key instead of the footer key.
  auto column_key_props =
      ColumnEncryptionProperties::Builder().key(kColumnEncryptionKey1)->build();
  ColumnPathToEncryptionPropertiesMap column_map{{field_name, column_key_props}};
  FileEncryptionProperties::Builder file_enc_builder(kFooterEncryptionKey);
  auto encryption_props =
      file_enc_builder.encrypted_columns(std::move(column_map))->build();

  WriterProperties::Builder writer_prop_builder;
  writer_prop_builder.compression(Compression::UNCOMPRESSED);
  writer_prop_builder.enable_bloom_filter(field_name, {});
  writer_prop_builder.encryption(encryption_props);
  auto writer_props = writer_prop_builder.build();

  PARQUET_ASSIGN_OR_THROW(auto out_stream, ::arrow::io::BufferOutputStream::Create());
  auto writer = ParquetFileWriter::Open(out_stream, schema, writer_props);
  auto* rg_writer = writer->AppendRowGroup();
  auto* col_writer = static_cast<Int64Writer*>(rg_writer->NextColumn());
  int64_t sample = 42;
  col_writer->WriteBatch(1, nullptr, nullptr, &sample);
  // Bloom filters are serialized during Close(); that is where the NYI fires.
  EXPECT_THROW(writer->Close(), ParquetException);
}

} // namespace parquet::encryption::test
10 changes: 5 additions & 5 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,10 @@ class FileSerializer : public ParquetFileWriter::Contents {

void WriteBloomFilter() {
if (bloom_filter_builder_ != nullptr) {
if (properties_->file_encryption_properties()) {
ParquetException::NYI("Encryption is not currently supported with bloom filter");
}
// Serialize bloom filter after all row groups have been written and report
// location to the file metadata.
// location to the file metadata. Bloom filters of encrypted columns are
// encrypted using each column's metadata encryptor (the builder was
// constructed with the file-level encryptor when encryption was enabled).
auto locations = bloom_filter_builder_->WriteTo(sink_.get());
metadata_->SetIndexLocations(IndexKind::kBloomFilter, locations);
}
Expand Down Expand Up @@ -575,7 +574,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
}
if (properties_->bloom_filter_enabled()) {
bloom_filter_builder_ = BloomFilterBuilder::Make(schema(), properties_.get());
bloom_filter_builder_ =
BloomFilterBuilder::Make(schema(), properties_.get(), file_encryptor_.get());
}
if (properties_->page_index_enabled()) {
page_index_builder_ = PageIndexBuilder::Make(&schema_, file_encryptor_.get());
Expand Down
Loading