diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index db34efa6a..9b1729f63 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -42,6 +42,8 @@ type kernelLogWatcher struct { tomb *tomb.Tomb kmsgParser kmsgparser.Parser + // newParser creates a kmsgparser. Overridable in tests; defaults to kmsgparser.NewParser. + newParser func() (kmsgparser.Parser, error) } // NewKmsgWatcher creates a watcher which will read messages from /dev/kmsg @@ -60,7 +62,8 @@ func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { startTime: startTime, tomb: tomb.NewTomb(), // Arbitrary capacity - logCh: make(chan *logtypes.Log, 100), + logCh: make(chan *logtypes.Log, 100), + newParser: kmsgparser.NewParser, } } @@ -69,7 +72,7 @@ var _ types.WatcherCreateFunc = NewKmsgWatcher func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) { if k.kmsgParser == nil { // nil-check to make mocking easier - parser, err := kmsgparser.NewParser() + parser, err := k.newParser() if err != nil { return nil, fmt.Errorf("failed to create kmsg parser: %v", err) } @@ -150,7 +153,7 @@ func (k *kernelLogWatcher) watchLoop() { // It returns the new message channel and true on success, or nil and false if stopping was signaled. func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { for { - parser, err := kmsgparser.NewParser() + parser, err := k.newParser() if err != nil { klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) } else if seekErr := parser.SeekEnd(); seekErr != nil { diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go index b29a0431f..50d24e39a 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go @@ -17,13 +17,13 @@ limitations under the License. package kmsg import ( + "fmt" "sync" "testing" "time" "github.com/euank/go-kmsg-parser/kmsgparser" "github.com/stretchr/testify/assert" - testclock "k8s.io/utils/clock/testing" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" @@ -34,291 +34,443 @@ import ( type mockKmsgParser struct { kmsgs []kmsgparser.Message closeAfterSend bool - closeCalled bool - mu sync.Mutex + // closeClosesChannel, when true, causes Close() to close the Parse() + // output channel, mirroring the real kmsgparser's behavior where + // closing the underlying reader terminates its read goroutine and the + // deferred close(output) fires. + closeClosesChannel bool + // seekEndErr, if non-nil, is returned from SeekEnd(). + seekEndErr error + + mu sync.Mutex + closeCount int + seekEndCount int + parseCount int + done chan struct{} + doneOnce sync.Once } func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {} func (m *mockKmsgParser) Close() error { m.mu.Lock() - defer m.mu.Unlock() - m.closeCalled = true + m.closeCount++ + if m.closeClosesChannel && m.done == nil { + m.done = make(chan struct{}) + } + done := m.done + m.mu.Unlock() + + if m.closeClosesChannel { + m.doneOnce.Do(func() { close(done) }) + } return nil } -func (m *mockKmsgParser) WasCloseCalled() bool { +func (m *mockKmsgParser) CloseCallCount() int { m.mu.Lock() defer m.mu.Unlock() - return m.closeCalled + return m.closeCount } func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message { + m.mu.Lock() + m.parseCount++ c := make(chan kmsgparser.Message) + if m.done == nil { + m.done = make(chan struct{}) + } + done := m.done + m.mu.Unlock() + go func() { for _, msg := range m.kmsgs { - c <- msg + select { + case c <- msg: + case <-done: + // Close() was called mid-send. Mirror real kmsgparser: + // close the output so the consumer sees ok == false. + close(c) + return + } } if m.closeAfterSend { close(c) + return + } + if m.closeClosesChannel { + // Wait for Close() to signal, then close the output channel. + <-done + close(c) } }() return c } -func (m *mockKmsgParser) SeekEnd() error { return nil } +func (m *mockKmsgParser) SeekEnd() error { + m.mu.Lock() + m.seekEndCount++ + m.mu.Unlock() + return m.seekEndErr +} + +func (m *mockKmsgParser) SeekEndCallCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.seekEndCount +} + +func (m *mockKmsgParser) ParseCallCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.parseCount +} func TestWatch(t *testing.T) { now := time.Date(time.Now().Year(), time.January, 2, 3, 4, 5, 0, time.Local) - fakeClock := testclock.NewFakeClock(now) testCases := []struct { + name string uptime time.Duration lookback string delay string - log *mockKmsgParser + kmsgs []kmsgparser.Message logs []logtypes.Log }{ { - // The start point is at the head of the log file. + name: "start at head of log", uptime: 0, lookback: "0", delay: "0", - log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(0 * time.Second)}, {Message: "2", Timestamp: now.Add(1 * time.Second)}, {Message: "3", Timestamp: now.Add(2 * time.Second)}, - }}, + }, logs: []logtypes.Log{ - { - Timestamp: now, - Message: "1", - }, - { - Timestamp: now.Add(time.Second), - Message: "2", - }, - { - Timestamp: now.Add(2 * time.Second), - Message: "3", - }, + {Timestamp: now, Message: "1"}, + {Timestamp: now.Add(time.Second), Message: "2"}, + {Timestamp: now.Add(2 * time.Second), Message: "3"}, }, }, { - // The start point is in the middle of the log file. + name: "start in middle of log", uptime: 0, lookback: "0", delay: "0", - log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(-1 * time.Second)}, {Message: "2", Timestamp: now.Add(0 * time.Second)}, {Message: "3", Timestamp: now.Add(1 * time.Second)}, - }}, + }, logs: []logtypes.Log{ - { - Timestamp: now, - Message: "2", - }, - { - Timestamp: now.Add(time.Second), - Message: "3", - }, + {Timestamp: now, Message: "2"}, + {Timestamp: now.Add(time.Second), Message: "3"}, }, }, { - // The start point is at the end of the log file, but we look back. + name: "start at end with lookback", uptime: 2 * time.Second, lookback: "1s", delay: "0", - log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(-2 * time.Second)}, {Message: "2", Timestamp: now.Add(-1 * time.Second)}, {Message: "3", Timestamp: now.Add(0 * time.Second)}, - }}, + }, logs: []logtypes.Log{ - { - Timestamp: now.Add(-time.Second), - Message: "2", - }, - { - Timestamp: now, - Message: "3", - }, + {Timestamp: now.Add(-time.Second), Message: "2"}, + {Timestamp: now, Message: "3"}, }, }, { - // The start point is at the end of the log file, but we look back up to start time. + name: "lookback bounded by uptime", uptime: time.Second, lookback: "3s", delay: "0", - log: &mockKmsgParser{kmsgs: []kmsgparser.Message{ + kmsgs: []kmsgparser.Message{ {Message: "1", Timestamp: now.Add(-3 * time.Second)}, {Message: "2", Timestamp: now.Add(-2 * time.Second)}, {Message: "3", Timestamp: now.Add(-1 * time.Second)}, {Message: "4", Timestamp: now.Add(0 * time.Second)}, - }}, + }, logs: []logtypes.Log{ - { - Timestamp: now.Add(-time.Second), - Message: "3", - }, - { - Timestamp: now, - Message: "4", - }, + {Timestamp: now.Add(-time.Second), Message: "3"}, + {Timestamp: now, Message: "4"}, }, }, } - for _, test := range testCases { - w := NewKmsgWatcher(types.WatcherConfig{Lookback: test.lookback}) - w.(*kernelLogWatcher).startTime, _ = util.GetStartTime(fakeClock.Now(), test.uptime, test.lookback, test.delay) - w.(*kernelLogWatcher).kmsgParser = test.log - logCh, err := w.Watch() - if err != nil { - t.Fatal(err) - } - defer w.Stop() - for _, expected := range test.logs { - got := <-logCh - assert.Equal(t, &expected, got) - } - // The log channel should have already been drained - // There could still be future messages sent into the channel, but the chance is really slim. - timeout := time.After(100 * time.Millisecond) - select { - case log := <-logCh: - t.Errorf("unexpected extra log: %+v", *log) - case <-timeout: - } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + startTime, err := util.GetStartTime(now, tc.uptime, tc.lookback, tc.delay) + if err != nil { + t.Fatal(err) + } + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{Lookback: tc.lookback}, + startTime: startTime, + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: &mockKmsgParser{kmsgs: tc.kmsgs}, + } + logCh, err := w.Watch() + if err != nil { + t.Fatal(err) + } + defer w.Stop() + for _, expected := range tc.logs { + got := <-logCh + assert.Equal(t, &expected, got) + } + // The log channel should have already been drained. + select { + case log := <-logCh: + t.Errorf("unexpected extra log: %+v", *log) + case <-time.After(100 * time.Millisecond): + } + }) } } -func TestWatcherStopsGracefullyOnTombStop(t *testing.T) { - now := time.Now() - - mock := &mockKmsgParser{ - kmsgs: []kmsgparser.Message{ - {Message: "test message", Timestamp: now}, - }, - closeAfterSend: false, // Don't close, let tomb stop it - } - +// TestWatchReturnsErrorWhenNewParserFails verifies that Watch() propagates +// the newParser factory's error when no parser has been pre-set on the +// watcher, instead of starting watchLoop with a nil parser. +func TestWatchReturnsErrorWhenNewParserFails(t *testing.T) { w := &kernelLogWatcher{ - cfg: types.WatcherConfig{}, - startTime: now.Add(-time.Second), - tomb: tomb.NewTomb(), - logCh: make(chan *logtypes.Log, 100), - kmsgParser: mock, + cfg: types.WatcherConfig{}, + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + // kmsgParser is deliberately left nil so Watch() must call newParser. + newParser: func() (kmsgparser.Parser, error) { + return nil, fmt.Errorf("simulated newParser failure") + }, } logCh, err := w.Watch() - assert.NoError(t, err) - - // Should receive the message - select { - case log := <-logCh: - assert.Equal(t, "test message", log.Message) - case <-time.After(time.Second): - t.Fatal("timeout waiting for log message") - } + assert.Error(t, err) + assert.Nil(t, logCh) + assert.Contains(t, err.Error(), "failed to create kmsg parser") + assert.Contains(t, err.Error(), "simulated newParser failure") +} - // Stop the watcher - w.Stop() +// TestStopClosesParserCleanly verifies that Stop() shuts the watcher down +// cleanly: the log channel is closed, the parser's Close() is called +// exactly once (regression guard for the fix that removed Close() from +// Stop()), and the watch loop's restart path is not triggered (the +// injected newParser must not be called). +// +// The "realistic parser" case is the one that exercises the exact +// production bug: when Close() closes the Parse() output channel, the +// buggy version of Stop() drove watchLoop down the restart path during +// intentional shutdown. +func TestStopClosesParserCleanly(t *testing.T) { + now := time.Now() - // Log channel should be closed after stop - select { - case _, ok := <-logCh: - assert.False(t, ok, "log channel should be closed after Stop()") - case <-time.After(time.Second): - t.Fatal("timeout waiting for log channel to close after Stop()") + cases := []struct { + name string + kmsgs []kmsgparser.Message + closeClosesChannel bool + }{ + { + name: "single message", + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + }, + { + name: "multiple messages", + kmsgs: []kmsgparser.Message{ + {Message: "msg-1", Timestamp: now}, + {Message: "msg-2", Timestamp: now.Add(time.Second)}, + }, + }, + { + name: "realistic parser (Close closes Parse channel)", + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + closeClosesChannel: true, + }, } - // Verify parser was closed - assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + mock := &mockKmsgParser{ + kmsgs: tc.kmsgs, + closeClosesChannel: tc.closeClosesChannel, + } + + factoryCalls := 0 + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Minute), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + newParser: func() (kmsgparser.Parser, error) { + factoryCalls++ + return nil, fmt.Errorf("factory should not be called during clean Stop") + }, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Drain all messages so the parser goroutine has finished sending + // before we Stop. Keeps the test deterministic and focused on Stop. + for _, expected := range tc.kmsgs { + select { + case got := <-logCh: + assert.Equal(t, expected.Message, got.Message) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for message %q", expected.Message) + } + } + + w.Stop() + + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close after Stop()") + } + + assert.Equal(t, 0, factoryCalls, "newParser should not be called during clean Stop") + assert.Equal(t, 1, mock.CloseCallCount(), "parser Close() should be called exactly once after Stop()") + }) + } } -func TestWatcherProcessesEmptyMessages(t *testing.T) { +// TestWatcherRestartsOnUnexpectedChannelClose exercises the legitimate +// restart path from #1192: when the Parse() channel closes outside of a +// Stop() (e.g. the underlying /dev/kmsg reader hit an error), watchLoop +// must recover by calling newParser() and continue delivering messages +// from the fresh parser. Prior to the factory injection there was no +// test coverage for this path. +func TestWatcherRestartsOnUnexpectedChannelClose(t *testing.T) { now := time.Now() - mock := &mockKmsgParser{ + // First parser: sends one message then closes its Parse channel to + // simulate an underlying reader error. + first := &mockKmsgParser{ kmsgs: []kmsgparser.Message{ - {Message: "", Timestamp: now}, - {Message: "valid message", Timestamp: now.Add(time.Second)}, - {Message: "", Timestamp: now.Add(2 * time.Second)}, + {Message: "before-restart", Timestamp: now}, }, - closeAfterSend: false, + closeAfterSend: true, } + // Second parser: delivers messages after the restart. + second := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "after-restart", Timestamp: now.Add(time.Second)}, + }, + } + + factoryCalls := 0 w := &kernelLogWatcher{ cfg: types.WatcherConfig{}, - startTime: now.Add(-time.Second), + startTime: now.Add(-time.Minute), tomb: tomb.NewTomb(), logCh: make(chan *logtypes.Log, 100), - kmsgParser: mock, + kmsgParser: first, + newParser: func() (kmsgparser.Parser, error) { + factoryCalls++ + return second, nil + }, } logCh, err := w.Watch() assert.NoError(t, err) - // Should only receive the non-empty message select { case log := <-logCh: - assert.Equal(t, "valid message", log.Message) + assert.Equal(t, "before-restart", log.Message) case <-time.After(time.Second): - t.Fatal("timeout waiting for log message") + t.Fatal("timeout waiting for message before restart") + } + + select { + case log := <-logCh: + assert.Equal(t, "after-restart", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for message after restart") } - // Stop the watcher and verify channel closes w.Stop() select { case _, ok := <-logCh: assert.False(t, ok, "log channel should be closed after Stop()") case <-time.After(time.Second): - t.Fatal("timeout waiting for log channel to close") + t.Fatal("timeout waiting for log channel to close after Stop()") } + + assert.Equal(t, 1, factoryCalls, "newParser should have been called exactly once for the restart") + assert.Equal(t, 1, first.CloseCallCount(), "first parser should be closed once by the restart path") + assert.Equal(t, 1, second.CloseCallCount(), "second parser should be closed once by Stop's defer") + assert.Equal(t, 1, first.ParseCallCount(), "Parse should be called once on the initial parser") + assert.Equal(t, 1, second.ParseCallCount(), "Parse should be called once on the restart parser after a successful SeekEnd") + // The initial parser uses the watcher's startTime/lookback semantics + // and must NOT be seeked. The restart parser, by contrast, must be + // seeked to the end of the ring buffer to avoid replaying messages + // that were already processed before the restart. + assert.Equal(t, 0, first.SeekEndCallCount(), "SeekEnd should not be called on the initial parser") + assert.Equal(t, 1, second.SeekEndCallCount(), "SeekEnd should be called once on the restart parser") } -func TestWatcherTrimsMessageWhitespace(t *testing.T) { +// TestWatcherProcessesMessageContent verifies watchLoop's per-message +// handling: empty messages are dropped, and surrounding whitespace is +// trimmed before forwarding. +func TestWatcherProcessesMessageContent(t *testing.T) { now := time.Now() - mock := &mockKmsgParser{ - kmsgs: []kmsgparser.Message{ - {Message: " message with spaces ", Timestamp: now}, - {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)}, - {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)}, + cases := []struct { + name string + kmsgs []kmsgparser.Message + expected []string + }{ + { + name: "drops empty messages", + kmsgs: []kmsgparser.Message{ + {Message: "", Timestamp: now}, + {Message: "valid message", Timestamp: now.Add(time.Second)}, + {Message: "", Timestamp: now.Add(2 * time.Second)}, + }, + expected: []string{"valid message"}, + }, + { + name: "trims surrounding whitespace", + kmsgs: []kmsgparser.Message{ + {Message: " message with spaces ", Timestamp: now}, + {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)}, + {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)}, + }, + expected: []string{"message with spaces", "tabbed message", "newlines"}, }, - closeAfterSend: false, - } - - w := &kernelLogWatcher{ - cfg: types.WatcherConfig{}, - startTime: now.Add(-time.Second), - tomb: tomb.NewTomb(), - logCh: make(chan *logtypes.Log, 100), - kmsgParser: mock, - } - - logCh, err := w.Watch() - assert.NoError(t, err) - - expectedMessages := []string{"message with spaces", "tabbed message", "newlines"} - - for _, expected := range expectedMessages { - select { - case log := <-logCh: - assert.Equal(t, expected, log.Message) - case <-time.After(time.Second): - t.Fatalf("timeout waiting for message: %s", expected) - } } - // Stop the watcher and verify channel closes - w.Stop() - - select { - case _, ok := <-logCh: - assert.False(t, ok, "log channel should be closed after Stop()") - case <-time.After(time.Second): - t.Fatal("timeout waiting for log channel to close") + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Minute), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: &mockKmsgParser{kmsgs: tc.kmsgs}, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + defer w.Stop() + + for _, want := range tc.expected { + select { + case got := <-logCh: + assert.Equal(t, want, got.Message) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for message %q", want) + } + } + }) } }