From 759400ebbe6423fd009c20de9e0f914cc6480dfd Mon Sep 17 00:00:00 2001 From: MaxSiominDev Date: Tue, 23 Jun 2026 02:56:57 +0300 Subject: [PATCH] feat(core): await connect-time requests before subscribe --- .../java/io/tarantool/core/IProtoClient.java | 2 + .../io/tarantool/core/IProtoClientImpl.java | 13 ++ .../protocol/fsm/WatcherStateMachine.java | 28 ++- .../integration/IProtoClientWatchersTest.java | 15 +- .../protocol/fsm/WatcherStateMachineTest.java | 179 ++++++++++++++++++ .../java/io/tarantool/pool/PoolEntry.java | 1 + 6 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 tarantool-core/src/test/java/io/tarantool/core/protocol/fsm/WatcherStateMachineTest.java diff --git a/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java b/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java index 8deab204..e60c235a 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java +++ b/tarantool-core/src/main/java/io/tarantool/core/IProtoClient.java @@ -250,6 +250,8 @@ CompletableFuture id( Integer getServerProtocolVersion(); + CompletableFuture awaitConnectionReady(); + Set getClientFeatures(); Set getServerFeatures(); diff --git a/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java b/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java index 5da8c07f..c7d9b27a 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java +++ b/tarantool-core/src/main/java/io/tarantool/core/IProtoClientImpl.java @@ -740,6 +740,19 @@ public Integer getServerProtocolVersion() { return serverProtocolVersion.join(); } + @Override + public CompletableFuture awaitConnectionReady() { + List> ready = new ArrayList<>(); + ready.add(serverProtocolVersion); + for (Watcher watcher : watchers.values()) { + WatcherStateMachine context = watcher.getStateContext(); + if (context != null) { + ready.add(context.registered()); + } + } + return CompletableFuture.allOf(ready.toArray(new CompletableFuture[0])); + } + @Override public Set getClientFeatures() { return clientFeaturesEnum; diff --git a/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/WatcherStateMachine.java b/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/WatcherStateMachine.java index 933829d8..f4e6a986 100644 --- a/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/WatcherStateMachine.java +++ b/tarantool-core/src/main/java/io/tarantool/core/protocol/fsm/WatcherStateMachine.java @@ -37,6 +37,8 @@ public class WatcherStateMachine implements IProtoStateMachine { private final WatcherOptions opts; + private final CompletableFuture registered = new CompletableFuture<>(); + private boolean calledOnce; public WatcherStateMachine( @@ -80,6 +82,7 @@ public boolean process(IProtoResponse message) { log.debug("WatcherStateMachine:process() - \"{}\"", message); if (!message.isError()) { + registered.complete(null); callback.accept(message); // Used to avoid sending a response to the box.shutdown event during a // graceful shutdown. @@ -87,26 +90,37 @@ public boolean process(IProtoResponse message) { send(); } } else { + ClientException error = new ClientException("watcher error: %s", message); + registered.completeExceptionally(error); log.warn("got error for watcher: {}", message); - opts.getErrorHandler().accept(key, new ClientException("watcher error: %s", message)); + opts.getErrorHandler().accept(key, error); } return false; } @Override - public void kill(Throwable exc) {} + public void kill(Throwable exc) { + registered.completeExceptionally( + exc != null ? exc : new ClientException("watcher '%s' killed before registration", key)); + } + + /** Completes when the server acknowledges the watch */ + public CompletableFuture registered() { + return registered; + } private void onSendComplete(Void r, Throwable exc) { if (exc != null) { - log.warn("could not send IPROTO_WATCH packet: %s", exc); + log.warn("could not send IPROTO_WATCH packet", exc); + registered.completeExceptionally(exc); } else { log.debug("IPROTO_WATCH sent"); } } private void send() { - CompletableFuture future = connection.send(request).whenComplete(this::onSendComplete); + CompletableFuture future = connection.send(request); Timeout timer = timerService.newTimeout( timeoutHandler -> { @@ -116,6 +130,10 @@ private void send() { }, opts.getSendTimeout(), TimeUnit.MILLISECONDS); - future.whenComplete((r, exc) -> timer.cancel()); + future.whenComplete( + (r, exc) -> { + timer.cancel(); + onSendComplete(r, exc); + }); } } diff --git a/tarantool-core/src/test/java/io/tarantool/core/integration/IProtoClientWatchersTest.java b/tarantool-core/src/test/java/io/tarantool/core/integration/IProtoClientWatchersTest.java index 4c642825..e4c9aa3e 100644 --- a/tarantool-core/src/test/java/io/tarantool/core/integration/IProtoClientWatchersTest.java +++ b/tarantool-core/src/test/java/io/tarantool/core/integration/IProtoClientWatchersTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.CompletionException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.AfterAll; @@ -116,6 +117,18 @@ public void testWatchOnce() throws Exception { testWatchOnceOnContainer(tarantoolContainer, System.getenv("TARANTOOL_VERSION")); } + @Test + @Timeout(5) + public void testAwaitConnectionReady() throws Exception { + IProtoClient client = getClientAndConnect(tarantoolContainer); + + // must not hang; @Timeout guards a missed completion + client.awaitConnectionReady().join(); + assertTrue(client.getServerProtocolVersion() >= 0); + + client.close(); + } + private void testWatchOnceOnContainer(TarantoolContainer tt, String version) throws Exception { IProtoClient client = getClientAndConnect(tt); Integer serverVersion = client.getServerProtocolVersion(); @@ -123,7 +136,7 @@ private void testWatchOnceOnContainer(TarantoolContainer tt, String version) thr CompletionException ex = assertThrows(CompletionException.class, () -> client.watchOnce("key1").join()); Throwable cause = ex.getCause(); - assertTrue(cause instanceof BoxError); + assertInstanceOf(BoxError.class, cause); assertTrue(cause.getMessage().contains("Unknown request type 77")); return; } diff --git a/tarantool-core/src/test/java/io/tarantool/core/protocol/fsm/WatcherStateMachineTest.java b/tarantool-core/src/test/java/io/tarantool/core/protocol/fsm/WatcherStateMachineTest.java new file mode 100644 index 00000000..3eae7827 --- /dev/null +++ b/tarantool-core/src/test/java/io/tarantool/core/protocol/fsm/WatcherStateMachineTest.java @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2026 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY + * All Rights Reserved. + */ + +package io.tarantool.core.protocol.fsm; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.msgpack.value.Value; +import org.msgpack.value.ValueFactory; + +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_ERROR_BASE; +import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_OK; +import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_REQUEST_TYPE; +import io.tarantool.core.WatcherOptions; +import io.tarantool.core.connection.Connection; +import io.tarantool.core.connection.ConnectionCloseEvent; +import io.tarantool.core.connection.Greeting; +import io.tarantool.core.protocol.IProtoMessage; +import io.tarantool.core.protocol.IProtoRawResponse; +import io.tarantool.core.protocol.IProtoResponse; + +@Timeout(5) +class WatcherStateMachineTest { + + private final Timer timer = new HashedWheelTimer(); + + @AfterEach + void tearDown() { + timer.stop(); + } + + private WatcherStateMachine newWatcher(Connection connection) { + return new WatcherStateMachine( + "box.shutdown", 1L, response -> {}, connection, WatcherOptions.builder().build(), timer); + } + + private static IProtoResponse response(boolean error) { + Map header = new HashMap<>(); + header.put( + MP_IPROTO_REQUEST_TYPE, ValueFactory.newInteger(error ? IPROTO_ERROR_BASE : IPROTO_OK)); + return new IProtoRawResponse(ValueFactory.newMap(header), new byte[0], 0); + } + + @Test + void registeredCompletesOnFirstEvent() { + WatcherStateMachine watcher = newWatcher(new FakeConnection()); + watcher.runOnce(); + assertFalse(watcher.registered().isDone()); + + watcher.process(response(false)); + + assertTrue(watcher.registered().isDone()); + assertFalse(watcher.registered().isCompletedExceptionally()); + } + + @Test + void registeredFailsOnErrorEvent() { + WatcherStateMachine watcher = newWatcher(new FakeConnection()); + watcher.runOnce(); + + watcher.process(response(true)); + + assertTrue(watcher.registered().isCompletedExceptionally()); + } + + @Test + void registeredFailsOnKill() { + WatcherStateMachine watcher = newWatcher(new FakeConnection()); + watcher.runOnce(); + + watcher.kill(new RuntimeException("connection closed")); + + assertTrue(watcher.registered().isCompletedExceptionally()); + } + + @Test + void registeredFailsWhenSendFails() { + FakeConnection connection = new FakeConnection(); + CompletableFuture failed = new CompletableFuture<>(); + failed.completeExceptionally(new RuntimeException("send failed")); + connection.sendResult = failed; + WatcherStateMachine watcher = newWatcher(connection); + + watcher.runOnce(); + + assertTrue(watcher.registered().isCompletedExceptionally()); + } + + @Test + void registeredFailsWhenSendTimesOut() { + FakeConnection connection = new FakeConnection(); + connection.sendResult = new CompletableFuture<>(); + WatcherStateMachine watcher = + new WatcherStateMachine( + "box.shutdown", + 1L, + response -> {}, + connection, + WatcherOptions.builder().withSendTimeout(50).build(), + timer); + + watcher.runOnce(); + + assertThrows(Exception.class, () -> watcher.registered().get(2, TimeUnit.SECONDS)); + assertTrue(watcher.registered().isCompletedExceptionally()); + } + + private static final class FakeConnection implements Connection { + private CompletableFuture sendResult = CompletableFuture.completedFuture(null); + + @Override + public CompletableFuture send(IProtoMessage message) { + return sendResult; + } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public CompletableFuture connect(InetSocketAddress addr, long timeoutMs) { + return new CompletableFuture<>(); + } + + @Override + public Connection listen(Consumer listener) { + return this; + } + + @Override + public Connection onClose( + ConnectionCloseEvent event, BiConsumer handler) { + return this; + } + + @Override + public Optional getGreeting() { + return Optional.empty(); + } + + @Override + public void shutdownClose() {} + + @Override + public void pause() {} + + @Override + public void resume() {} + + @Override + public void setIdleTimeout(int idleTimeout) {} + + @Override + public boolean isPaused() { + return false; + } + + @Override + public void close() {} + } +} diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index a746b9c2..a1e0067f 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -428,6 +428,7 @@ private CompletableFuture internalConnect() { } return client.ping(firstPingOpts); }) + .thenCompose(r -> client.awaitConnectionReady()) .thenApply(r -> client) .whenComplete(this::onConnectComplete); return connectFuture;