diff --git a/CHANGELOG.md b/CHANGELOG.md index d9ab269a5..6f34fb6ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## Pending +- fix: use OkHttp's public SSE factory for Horizon streams and force SSE requests to bypass caches. - feat: sort `ScMap` entries by key in `Scv.toMap` following Soroban runtime ordering rules, as the network requires ScMap keys to be in ascending order. `Scv.toMap` now accepts `Map`; the previous `toMap(LinkedHashMap)` overload is deprecated. ([#766](https://github.com/lightsail-network/java-stellar-sdk/pull/766)) - feat: add SEP-0051 support. ([#776](https://github.com/lightsail-network/java-stellar-sdk/pull/776)) - feat: add `closeTime`, `headerXdr`, and `metadataXdr` to `GetLatestLedgerResponse`. ([#768](https://github.com/lightsail-network/java-stellar-sdk/pull/768)) diff --git a/src/main/java/org/stellar/sdk/requests/SSEStream.java b/src/main/java/org/stellar/sdk/requests/SSEStream.java index 2789f70bd..232b7eb80 100644 --- a/src/main/java/org/stellar/sdk/requests/SSEStream.java +++ b/src/main/java/org/stellar/sdk/requests/SSEStream.java @@ -9,13 +9,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import okhttp3.CacheControl; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import okhttp3.internal.sse.RealEventSource; import okhttp3.sse.EventSource; import okhttp3.sse.EventSourceListener; +import okhttp3.sse.EventSources; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.stellar.sdk.Util; @@ -77,6 +78,9 @@ private void start() { executorService.scheduleWithFixedDelay( () -> { + // This timeout is driven by EventSourceListener#onEvent. With the public OkHttp SSE API + // that means only parsed SSE data events refresh liveness; comment-only keepalive frames + // are not surfaced here and may still lead to reconnects if no business events arrive. if (System.currentTimeMillis() - latestEventTime.get() > reconnectTimeout) { latestEventTime.set(System.currentTimeMillis()); isClosed.compareAndSet(false, true); @@ -168,19 +172,17 @@ private static EventSource doStre Request.Builder builder = new Request.Builder() .url(addIdentificationQueryParameter(url)) - .header("Accept", "text/event-stream"); + .cacheControl(CacheControl.FORCE_NETWORK); String lastEventId = stream.lastEventId.get(); if (lastEventId != null) { builder.header("Last-Event-ID", lastEventId); } Request request = builder.build(); - RealEventSource eventSource = - new RealEventSource( + return EventSources.createFactory(okHttpClient) + .newEventSource( request, new StellarEventSourceListener<>( stream, closeListener, responseClass, requestBuilder, listener, listenerId)); - eventSource.connect(okHttpClient); - return eventSource; } private interface CloseListener { @@ -257,7 +259,8 @@ public void onEvent( if (stream.isStopped.get() || listenerId != stream.currentListenerId.get()) { return; } - // Update the timestamp of the last received event. + // Treat actual SSE data events as activity. Comment frames are handled internally by OkHttp + // and do not reach this callback, so they do not extend the reconnect timeout. stream.latestEventTime.set(System.currentTimeMillis()); if (data.equals("\"hello\"") || data.equals("\"byebye\"")) { diff --git a/src/test/kotlin/org/stellar/sdk/requests/SSEStreamTest.kt b/src/test/kotlin/org/stellar/sdk/requests/SSEStreamTest.kt new file mode 100644 index 000000000..2c5a9af00 --- /dev/null +++ b/src/test/kotlin/org/stellar/sdk/requests/SSEStreamTest.kt @@ -0,0 +1,247 @@ +package org.stellar.sdk.requests + +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldBeEmpty +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import io.kotest.matchers.string.shouldContain +import io.kotest.matchers.types.shouldBeInstanceOf +import java.io.Closeable +import java.util.Optional +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import okhttp3.mockwebserver.RecordedRequest +import okhttp3.mockwebserver.SocketPolicy +import org.stellar.sdk.Server +import org.stellar.sdk.responses.LedgerResponse + +class SSEStreamTest : + FunSpec({ + context("successful streaming") { + test("parses a ledger event and ignores hello / byebye data frames") { + withLedgerStream( + sseResponse( + controlMessage("hello"), + ledgerEvent(id = "event-1", cursor = "cursor-1", sequence = 1L), + controlMessage("byebye"), + ), + noResponse(), + ) { + takeRequest().shouldBeSseRequest() + + val event = probe.awaitEvent() + event.pagingToken shouldBe "cursor-1" + event.sequence shouldBe 1L + stream.lastPagingToken() shouldBe "event-1" + } + } + + test("ignores SSE comment frames") { + withLedgerStream( + sseResponse( + comment("keepalive"), + comment("another comment line"), + ledgerEvent(id = "event-2", cursor = "cursor-2", sequence = 2L), + ), + noResponse(), + ) { + takeRequest().shouldBeSseRequest() + + val event = probe.awaitEvent() + event.pagingToken shouldBe "cursor-2" + event.sequence shouldBe 2L + stream.lastPagingToken() shouldBe "event-2" + } + } + } + + context("resume behavior") { + test("reconnects with Last-Event-ID and cursor, stops issuing requests after close") { + withLedgerStream( + sseResponse(ledgerEvent(id = "event-1", cursor = "cursor-1", sequence = 1L)), + noResponse(), + reconnectTimeout = SHORT_RECONNECT_MS, + ) { + takeRequest().shouldBeSseRequest() + probe.awaitEvent() + + takeRequest().shouldBeSseRequest(lastEventId = "event-1", cursor = "cursor-1") + + stream.close() + expectNoAdditionalRequests(NO_REQUEST_WINDOW_MS) + } + } + + test("advances cursor to the latest event when one connection delivers multiple events") { + withLedgerStream( + sseResponse( + ledgerEvent(id = "event-1", cursor = "cursor-1", sequence = 1L), + ledgerEvent(id = "event-2", cursor = "cursor-2", sequence = 2L), + ), + noResponse(), + reconnectTimeout = SHORT_RECONNECT_MS, + ) { + takeRequest().shouldBeSseRequest() + + val events = probe.awaitEvents(count = 2) + events.last().sequence shouldBe 2L + stream.lastPagingToken() shouldBe "event-2" + + takeRequest().shouldBeSseRequest(lastEventId = "event-2", cursor = "cursor-2") + } + } + } + + context("failure propagation") { + test("fails when the response content type is not SSE") { + withLedgerStream( + MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody("{}") + ) { + takeRequest().shouldBeSseRequest() + + val failure = probe.awaitFailure() + probe.eventsSnapshot().shouldBeEmpty() + + failure.responseCode shouldBe 200 + val error = failure.error.shouldNotBeNull() + error.shouldBeInstanceOf() + error.message.shouldNotBeNull() shouldContain "Invalid content-type" + } + } + + test("fails on malformed JSON and does not advance resume state") { + withLedgerStream( + sseResponse("id: bad-event\ndata: {\"paging_token\":\n\n"), + noResponse(), + reconnectTimeout = SHORT_RECONNECT_MS, + ) { + takeRequest().shouldBeSseRequest() + + val failure = probe.awaitFailure() + probe.eventsSnapshot().shouldBeEmpty() + + failure.error.shouldNotBeNull().shouldBeInstanceOf() + failure.responseCode shouldBe 200 + stream.lastPagingToken().shouldBeNull() + + // The reconnect should happen fresh, with no resume tokens. + takeRequest().shouldBeSseRequest() + } + } + + test("surfaces HTTP error codes as failure signals without a throwable") { + withLedgerStream(MockResponse().setResponseCode(503).setBody("temporarily unavailable")) { + takeRequest().shouldBeSseRequest() + + val failure = probe.awaitFailure() + probe.eventsSnapshot().shouldBeEmpty() + + failure.error.shouldBeNull() + failure.responseCode shouldBe 503 + } + } + } + }) + +private data class FailureSignal(val error: Throwable?, val responseCode: Int?) + +private class StreamProbe : EventListener { + private val events = LinkedBlockingQueue() + private val failures = LinkedBlockingQueue() + + override fun onEvent(event: LedgerResponse) { + events.put(event) + } + + override fun onFailure(error: Optional, responseCode: Optional) { + failures.put(FailureSignal(error.orElse(null), responseCode.orElse(null))) + } + + fun awaitEvent(): LedgerResponse = + events.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).shouldNotBeNull() + + fun awaitEvents(count: Int): List = List(count) { awaitEvent() } + + fun awaitFailure(): FailureSignal = + failures.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).shouldNotBeNull() + + /** Drains the non-blocking view of received events, for assertions like "nothing leaked". */ + fun eventsSnapshot(): List = events.toList() +} + +private class StreamFixture( + private val mockWebServer: MockWebServer, + val probe: StreamProbe, + val stream: SSEStream, +) : Closeable { + fun takeRequest(): RecordedRequest = + mockWebServer.takeRequest(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS).shouldNotBeNull() + + fun expectNoAdditionalRequests(windowMs: Long) { + mockWebServer.takeRequest(windowMs, TimeUnit.MILLISECONDS).shouldBeNull() + } + + override fun close() { + stream.close() + } +} + +private inline fun withLedgerStream( + vararg responses: MockResponse, + reconnectTimeout: Long = SSEStream.DEFAULT_RECONNECT_TIMEOUT, + block: StreamFixture.() -> Unit, +) { + MockWebServer().use { mockWebServer -> + responses.forEach(mockWebServer::enqueue) + mockWebServer.start() + + Server(mockWebServer.url("/").toString()).use { server -> + val probe = StreamProbe() + val stream = server.ledgers().stream(probe, reconnectTimeout) + StreamFixture(mockWebServer, probe, stream).use(block) + } + } +} + +private fun sseResponse(body: String): MockResponse = + MockResponse().setResponseCode(200).setHeader("Content-Type", EVENT_STREAM).setBody(body) + +private fun sseResponse(vararg frames: String): MockResponse = + sseResponse(frames.joinToString(separator = "")) + +private fun noResponse(): MockResponse = MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE) + +private fun ledgerEvent(id: String, cursor: String, sequence: Long): String = + "id: $id\ndata: {\"paging_token\":\"$cursor\",\"sequence\":$sequence}\n\n" + +private fun controlMessage(message: String): String = "data: \"$message\"\n\n" + +private fun comment(message: String): String = ": $message\n\n" + +private fun RecordedRequest.shouldBeSseRequest( + lastEventId: String? = null, + cursor: String? = null, +) { + getHeader("Accept") shouldBe EVENT_STREAM + getHeader("Cache-Control") shouldBe "no-cache" + getHeader("Last-Event-ID") shouldBe lastEventId + getHeader("X-Client-Name") shouldBe SDK_NAME + getHeader("X-Client-Version").shouldNotBeNull() + + val url = requestUrl.shouldNotBeNull() + url.queryParameter("X-Client-Name") shouldBe SDK_NAME + url.queryParameter("X-Client-Version").shouldNotBeNull() + url.queryParameter("cursor") shouldBe cursor +} + +private const val EVENT_STREAM = "text/event-stream" +private const val SDK_NAME = "java-stellar-sdk" +private const val AWAIT_TIMEOUT_SECONDS = 5L +private const val SHORT_RECONNECT_MS = 50L +private const val NO_REQUEST_WINDOW_MS = 750L