Skip to content
26 changes: 26 additions & 0 deletions cpp/src/arrow/json/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,32 @@ TEST(ReaderTest, MultipleChunksParallel) {
AssertTablesEqual(*serial, *threaded);
}

// Regression test for intermittent threading crashes on MinGW.
// Run this test multiple times manually to stress-test:
// while build/debug/arrow-json-test \
// --gtest_filter=ReaderTest.MultipleChunksParallelRegression; do :; done
// See https://github.com/apache/arrow/issues/49272
TEST(ReaderTest, MultipleChunksParallelRegression) {
int64_t count = 1 << 10;
ParseOptions parse_options;
parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;
ReadOptions read_options;
read_options.block_size = static_cast<int>(count / 2);
read_options.use_threads = true;

std::string json;
for (int i = 0; i < count; ++i) {
json += "{\"a\":" + std::to_string(i) + "}\n";
}

std::shared_ptr<io::InputStream> input;
ASSERT_OK(MakeStream(json, &input));
ASSERT_OK_AND_ASSIGN(auto reader, TableReader::Make(default_memory_pool(), input,
read_options, parse_options));
ASSERT_OK_AND_ASSIGN(auto table, reader->Read());
ASSERT_EQ(table->num_rows(), count);
}

TEST(ReaderTest, ListArrayWithFewValues) {
// ARROW-7647
ParseOptions parse_options;
Expand Down
36 changes: 34 additions & 2 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/mutex.h"
#include "arrow/util/windows_compatibility.h"

#include "arrow/util/tracing_internal.h"

Expand Down Expand Up @@ -630,9 +631,40 @@ void ThreadPool::CollectFinishedWorkersUnlocked() {
state_->finished_workers_.clear();
}

// MinGW's __emutls implementation for C++ thread_local has known race conditions
// during thread creation that can cause segfaults. Use native Win32 TLS instead.
// See https://github.com/apache/arrow/issues/49272
# ifdef __MINGW32__

namespace {
DWORD GetPoolTlsIndex() {
static DWORD index = [] {
DWORD i = TlsAlloc();
if (i == TLS_OUT_OF_INDEXES) {
ARROW_LOG(FATAL) << "TlsAlloc failed for thread pool TLS: "
<< WinErrorMessage(GetLastError());
}
return i;
}();
return index;
}
} // namespace

static ThreadPool* GetCurrentThreadPool() {
return static_cast<ThreadPool*>(TlsGetValue(GetPoolTlsIndex()));
}

static void SetCurrentThreadPool(ThreadPool* pool) {
TlsSetValue(GetPoolTlsIndex(), pool);
}
# else
thread_local ThreadPool* current_thread_pool_ = nullptr;

bool ThreadPool::OwnsThisThread() { return current_thread_pool_ == this; }
static ThreadPool* GetCurrentThreadPool() { return current_thread_pool_; }
static void SetCurrentThreadPool(ThreadPool* pool) { current_thread_pool_ = pool; }
# endif

bool ThreadPool::OwnsThisThread() { return GetCurrentThreadPool() == this; }

void ThreadPool::LaunchWorkersUnlocked(int threads) {
std::shared_ptr<State> state = sp_state_;
Expand All @@ -641,7 +673,7 @@ void ThreadPool::LaunchWorkersUnlocked(int threads) {
state_->workers_.emplace_back();
auto it = --(state_->workers_.end());
*it = std::thread([this, state, it] {
current_thread_pool_ = this;
SetCurrentThreadPool(this);
WorkerLoop(state, it);
});
}
Expand Down
Loading