Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,8 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ public interface DopplerClient {
*/
Flux<Envelope> firehose(FirehoseRequest request);

// TODO Adapt the message
/**
* Makes the <a href="https://github.com/cloudfoundry/loggregator/tree/develop/src/trafficcontroller#endpoints">Recent Logs</a> request
*
* @deprecated Do not use this type directly, it exists only for the <em>Jackson</em>-binding infrastructure
* @param request the Recent Logs request
* @return the events from the recent logs
*/
@Deprecated
Flux<Envelope> recentLogs(RecentLogsRequest request);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,12 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.cloudfoundry.client.v3.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.spaces.SpaceResource;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.networking.NetworkingClient;
import org.cloudfoundry.operations.advanced.Advanced;
import org.cloudfoundry.operations.advanced.DefaultAdvanced;
Expand Down Expand Up @@ -79,7 +80,7 @@ public Advanced advanced() {
@Override
@Value.Derived
public Applications applications() {
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getSpaceId());
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getLogCacheClientPublisher(), getSpaceId());
}

@Override
Expand Down Expand Up @@ -197,6 +198,19 @@ Mono<DopplerClient> getDopplerClientPublisher() {
.orElse(Mono.error(new IllegalStateException("DopplerClient must be set")));
}

/**
* The {@link LogCacheClient} to use for operations functionality
*/
@Nullable
abstract LogCacheClient getLogCacheClient();

@Value.Derived
Mono<LogCacheClient> getLogCacheClientPublisher() {
return Optional.ofNullable(getLogCacheClient())
.map(Mono::just)
.orElse(Mono.error(new IllegalStateException("LogCacheClient must be set")));
}

/**
* The {@link NetworkingClient} to use for operations functionality
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.cloudfoundry.operations.applications;

import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.ReadRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -126,6 +128,15 @@ public interface Applications {
@Deprecated
Flux<LogMessage> logs(LogsRequest request);

/**
* List the applications logs from logCacheClient.
* If no messages are available, an empty Flux is returned.
*
* @param request the application logs request
* @return the applications logs
*/
Flux<Log> logsRecent(ReadRequest request);

