Conversation
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Fixed
Show fixed
Hide fixed
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Fixed
Show fixed
Hide fixed
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Fixed
Show fixed
Hide fixed
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java
Outdated
Show resolved
Hide resolved
...metadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/RunLogBuffer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
This PR adds per-run log capture and UI viewing/downloading for internal Applications (notably SearchIndex/reindex), and refines SearchIndexExecutor stats reporting.
Changes:
- Backend: Introduces a Logback appender + buffer to capture app-run logs to disk and exposes new AppResource endpoints to fetch/download/stream logs.
- UI: Adds a new “Logs” tab for internal apps and a log viewer that supports run selection, server selection, SSE streaming for active runs, and downloads.
- SearchIndex: Improves progress/stat consistency by periodically syncing sink stats and ensuring totals don’t drift below observed success+failed counts.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| openmetadata-ui/src/main/resources/ui/src/rest/applicationAPI.ts | Adds REST helpers to fetch and download app-run text logs. |
| openmetadata-ui/src/main/resources/ui/src/constants/constants.ts | Adds a socket event constant for app-run logs (currently unused). |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/MarketPlaceAppDetails/MarketPlaceAppDetails.interface.ts | Adds LOGS tab enum value. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunTextLogs/AppRunTextLogs.interface.ts | Defines props/response typings for the new log viewer. |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppRunTextLogs/AppRunTextLogs.component.tsx | Implements the Logs UI (run/server selection, SSE stream, download). |
| openmetadata-ui/src/main/resources/ui/src/components/Settings/Applications/AppDetails/AppDetails.component.tsx | Adds “Logs” tab for internal, scheduled apps. |
| openmetadata-service/src/main/resources/logback.xml | Registers new APP_RUN_LOG appender on root logger. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/RunLogBuffer.java | Adds buffering + periodic flushing + stream listener support. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java | Implements Logback appender and log retention/listing utilities. |
| openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java | Adds endpoints to get/download/list/stream app-run logs. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java | Starts/stops log capture for app runs via MDC + thread prefix matching. |
| openmetadata-service/src/main/java/org/openmetadata/service/socket/WebSocketManager.java | Adds an app-run logs channel constant (currently unused). |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java | Periodic sink stat syncing + total record consistency adjustments. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/logging/RunLogBufferTest.java | Adds unit tests for buffering/flushing/line caps. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/logging/AppRunLogAppenderTest.java | Adds unit tests for appender behaviors (server listing, retention, concurrency). |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexStatsTest.java | Adds tests for new stats consistency behaviors. |
...metadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java
Show resolved
Hide resolved
...esources/ui/src/components/Settings/Applications/AppRunTextLogs/AppRunTextLogs.component.tsx
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java
Outdated
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java
Outdated
Show resolved
Hide resolved
...esources/ui/src/components/Settings/Applications/AppRunTextLogs/AppRunTextLogs.component.tsx
Outdated
Show resolved
Hide resolved
openmetadata-ui/src/main/resources/ui/src/constants/constants.ts
Outdated
Show resolved
Hide resolved
...adata-service/src/test/java/org/openmetadata/service/apps/logging/AppRunLogAppenderTest.java
Show resolved
Hide resolved
...metadata-service/src/main/java/org/openmetadata/service/apps/logging/S3AppRunLogStorage.java
Show resolved
Hide resolved
...metadata-service/src/main/java/org/openmetadata/service/apps/logging/S3AppRunLogStorage.java
Outdated
Show resolved
Hide resolved
...metadata-service/src/main/java/org/openmetadata/service/apps/logging/S3AppRunLogStorage.java
Show resolved
Hide resolved
openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java
Show resolved
Hide resolved
| public boolean exists(String appName, long runTimestamp, String serverId) { | ||
| String key = s3Key(appName, runTimestamp, serverId); | ||
| try { | ||
| s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(key).build()); | ||
| return true; | ||
| } catch (NoSuchKeyException e) { | ||
| return false; | ||
| } |
There was a problem hiding this comment.
exists() only catches NoSuchKeyException, but AWS SDK v2 headObject commonly throws S3Exception with status code 404 for missing keys (depending on client configuration/endpoints). As written, missing objects may surface as uncaught exceptions instead of returning false. Consider catching S3Exception and treating 404/NoSuchKey as non-existent, while rethrowing other errors.
| @Override | ||
| public void flush() throws IOException { | ||
| if (buffer.size() == 0) { | ||
| return; | ||
| } | ||
| byte[] newContent = buffer.toByteArray(); | ||
| buffer.reset(); | ||
|
|
||
| byte[] existing = new byte[0]; | ||
| try { | ||
| existing = | ||
| client | ||
| .getObject(GetObjectRequest.builder().bucket(bucket).key(key).build()) | ||
| .readAllBytes(); | ||
| } catch (NoSuchKeyException e) { | ||
| // first write | ||
| } | ||
|
|
||
| byte[] combined = new byte[existing.length + newContent.length]; | ||
| System.arraycopy(existing, 0, combined, 0, existing.length); | ||
| System.arraycopy(newContent, 0, combined, existing.length, newContent.length); | ||
|
|
||
| client.putObject( | ||
| PutObjectRequest.builder().bucket(bucket).key(key).build(), | ||
| RequestBody.fromBytes(combined)); | ||
| } |
There was a problem hiding this comment.
S3AppendOutputStream.flush() reads the entire existing object and rewrites it on every flush to simulate append. For longer runs this becomes increasingly expensive (O(n²) bytes transferred over time) and can significantly increase S3 costs/latency. Consider switching to chunked/object-per-flush storage (then compose on read), multipart uploads, or buffering larger batches and reducing flush frequency to limit read/rewrites.
| try { | ||
| while (!Thread.currentThread().isInterrupted()) { | ||
| Thread.sleep(5000); | ||
| output.write(": heartbeat\n\n".getBytes()); | ||
| output.flush(); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| activeBuffer.removeStreamListener(listener); | ||
| } | ||
| } else { |
There was a problem hiding this comment.
The SSE streaming loop for active runs only exits on thread interruption. When a run completes and AppRunLogAppender.stopCapture(...) removes/closes the buffer, this handler will keep sending heartbeats indefinitely and never emits the done event, which can leak request threads and connections. Add a termination condition inside the loop (e.g., break when AppRunLogAppender.getBuffer(name, String.valueOf(runTimestamp)) is null/closed) and send event: done before returning.
| @Parameter(description = "Server ID filter", schema = @Schema(type = "string")) | ||
| @QueryParam("serverId") | ||
| String serverId) { | ||
| repository.getByName(uriInfo, name, repository.getFields("id")); |
There was a problem hiding this comment.
These new log endpoints validate the app exists via repository.getByName(...), but they don’t perform any authorization check (and securityContext is otherwise unused). To avoid exposing app run logs to callers without view permission, route through getByNameInternal(...) / getInternal(...) or explicitly call authorizer.authorize(...) with the appropriate VIEW operation before reading/streaming logs.
| repository.getByName(uriInfo, name, repository.getFields("id")); | |
| App app = repository.getByName(uriInfo, name, repository.getFields("id")); | |
| authorizer.authorize( | |
| getSubjectContext(securityContext), | |
| new OperationContext(APPLICATION, MetadataOperation.VIEW_BASIC), | |
| app.getEntityReference()); |
| static final GenericContainer<?> minio = | ||
| new GenericContainer<>("minio/minio:latest") | ||
| .withCommand("server /data") | ||
| .withExposedPorts(9000) | ||
| .withEnv("MINIO_ROOT_USER", MINIO_ACCESS_KEY) | ||
| .withEnv("MINIO_ROOT_PASSWORD", MINIO_SECRET_KEY) | ||
| .waitingFor( | ||
| new HttpWaitStrategy() | ||
| .forPath("/minio/health/ready") | ||
| .forPort(9000) | ||
| .withStartupTimeout(Duration.ofMinutes(1))); | ||
|
|
There was a problem hiding this comment.
The MinIO Testcontainers image is pinned to latest, which makes the test suite non-deterministic and can break unexpectedly when MinIO releases new versions. Pin to a specific MinIO image tag (and optionally document/centralize the version) for reproducible CI runs.
| responseCode == 400 || responseCode == 500, | ||
| "Path traversal should be rejected, got: " + responseCode); |
There was a problem hiding this comment.
This test currently treats a 500 response as acceptable for a path traversal attempt. Since the API should reject invalid serverId with a deterministic 4xx (and the resource code throws BadRequestException for invalid server IDs), tighten the assertion to expect 400 so the test will catch regressions that accidentally turn input validation failures into 500s.
| responseCode == 400 || responseCode == 500, | |
| "Path traversal should be rejected, got: " + responseCode); | |
| responseCode == 400, | |
| "Invalid serverId should return 400, got: " + responseCode); |
| <pre | ||
| data-testid="lazy-log" | ||
| ref={logContainerRef} | ||
| style={{ | ||
| height: '60vh', | ||
| overflow: 'auto', | ||
| margin: 0, | ||
| padding: '12px', | ||
| backgroundColor: '#222', | ||
| color: '#fff', | ||
| fontSize: '12px', | ||
| fontFamily: '"Monaco", "Menlo", "Consolas", monospace', | ||
| lineHeight: 1.6, | ||
| borderRadius: '4px', | ||
| whiteSpace: 'pre', | ||
| tabSize: 4, | ||
| }}> |
There was a problem hiding this comment.
This component renders logs with a raw
and hardcoded inline styles (including hardcoded colors). The UI already has a log viewer implementation (@melloware/react-logviewer/LazyLog) and shared styling (e.g.,lazy-log-container) used inAppLogsViewer, which also provides search, selectable lines, and consistent theming. Consider reusing the existing log viewer component/styles and moving any styling to the existing LESS/theme tokens instead of inline hex colors.
| void localStorageRejectsPathTraversal() { | ||
| LocalAppRunLogStorage storage = new LocalAppRunLogStorage(tempDir.toString()); | ||
| try { | ||
| storage.readLogs("../../etc", 1L, "passwd"); | ||
| // Should not reach here | ||
| assertFalse(true, "Expected IllegalArgumentException"); | ||
| } catch (IllegalArgumentException e) { | ||
| assertTrue(e.getMessage().contains("Invalid path")); | ||
| } |
There was a problem hiding this comment.
This path traversal test uses a manual try/catch with assertFalse(true, ...) to indicate failure. Use assertThrows(IllegalArgumentException.class, ...) (and then assert on the message) to make the intent clearer and ensure the test fails correctly if the exception is not thrown.
Code Review
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
| @Parameter(description = "Run timestamp", schema = @Schema(type = "number")) | ||
| @PathParam("runTimestamp") | ||
| Long runTimestamp) { | ||
| repository.getByName(uriInfo, name, repository.getFields("id")); |
There was a problem hiding this comment.
getAppRunLogServers returns server IDs for a run without any authorization check. Even if the content is just metadata, it can leak operational details; please authorize VIEW access to the App before returning this information.
| repository.getByName(uriInfo, name, repository.getFields("id")); | |
| App app = repository.getByName(uriInfo, name, repository.getFields("id")); | |
| SubjectContext subjectContext = getSubjectContext(securityContext); | |
| OperationContext operationContext = | |
| new OperationContext(APPLICATION, MetadataOperation.VIEW_ALL); | |
| authorizer.authorize(subjectContext, operationContext, app.getEntityReference()); |
| long now = System.currentTimeMillis(); | ||
| if (now - lastSinkSyncTime >= SINK_SYNC_INTERVAL_MS) { | ||
| lastSinkSyncTime = now; | ||
| syncSinkStatsFromBulkSink(); | ||
| } |
There was a problem hiding this comment.
periodicSyncSinkStats is invoked from multiple consumer threads, but the now - lastSinkSyncTime check and the lastSinkSyncTime = now update are not atomic. Under concurrency, multiple threads can pass the check and trigger syncSinkStatsFromBulkSink() far more often than intended. Use an AtomicLong with CAS (or a synchronized/locked section) to enforce the interval reliably.
|
|
||
| @Container | ||
| static final GenericContainer<?> minio = | ||
| new GenericContainer<>("minio/minio:latest") |
There was a problem hiding this comment.
The MinIO Testcontainers image is pinned to latest, which makes tests non-reproducible and can introduce sudden breakages when the upstream image changes. Pin this to a specific MinIO release tag (and update intentionally when needed).
| new GenericContainer<>("minio/minio:latest") | |
| new GenericContainer<>("minio/minio:RELEASE.2024-01-18T21-02-27Z") |
| @QueryParam("serverId") | ||
| String serverId) { | ||
| repository.getByName(uriInfo, name, repository.getFields("id")); | ||
|
|
There was a problem hiding this comment.
streamAppRunTextLogs streams potentially sensitive log content but does not authorize the request (it only validates the app exists). Please add an explicit authorization check for VIEW access to the App before opening the SSE stream.
| // Authorize VIEW access on Apps before streaming potentially sensitive logs | |
| authorizer.authorize( | |
| securityContext, new OperationContext(APPLICATION, MetadataOperation.VIEW)); |
| java.util.function.Consumer<String> listener = | ||
| batchText -> { | ||
| try { | ||
| for (String logLine : batchText.split("\n")) { | ||
| output.write( |
There was a problem hiding this comment.
The SSE listener writes to the same output stream that the request thread is also writing to (heartbeats / done events). Because RunLogBuffer notifies listeners from its scheduled flusher thread, this results in concurrent writes to output, which is not thread-safe and can interleave/corrupt the SSE stream or throw sporadic IO errors. Consider funneling all writes through a single thread (e.g., listener enqueues lines into a BlockingQueue that the SSE loop drains) or synchronizing all output writes on a shared lock.
| @Parameter(description = "Server ID filter", schema = @Schema(type = "string")) | ||
| @QueryParam("serverId") | ||
| String serverId) { | ||
| repository.getByName(uriInfo, name, repository.getFields("id")); |
There was a problem hiding this comment.
downloadAppRunTextLogs also bypasses the usual authorizer.authorize(...) checks before streaming log content. Please add an explicit authorization check for VIEW access on the App before allowing download.
| repository.getByName(uriInfo, name, repository.getFields("id")); | |
| App app = repository.getByName(uriInfo, name, repository.getFields("id")); | |
| SubjectContext subjectContext = getSubjectContext(securityContext); | |
| OperationContext operationContext = | |
| new OperationContext(APPLICATION, MetadataOperation.VIEW_BASIC); | |
| authorizer.authorize(subjectContext, operationContext, getResourceContext(app)); |
|
|
|
closing this |
| static String formatLine(ILoggingEvent event) { | ||
| String timestamp = FORMATTER.format(Instant.ofEpochMilli(event.getTimeStamp())); | ||
| return String.format( | ||
| "%s [%s] %-5s %s - %s", | ||
| timestamp, | ||
| event.getThreadName(), | ||
| event.getLevel(), | ||
| event.getLoggerName(), | ||
| event.getFormattedMessage()); | ||
| } |
There was a problem hiding this comment.
⚠️ Bug: formatLine drops exception stack traces from captured logs
The formatLine method only uses event.getFormattedMessage() and completely ignores event.getThrowableProxy(). In Logback, exception stack traces are stored separately in the throwable proxy, not in the formatted message. This means all LOG.error("...", exception) calls will have their stack traces silently dropped from the captured app run logs, making it very difficult to debug failed runs — which is the primary use case for this feature.
Suggested fix:
static String formatLine(ILoggingEvent event) {
String timestamp = FORMATTER.format(Instant.ofEpochMilli(event.getTimeStamp()));
StringBuilder sb = new StringBuilder();
sb.append(String.format("%s [%s] %-5s %s - %s",
timestamp, event.getThreadName(), event.getLevel(),
event.getLoggerName(), event.getFormattedMessage()));
if (event.getThrowableProxy() != null) {
sb.append("
");
sb.append(ch.qos.logback.classic.pattern.ThrowableProxyConverter
.throwableProxyToString(event.getThrowableProxy()));
}
return sb.toString();
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| if (resolvedServerId != null | ||
| && storage.exists(name, runTimestamp, resolvedServerId)) { | ||
| String content = storage.readLogs(name, runTimestamp, resolvedServerId); | ||
| for (String line : content.split("\n")) { | ||
| output.write( | ||
| ("data: " + line + "\n\n") | ||
| .getBytes(java.nio.charset.StandardCharsets.UTF_8)); | ||
| } | ||
| output.flush(); | ||
| } |
There was a problem hiding this comment.
⚠️ Performance: SSE stream endpoint reads entire log file into memory String
In streamAppRunTextLogs, storage.readLogs() at line 759 reads the entire log file into a single in-memory String, then splits it by newline to write line-by-line as SSE events. With maxLinesPerRun defaulting to 100,000 lines, this could easily be tens of MB per request. The readLogsStream() method exists specifically for streaming, and is already used correctly in the download endpoint.
Suggested fix:
// Replace readLogs + split with streaming read:
if (resolvedServerId != null
&& storage.exists(name, runTimestamp, resolvedServerId)) {
try (var reader = new java.io.BufferedReader(
new java.io.InputStreamReader(
storage.readLogsStream(name, runTimestamp, resolvedServerId),
java.nio.charset.StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
output.write(("data: " + line + "
")
.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
}
output.flush();
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion
| private void notifyListeners(String batchText) { | ||
| for (Consumer<String> listener : streamListeners) { | ||
| try { | ||
| listener.accept(batchText); | ||
| } catch (Exception e) { | ||
| LOG.debug("Stream listener error, removing: {}", e.getMessage()); | ||
| streamListeners.remove(listener); | ||
| } | ||
| } |
There was a problem hiding this comment.
💡 Edge Case: SSE listener RuntimeException on disconnect swallowed silently
In the SSE streaming endpoint, when a client disconnects, the listener's output.write() throws IOException which is wrapped in RuntimeException (line 795). This propagates to RunLogBuffer.notifyListeners() which catches it and removes the listener (line 161-163). However, the exception message logged at DEBUG level says "Stream listener error, removing" — this is expected behavior on disconnect, not an error. More importantly, if a listener throws during notifyListeners, it breaks the loop and subsequent listeners for the same batch won't be notified.
Suggested fix:
// In RunLogBuffer.notifyListeners, catch per-listener
// to avoid breaking the loop:
private void notifyListeners(String batchText) {
List<Consumer<String>> toRemove = new ArrayList<>();
for (Consumer<String> listener : streamListeners) {
try {
listener.accept(batchText);
} catch (Exception e) {
LOG.debug("Removing disconnected stream listener");
toRemove.add(listener);
}
}
streamListeners.removeAll(toRemove);
}
Was this helpful? React with 👍 / 👎 | Reply gitar fix to apply this suggestion



Describe your changes:
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>Summary by Gitar
AppRunLogAppender(Logback appender) to capture logs during app execution via MDC or thread name matchingRunLogBufferwith scheduled flushing and stream listener support for real-time log deliveryLocalAppRunLogStorage(filesystem) andS3AppRunLogStoragewith buffered uploadsGET /v1/apps/name/{name}/runs/{runTimestamp}/logs— fetch text logs with server filteringGET /v1/apps/name/{name}/runs/{runTimestamp}/logs/download— download as.logfileGET /v1/apps/name/{name}/runs/{runTimestamp}/logs/stream— Server-Sent Events stream for live/archived logsGET /v1/apps/name/{name}/runs/{runTimestamp}/logs/servers— list servers with logs for a runAppRunTextLogsUI component with run/server selection, live streaming, download, and copy-to-clipboardThis will update automatically on new commits.