Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ CompletableFuture<IProtoResponse> id(

Integer getServerProtocolVersion();

CompletableFuture<Void> awaitConnectionReady();

Set<IProtoFeature> getClientFeatures();

Set<IProtoFeature> getServerFeatures();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,19 @@ public Integer getServerProtocolVersion() {
return serverProtocolVersion.join();
}

@Override
public CompletableFuture<Void> awaitConnectionReady() {
List<CompletableFuture<?>> ready = new ArrayList<>();
ready.add(serverProtocolVersion);
for (Watcher watcher : watchers.values()) {
WatcherStateMachine context = watcher.getStateContext();
if (context != null) {
ready.add(context.registered());
}
}
return CompletableFuture.allOf(ready.toArray(new CompletableFuture[0]));
}

@Override
public Set<IProtoFeature> getClientFeatures() {
return clientFeaturesEnum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class WatcherStateMachine implements IProtoStateMachine {

private final WatcherOptions opts;

private final CompletableFuture<Void> registered = new CompletableFuture<>();

private boolean calledOnce;

public WatcherStateMachine(
Expand Down Expand Up @@ -80,33 +82,45 @@ public boolean process(IProtoResponse message) {
log.debug("WatcherStateMachine:process() - \"{}\"", message);

if (!message.isError()) {
registered.complete(null);
callback.accept(message);
// Used to avoid sending a response to the box.shutdown event during a
// graceful shutdown.
if (connection.isConnected()) {
send();
}
} else {
ClientException error = new ClientException("watcher error: %s", message);
registered.completeExceptionally(error);
log.warn("got error for watcher: {}", message);
opts.getErrorHandler().accept(key, new ClientException("watcher error: %s", message));
opts.getErrorHandler().accept(key, error);
}

return false;
}

@Override
public void kill(Throwable exc) {}
public void kill(Throwable exc) {
registered.completeExceptionally(
exc != null ? exc : new ClientException("watcher '%s' killed before registration", key));
}

/** Completes when the server acknowledges the watch */
public CompletableFuture<Void> registered() {
return registered;
}

private void onSendComplete(Void r, Throwable exc) {
if (exc != null) {
log.warn("could not send IPROTO_WATCH packet: %s", exc);
log.warn("could not send IPROTO_WATCH packet", exc);
registered.completeExceptionally(exc);
} else {
log.debug("IPROTO_WATCH sent");
}
}

private void send() {
CompletableFuture<Void> future = connection.send(request).whenComplete(this::onSendComplete);
CompletableFuture<Void> future = connection.send(request);
Timeout timer =
timerService.newTimeout(
timeoutHandler -> {
Expand All @@ -116,6 +130,10 @@ private void send() {
},
opts.getSendTimeout(),
TimeUnit.MILLISECONDS);
future.whenComplete((r, exc) -> timer.cancel());
future.whenComplete(
(r, exc) -> {
timer.cancel();
onSendComplete(r, exc);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.CompletionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -116,14 +117,26 @@ public void testWatchOnce() throws Exception {
testWatchOnceOnContainer(tarantoolContainer, System.getenv("TARANTOOL_VERSION"));
}

@Test
@Timeout(5)
public void testAwaitConnectionReady() throws Exception {
IProtoClient client = getClientAndConnect(tarantoolContainer);

// must not hang; @Timeout guards a missed completion
client.awaitConnectionReady().join();
assertTrue(client.getServerProtocolVersion() >= 0);

client.close();
}

private void testWatchOnceOnContainer(TarantoolContainer tt, String version) throws Exception {
IProtoClient client = getClientAndConnect(tt);
Integer serverVersion = client.getServerProtocolVersion();
if (serverVersion < 6) {
CompletionException ex =
assertThrows(CompletionException.class, () -> client.watchOnce("key1").join());
Throwable cause = ex.getCause();
assertTrue(cause instanceof BoxError);
assertInstanceOf(BoxError.class, cause);
assertTrue(cause.getMessage().contains("Unknown request type 77"));
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright (c) 2026 VK DIGITAL TECHNOLOGIES LIMITED LIABILITY COMPANY
* All Rights Reserved.
*/

package io.tarantool.core.protocol.fsm;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.msgpack.value.Value;
import org.msgpack.value.ValueFactory;

import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_ERROR_BASE;
import static io.tarantool.core.protocol.requests.IProtoConstant.IPROTO_OK;
import static io.tarantool.core.protocol.requests.IProtoConstant.MP_IPROTO_REQUEST_TYPE;
import io.tarantool.core.WatcherOptions;
import io.tarantool.core.connection.Connection;
import io.tarantool.core.connection.ConnectionCloseEvent;
import io.tarantool.core.connection.Greeting;
import io.tarantool.core.protocol.IProtoMessage;
import io.tarantool.core.protocol.IProtoRawResponse;
import io.tarantool.core.protocol.IProtoResponse;

@Timeout(5)
class WatcherStateMachineTest {

private final Timer timer = new HashedWheelTimer();

@AfterEach
void tearDown() {
timer.stop();
}

private WatcherStateMachine newWatcher(Connection connection) {
return new WatcherStateMachine(
"box.shutdown", 1L, response -> {}, connection, WatcherOptions.builder().build(), timer);
}

private static IProtoResponse response(boolean error) {
Map<Value, Value> header = new HashMap<>();
header.put(
MP_IPROTO_REQUEST_TYPE, ValueFactory.newInteger(error ? IPROTO_ERROR_BASE : IPROTO_OK));
return new IProtoRawResponse(ValueFactory.newMap(header), new byte[0], 0);
}

@Test
void registeredCompletesOnFirstEvent() {
WatcherStateMachine watcher = newWatcher(new FakeConnection());
watcher.runOnce();
assertFalse(watcher.registered().isDone());

watcher.process(response(false));

assertTrue(watcher.registered().isDone());
assertFalse(watcher.registered().isCompletedExceptionally());
}

@Test
void registeredFailsOnErrorEvent() {
WatcherStateMachine watcher = newWatcher(new FakeConnection());
watcher.runOnce();

watcher.process(response(true));

assertTrue(watcher.registered().isCompletedExceptionally());
}

@Test
void registeredFailsOnKill() {
WatcherStateMachine watcher = newWatcher(new FakeConnection());
watcher.runOnce();

watcher.kill(new RuntimeException("connection closed"));

assertTrue(watcher.registered().isCompletedExceptionally());
}

@Test
void registeredFailsWhenSendFails() {
FakeConnection connection = new FakeConnection();
CompletableFuture<Void> failed = new CompletableFuture<>();
failed.completeExceptionally(new RuntimeException("send failed"));
connection.sendResult = failed;
WatcherStateMachine watcher = newWatcher(connection);

watcher.runOnce();

assertTrue(watcher.registered().isCompletedExceptionally());
}

@Test
void registeredFailsWhenSendTimesOut() {
FakeConnection connection = new FakeConnection();
connection.sendResult = new CompletableFuture<>();
WatcherStateMachine watcher =
new WatcherStateMachine(
"box.shutdown",
1L,
response -> {},
connection,
WatcherOptions.builder().withSendTimeout(50).build(),
timer);

watcher.runOnce();

assertThrows(Exception.class, () -> watcher.registered().get(2, TimeUnit.SECONDS));
assertTrue(watcher.registered().isCompletedExceptionally());
}

private static final class FakeConnection implements Connection {
private CompletableFuture<Void> sendResult = CompletableFuture.completedFuture(null);

@Override
public CompletableFuture<Void> send(IProtoMessage message) {
return sendResult;
}

@Override
public boolean isConnected() {
return true;
}

@Override
public CompletableFuture<Greeting> connect(InetSocketAddress addr, long timeoutMs) {
return new CompletableFuture<>();
}

@Override
public Connection listen(Consumer<IProtoResponse> listener) {
return this;
}

@Override
public Connection onClose(
ConnectionCloseEvent event, BiConsumer<Connection, Throwable> handler) {
return this;
}

@Override
public Optional<Greeting> getGreeting() {
return Optional.empty();
}

@Override
public void shutdownClose() {}

@Override
public void pause() {}

@Override
public void resume() {}

@Override
public void setIdleTimeout(int idleTimeout) {}

@Override
public boolean isPaused() {
return false;
}

@Override
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ private CompletableFuture<IProtoClient> internalConnect() {
}
return client.ping(firstPingOpts);
})
.thenCompose(r -> client.awaitConnectionReady())
.thenApply(r -> client)
.whenComplete(this::onConnectComplete);
return connectFuture;
Expand Down