/**
* List the applications logs.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@
import org.cloudfoundry.doppler.LogMessage;
import org.cloudfoundry.doppler.RecentLogsRequest;
import org.cloudfoundry.doppler.StreamRequest;
import org.cloudfoundry.logcache.v1.EnvelopeBatch;
import org.cloudfoundry.logcache.v1.Log;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.operations.util.OperationsLogging;
import org.cloudfoundry.util.DateUtils;
import org.cloudfoundry.util.DelayTimeoutException;
Expand Down Expand Up @@ -200,6 +204,10 @@ public final class DefaultApplications implements Applications {
private static final Comparator<LogMessage> LOG_MESSAGE_COMPARATOR =
Comparator.comparing(LogMessage::getTimestamp);

private static final Comparator<org.cloudfoundry.logcache.v1.Envelope>
LOG_MESSAGE_COMPARATOR_LOG_CACHE =
Comparator.comparing(org.cloudfoundry.logcache.v1.Envelope::getTimestamp);

private static final Duration LOG_MESSAGE_TIMESPAN = Duration.ofMillis(500);

private static final int MAX_NUMBER_OF_RECENT_EVENTS = 50;
Expand All @@ -214,24 +222,29 @@ public final class DefaultApplications implements Applications {

private final Mono<DopplerClient> dopplerClient;

private final Mono<LogCacheClient> logCacheClient;

private final RandomWords randomWords;

private final Mono<String> spaceId;

public DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
Mono<String> spaceId) {
this(cloudFoundryClient, dopplerClient, new WordListRandomWords(), spaceId);
this(cloudFoundryClient, dopplerClient, logCacheClient, new WordListRandomWords(), spaceId);
}

DefaultApplications(
Mono<CloudFoundryClient> cloudFoundryClient,
Mono<DopplerClient> dopplerClient,
Mono<LogCacheClient> logCacheClient,
RandomWords randomWords,
Mono<String> spaceId) {
this.cloudFoundryClient = cloudFoundryClient;
this.dopplerClient = dopplerClient;
this.logCacheClient = logCacheClient;
this.randomWords = randomWords;
this.spaceId = spaceId;
}
Expand Down Expand Up @@ -529,6 +542,7 @@ public Flux<Task> listTasks(ListApplicationTasksRequest request) {
.checkpoint();
}

@Deprecated
@Override
public Flux<LogMessage> logs(LogsRequest request) {
return Mono.zip(this.cloudFoundryClient, this.spaceId)
Expand All @@ -544,6 +558,13 @@ public Flux<LogMessage> logs(LogsRequest request) {
.checkpoint();
}

@Override
public Flux<Log> logsRecent(ReadRequest request) {
return getRecentLogsLogCache(this.logCacheClient, request)
.transform(OperationsLogging.log("Get Application Logs"))
.checkpoint();
}

@Override
public Flux<ApplicationLog> logs(ApplicationLogsRequest request) {
return logs(LogsRequest.builder()
Expand Down Expand Up @@ -673,7 +694,6 @@ public Mono<Void> pushManifestV3(PushManifestV3Request request) {
} catch (IOException e) {
throw new RuntimeException("Could not serialize manifest", e);
}

return Mono.zip(this.cloudFoundryClient, this.spaceId)
.flatMap(
function(
Expand Down Expand Up @@ -1617,6 +1637,17 @@ private static Flux<LogMessage> getLogs(
}
}

private static Flux<Log> getRecentLogsLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return requestLogsRecentLogCache(logCacheClient, readRequest)
.map(EnvelopeBatch::getBatch)
.map(List::stream)
.flatMapIterable(envelopeStream -> envelopeStream.collect(Collectors.toList()))
.filter(e -> e.getLog() != null)
.sort(LOG_MESSAGE_COMPARATOR_LOG_CACHE)
.map(org.cloudfoundry.logcache.v1.Envelope::getLog);
}

@SuppressWarnings("unchecked")
private static Map<String, Object> getMetadataRequest(EventEntity entity) {
Map<String, Optional<Object>> metadata =
Expand Down Expand Up @@ -2501,6 +2532,7 @@ private static Flux<TaskResource> requestListTasks(
.build()));
}

@Deprecated
private static Flux<Envelope> requestLogsRecent(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand All @@ -2509,6 +2541,14 @@ private static Flux<Envelope> requestLogsRecent(
RecentLogsRequest.builder().applicationId(applicationId).build()));
}

private static Mono<EnvelopeBatch> requestLogsRecentLogCache(
Mono<LogCacheClient> logCacheClient, ReadRequest readRequest) {
return logCacheClient.flatMap(
client ->
client.recentLogs(readRequest)
.flatMap(response -> Mono.justOrEmpty(response.getEnvelopes())));
}

private static Flux<Envelope> requestLogsStream(
Mono<DopplerClient> dopplerClient, String applicationId) {
return dopplerClient.flatMapMany(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.cloudfoundry.client.v3.stacks.StacksV3;
import org.cloudfoundry.client.v3.tasks.Tasks;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.routing.RoutingClient;
import org.cloudfoundry.routing.v1.routergroups.RouterGroups;
import org.cloudfoundry.uaa.UaaClient;
Expand Down Expand Up @@ -104,6 +105,8 @@ public abstract class AbstractOperationsTest {

protected final DopplerClient dopplerClient = mock(DopplerClient.class, RETURNS_SMART_NULLS);

protected final LogCacheClient logCacheClient = mock(LogCacheClient.class, RETURNS_SMART_NULLS);

protected final Events events = mock(Events.class, RETURNS_SMART_NULLS);

protected final FeatureFlags featureFlags = mock(FeatureFlags.class, RETURNS_SMART_NULLS);
Expand Down
Loading