From 343a53f20a9245a9878850d88aab9c5b6706ec0b Mon Sep 17 00:00:00 2001 From: Warrick <1016weicheng@gmail.com> Date: Wed, 4 Mar 2026 10:43:27 +0800 Subject: [PATCH 1/2] [fix] avoid io-threads coredump && swap support io-threads --- src/iothread.c | 8 ++++++-- src/networking.c | 3 ++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/iothread.c b/src/iothread.c index f6f74cdc143..0f52f753d38 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -146,7 +146,11 @@ int isClientMustHandledByMainThread(client *c) { if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_MASTER | CLIENT_SLAVE | CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED | CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG | - CLIENT_LUA_DEBUG_SYNC)) + CLIENT_LUA_DEBUG_SYNC +#ifdef ENABLE_SWAP + | CLIENT_SWAPPING +#endif + )) { return 1; } @@ -500,7 +504,7 @@ int processClientsFromIOThread(IOThread *t) { } if (t->io_thread_scale_status == IO_THREAD_SCALE_STATUS_DOWN || - (server.io_threads_scale_status == IO_THREAD_SCALE_STATUS_UP && + (server.io_threads_scale_status == IO_THREAD_SCALE_STATUS_UP && t->io_thread_scale_status != IO_THREAD_SCALE_STATUS_UP)) { keepClientInMainThread(c); if (isMultiThreads()) { diff --git a/src/networking.c b/src/networking.c index b7acad84bf2..debacda1206 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2563,7 +2563,8 @@ void unprotectClient(client *c) { if (c->conn) { if (c->tid == IOTHREAD_MAIN_THREAD_ID) connSetReadHandler(c->conn,readQueryFromClient); - if (clientHasPendingReplies(c)) putClientInPendingWriteQueue(c); + if (c->tid == IOTHREAD_MAIN_THREAD_ID && clientHasPendingReplies(c)) + putClientInPendingWriteQueue(c); } } } From baa4dc2eac9c6f95ad1309ed15d732cdd3421ce3 Mon Sep 17 00:00:00 2001 From: Warrick <1016weicheng@gmail.com> Date: Wed, 4 Mar 2026 17:51:59 +0800 Subject: [PATCH 2/2] [test] restore tcl of io-threads --- tests/swap/unit/io_thread.tcl | 289 ---------------------------------- tests/unit/io_thread.tcl | 2 +- 2 files changed, 1 insertion(+), 290 deletions(-) delete mode 100644 tests/swap/unit/io_thread.tcl diff --git a/tests/swap/unit/io_thread.tcl b/tests/swap/unit/io_thread.tcl deleted file mode 100644 index e260ff19c4a..00000000000 --- a/tests/swap/unit/io_thread.tcl +++ /dev/null @@ -1,289 +0,0 @@ -proc get_info_field {info field} { - set fl [string length $field] - append field : - foreach line [split $info "\n"] { - set line [string trim $line "\r\n "] - if {[string range $line 0 $fl] eq $field} { - return [string range $line [expr {$fl+1}] end] - } - } - return {} -} - -proc get_kv_value {input key} { - foreach pair [split $input ","] { - if {[regexp {^\s*([^=]+)\s*=\s*(.+?)\s*$} $pair -> k v]} { - if {$k eq $key} { - return $v - } - } - } - return "" -} - -start_server {overrides {}} { - r set k v - - - test "threads 1 => n and n => 1" { - for {set thread_size 2} {$thread_size < 5} {incr thread_size} { - assert_equal [get_info_field [r info threads] io_thread_scale_status] "none" - assert {[get_info_field [r info threads] io_thread_1 ] eq ""} - # set io-threads n - # when client size < thread size , thread scale up task finish - if {!$::external} { - set lines [count_log_lines 0] - } - assert_equal [get_kv_value [get_info_field [r info threads] io_thread_0 ] clients] 1 - r config set io-threads $thread_size - if {!$::external} { - wait_for_condition 200 50 { - [expr {![catch {verify_log_message 0 "*IO threads scale-up end*" $lines}]}] - } else { - fail "scale-up end log not found within timeout" - } - } - assert_equal [r get k] v - - - # reset io-threads 1 - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads 1 - after 200 - assert_equal [get_info_field [r info threads] io_thread_scale_status] "down" - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_1 ] eq "" - } else { - fail "thread down n => 1 fail" - } - if {!$::external} { - verify_log_message 0 "*IO threads scale-down end*" $lines - } - - # add clients - set clients [] - for {set j 0} {$j < 100} {incr j} { - set cli [redis [srv 0 "host"] [srv 0 "port"] 0 $::tls] - $cli select $::target_db - lappend clients $cli - } - after 200 - wait_for_condition 200 50 { - [expr {[get_kv_value [get_info_field [r info threads] io_thread_0 ] clients] == 101}] - } else { - fail "io_thread_0 clients did not reach 101 within timeout" - } - # set io-threads n - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads $thread_size - - if {!$::external} { - wait_for_condition 200 50 { - [expr {![catch {verify_log_message 0 "*IO threads scale-up end*" $lines}]}] - } else { - fail "scale-up end log not found within timeout" - } - } - assert_equal [r get k] v - for {set j 0} {$j < 100} {incr j} { - set cli [lindex $clients $j] - assert_equal [$cli get k] v - } - - - # reset io-threads 1 - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads 1 - assert_equal [get_info_field [r info threads] io_thread_scale_status] "down" - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_1 ] eq "" - } else { - fail "thread down n => 1 fail" - } - if {!$::external} { - verify_log_message 0 "*IO threads scale-down end*" $lines - } - - # close all clients - for {set j 0} {$j < 100} {incr j} { - set cli [lindex $clients $j] - $cli close - } - } - - - } - - # test before set io-threads 2 - r config set io-threads 2 - - test "threads 2 => n and n => 2" { - for {set thread_size 3} {$thread_size < 5} {incr thread_size} { - assert_equal [get_info_field [r info threads] io_thread_scale_status] "none" - assert {[get_info_field [r info threads] io_thread_1 ] ne ""} - assert {[get_info_field [r info threads] io_thread_2 ] eq ""} - - # set io-threads n - # when client size < thread size , thread scale up task finish - if {!$::external} { - set lines [count_log_lines 0] - } - assert_equal [get_kv_value [get_info_field [r info threads] io_thread_1 ] clients] 1 - r config set io-threads $thread_size - if {!$::external} { - wait_for_condition 200 50 { - [expr {![catch {verify_log_message 0 "*IO threads scale-up end*" $lines}]}] - } else { - fail "scale-up end log not found within timeout" - } - } - - - # reset io-threads 2 - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads 2 - assert_equal [get_info_field [r info threads] io_thread_scale_status] "down" - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_2 ] eq "" - } else { - fail "thread down n => 2 fail" - } - if {!$::external} { - verify_log_message 0 "*IO threads scale-down end*" $lines - } - - # add clients - set clients [] - for {set j 0} {$j < 100} {incr j} { - set cli [redis [srv 0 "host"] [srv 0 "port"] 0 $::tls] - if {!$::singledb} { - $cli select $::target_db - } - lappend clients $cli - } - wait_for_condition 200 50 { - [expr {[get_kv_value [get_info_field [r info threads] io_thread_1 ] clients] == 101}] - } else { - fail "io_thread_1 clients did not reach 101 within timeout" - } - - # set io-threads n - # wait CLIENT_IO_PENDING_CRON ,load balancing - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads $thread_size - assert_equal [get_info_field [r info threads] io_thread_scale_status] "up" - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_scale_status] eq "none" - } else { - fail "thread up 2=>n fail" - } - - if {!$::external} { - assert {[catch {verify_log_message 0 "*IO threads scale-up client num(1)< thread num*" $lines} errorMsg]} - assert {$errorMsg ne ""} - wait_for_condition 200 50 { - [expr {![catch {verify_log_message 0 "*IO threads scale-up end*" $lines}]}] - } else { - fail "scale-up end log not found within timeout" - } - } - - # reset io-threads 2 - # wait CLIENT_IO_PENDING_CRON - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads 2 - assert_equal [get_info_field [r info threads] io_thread_scale_status] "down" - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_2 ] eq "" - } else { - fail "thread down n => 2 fail" - } - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_scale_status] eq "none" - } else { - fail "thread down n => 2 scale_status did not reach none" - } - if {!$::external} { - verify_log_message 0 "*IO threads scale-down end*" $lines - } - - # set io-threads n - # client write, load balancing - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads $thread_size - assert_equal [get_info_field [r info threads] io_thread_scale_status] "up" - for {set j 0} {$j < 100} {incr j} { - set cli [lindex $clients $j] - assert_equal [$cli get k] v - } - assert_equal [get_info_field [r info threads] io_thread_scale_status] "none" - if {!$::external} { - assert {[catch {verify_log_message 0 "*IO threads scale-up client num(1)< thread num*" $lines} errorMsg]} - assert {$errorMsg ne ""} - wait_for_condition 200 50 { - [expr {![catch {verify_log_message 0 "*IO threads scale-up end*" $lines}]}] - } else { - fail "scale-up end log not found within timeout" - } - } - - - # reset io-threads 2 - # client write - if {!$::external} { - set lines [count_log_lines 0] - } - r config set io-threads 2 - assert_equal [get_info_field [r info threads] io_thread_scale_status] "down" - assert_equal [r get k] v - for {set j 0} {$j < 100} {incr j} { - set cli [lindex $clients $j] - assert_equal [$cli get k] v - } - - set info [r info threads] - if {[get_info_field $info io_thread_2] ne ""} { - assert_equal [get_kv_value [get_info_field [r info threads] io_thread_2 ] clients] 0 - # need wait thread_join - wait_for_condition 200 50 { - [get_info_field [r info threads] io_thread_scale_status] eq "none" - } else { - fail "thread down n => 2 fail" - } - } else { - assert_equal [get_info_field [r info threads] io_thread_scale_status] "none" - } - - - if {!$::external} { - verify_log_message 0 "*IO threads scale-down end*" $lines - } - - - - # close all clients - for {set j 0} {$j < 100} {incr j} { - set cli [lindex $clients $j] - $cli close - } - - } - - } - - -} \ No newline at end of file diff --git a/tests/unit/io_thread.tcl b/tests/unit/io_thread.tcl index 2bc893810f0..ab298241321 100644 --- a/tests/unit/io_thread.tcl +++ b/tests/unit/io_thread.tcl @@ -21,7 +21,7 @@ proc get_kv_value {input key} { return "" } -start_server {tags {"memonly"}} { +start_server {overrides {}} { r set k v