diff --git a/internal/devserver/server.go b/internal/devserver/server.go index f91ae1f77..610bd628a 100644 --- a/internal/devserver/server.go +++ b/internal/devserver/server.go @@ -39,6 +39,7 @@ import ( uiconfig "github.com/temporalio/ui-server/v2/server/config" uiserveroptions "github.com/temporalio/ui-server/v2/server/server_options" "go.temporal.io/api/enums/v1" + "go.temporal.io/server/chasm/lib/activity" "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" @@ -240,6 +241,11 @@ func (s *StartOptions) buildServerOptions() ([]temporal.ServerOption, *slog.Leve // Up default visibility RPS dynConf[dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance.Key()] = 100 + // Enable CHASM and SAA. These will be on by default in server v1.32, at which point these lines + // should be removed. + dynConf[dynamicconfig.EnableChasm.Key()] = true + dynConf[activity.Enabled.Key()] = true + // Dynamic config if set for k, v := range s.DynamicConfigValues { dynConf[dynamicconfig.MakeKey(k)] = v diff --git a/internal/temporalcli/commands.activity.go b/internal/temporalcli/commands.activity.go index 0ab6cf305..582f5d622 100644 --- a/internal/temporalcli/commands.activity.go +++ b/internal/temporalcli/commands.activity.go @@ -1,16 +1,25 @@ package temporalcli import ( + "context" + "encoding/json" + "errors" "fmt" "time" + "github.com/fatih/color" "github.com/temporalio/cli/internal/printer" activitypb "go.temporal.io/api/activity/v1" "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/failure/v1" + "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/temporal" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -31,6 +40,534 @@ type ( } ) +func (c *TemporalActivityStartCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle, err := startActivity(cctx, cl, &c.ActivityStartOptions, &c.PayloadInputOptions) + if err != nil { + return err + } + return printActivityExecution(cctx, c.ActivityId, handle.GetRunID(), c.Parent.Namespace) +} + +func (c *TemporalActivityExecuteCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle, err := startActivity(cctx, cl, &c.ActivityStartOptions, &c.PayloadInputOptions) + if err != nil { + return err + } + if !cctx.JSONOutput { + if err := printActivityExecution(cctx, c.ActivityId, handle.GetRunID(), c.Parent.Namespace); err != nil { + cctx.Logger.Error("Failed printing execution info", "error", err) + } + } + return getActivityResult(cctx, cl, c.Parent.Namespace, c.ActivityId, handle.GetRunID()) +} + +func (c *TemporalActivityResultCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + return getActivityResult(cctx, cl, c.Parent.Namespace, c.ActivityId, c.RunId) +} + +func startActivity( + cctx *CommandContext, + cl client.Client, + opts *ActivityStartOptions, + inputOpts *PayloadInputOptions, +) (client.ActivityHandle, error) { + startOpts, err := buildStartActivityOptions(opts) + if err != nil { + return nil, err + } + input, err := inputOpts.buildRawInput() + if err != nil { + return nil, err + } + cctx.Context, err = contextWithHeaders(cctx.Context, opts.Headers) + if err != nil { + return nil, err + } + handle, err := cl.ExecuteActivity(cctx, startOpts, opts.Type, input...) + if err != nil { + return nil, fmt.Errorf("failed starting activity: %w", err) + } + return handle, nil +} + +func printActivityExecution(cctx *CommandContext, activityID, runID, namespace string) error { + if !cctx.JSONOutput { + cctx.Printer.Println(color.MagentaString("Running execution:")) + } + return cctx.Printer.PrintStructured(struct { + ActivityId string `json:"activityId"` + RunId string `json:"runId"` + Namespace string `json:"namespace"` + }{ + ActivityId: activityID, + RunId: runID, + Namespace: namespace, + }, printer.StructuredOptions{}) +} + +func buildStartActivityOptions(opts *ActivityStartOptions) (client.StartActivityOptions, error) { + o := client.StartActivityOptions{ + ID: opts.ActivityId, + TaskQueue: opts.TaskQueue, + ScheduleToCloseTimeout: opts.ScheduleToCloseTimeout.Duration(), + ScheduleToStartTimeout: opts.ScheduleToStartTimeout.Duration(), + StartToCloseTimeout: opts.StartToCloseTimeout.Duration(), + HeartbeatTimeout: opts.HeartbeatTimeout.Duration(), + Summary: opts.StaticSummary, + Details: opts.StaticDetails, + Priority: temporal.Priority{ + PriorityKey: opts.PriorityKey, + FairnessKey: opts.FairnessKey, + FairnessWeight: opts.FairnessWeight, + }, + } + if opts.RetryInitialInterval.Duration() > 0 || opts.RetryMaximumInterval.Duration() > 0 || + opts.RetryBackoffCoefficient > 0 || opts.RetryMaximumAttempts > 0 { + o.RetryPolicy = &temporal.RetryPolicy{ + InitialInterval: opts.RetryInitialInterval.Duration(), + MaximumInterval: opts.RetryMaximumInterval.Duration(), + BackoffCoefficient: float64(opts.RetryBackoffCoefficient), + MaximumAttempts: int32(opts.RetryMaximumAttempts), + } + } + if opts.IdReusePolicy.Value != "" { + var err error + o.ActivityIDReusePolicy, err = stringToProtoEnum[enumspb.ActivityIdReusePolicy]( + opts.IdReusePolicy.Value, enumspb.ActivityIdReusePolicy_shorthandValue, enumspb.ActivityIdReusePolicy_value) + if err != nil { + return o, fmt.Errorf("invalid activity ID reuse policy: %w", err) + } + } + if opts.IdConflictPolicy.Value != "" { + var err error + o.ActivityIDConflictPolicy, err = stringToProtoEnum[enumspb.ActivityIdConflictPolicy]( + opts.IdConflictPolicy.Value, enumspb.ActivityIdConflictPolicy_shorthandValue, enumspb.ActivityIdConflictPolicy_value) + if err != nil { + return o, fmt.Errorf("invalid activity ID conflict policy: %w", err) + } + } + if len(opts.SearchAttribute) > 0 { + saMap, err := stringKeysJSONValues(opts.SearchAttribute, false) + if err != nil { + return o, fmt.Errorf("invalid search attribute values: %w", err) + } + if o.TypedSearchAttributes, err = mapToSearchAttributes(saMap); err != nil { + return o, err + } + } + return o, nil +} + +// mapToSearchAttributes builds typed search attributes from a map produced by +// json.Unmarshal into map[string]any. The possible types from json.Unmarshal are: +// +// bool → NewSearchAttributeKeyBool (correct for Bool attributes) +// float64 → NewSearchAttributeKeyFloat64 (correct for Double; also used for Int, +// since JSON has a single number type) +// string → NewSearchAttributeKeyKeyword (correct for Keyword; also used for Text +// and Datetime, since JSON has a single string type) +// []any → NewSearchAttributeKeyKeywordList (each element must be string) +// map[string]any, nil → rejected (no corresponding SA type) +// +// For inputs where the payload type metadata doesn't match the registered type +// (Int registered but sent as Float64, Text/Datetime registered but sent as +// Keyword), the server decodes using its schema, not payload metadata, so these +// work correctly. +func mapToSearchAttributes(m map[string]any) (temporal.SearchAttributes, error) { + updates := make([]temporal.SearchAttributeUpdate, 0, len(m)) + for k, v := range m { + switch val := v.(type) { + case string: + updates = append(updates, temporal.NewSearchAttributeKeyKeyword(k).ValueSet(val)) + case float64: + updates = append(updates, temporal.NewSearchAttributeKeyFloat64(k).ValueSet(val)) + case bool: + updates = append(updates, temporal.NewSearchAttributeKeyBool(k).ValueSet(val)) + case []any: + strs := make([]string, len(val)) + for i, elem := range val { + s, ok := elem.(string) + if !ok { + return temporal.SearchAttributes{}, fmt.Errorf("search attribute %q: array element %d is %T, not string", k, i, elem) + } + strs[i] = s + } + updates = append(updates, temporal.NewSearchAttributeKeyKeywordList(k).ValueSet(strs)) + default: + return temporal.SearchAttributes{}, fmt.Errorf("unsupported search attribute type for key %q: %T", k, v) + } + } + return temporal.NewSearchAttributes(updates...), nil +} + +func getActivityResult(cctx *CommandContext, cl client.Client, namespace, activityID, runID string) error { + outcome, err := pollActivityOutcome(cctx, cl, namespace, activityID, runID) + if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + return fmt.Errorf("activity not found: %s", activityID) + } + return fmt.Errorf("failed polling activity result: %w", err) + } + + switch v := outcome.GetValue().(type) { + case *activitypb.ActivityExecutionOutcome_Result: + return printActivityResult(cctx, activityID, runID, v.Result) + case *activitypb.ActivityExecutionOutcome_Failure: + if err := printActivityFailure(cctx, activityID, runID, v.Failure); err != nil { + cctx.Logger.Error("Activity failed, and printing the output also failed", "error", err) + } + return fmt.Errorf("activity failed") + default: + return fmt.Errorf("unexpected activity outcome type: %T", v) + } +} + +// Matches the SDK's pollActivityTimeout in internal_activity_client.go. +const pollActivityTimeout = 60 * time.Second + +// pollActivityOutcome polls for an activity result using a hand-rolled loop +// rather than handle.Get() because handle.Get() deserializes the result into a +// Go value and converts failures to Go errors, losing the raw proto payloads. +// +// The per-request timeout matches the SDK's PollActivityResult implementation. +// Unlike the SDK, we retry at the application level on per-request timeout +// since we don't have the SDK's gRPC-level retry interceptor. +func pollActivityOutcome(cctx *CommandContext, cl client.Client, namespace, activityID, runID string) (*activitypb.ActivityExecutionOutcome, error) { + for { + pollCtx, cancel := context.WithTimeout(cctx, pollActivityTimeout) + resp, err := cl.WorkflowService().PollActivityExecution(pollCtx, &workflowservice.PollActivityExecutionRequest{ + Namespace: namespace, + ActivityId: activityID, + RunId: runID, + }) + if err != nil { + // check pollCtx.Err() first beause it is set by cancel() + pollTimedOut := pollCtx.Err() != nil + cancel() + if cctx.Err() != nil { + return nil, cctx.Err() + } + // Per-request timeout but parent still alive: retry. + if pollTimedOut { + continue + } + return nil, err + } + cancel() + if resp.GetOutcome() != nil { + return resp.GetOutcome(), nil + } + } +} + +func printActivityResult(cctx *CommandContext, activityID, runID string, result *common.Payloads) error { + if cctx.JSONOutput { + var resultJSON json.RawMessage + var err error + if cctx.JSONShorthandPayloads { + var valuePtr any + if err = converter.GetDefaultDataConverter().FromPayloads(result, &valuePtr); err != nil { + return fmt.Errorf("failed decoding result: %w", err) + } + resultJSON, err = json.Marshal(valuePtr) + } else { + resultJSON, err = cctx.MarshalProtoJSON(result) + } + if err != nil { + return fmt.Errorf("failed marshaling result: %w", err) + } + return cctx.Printer.PrintStructured(struct { + ActivityId string `json:"activityId"` + RunId string `json:"runId"` + Status string `json:"status"` + Result json.RawMessage `json:"result"` + }{ + ActivityId: activityID, + RunId: runID, + Status: "COMPLETED", + Result: resultJSON, + }, printer.StructuredOptions{}) + } + + cctx.Printer.Println(color.MagentaString("Results:")) + var valuePtr any + if err := converter.GetDefaultDataConverter().FromPayloads(result, &valuePtr); err != nil { + return fmt.Errorf("failed decoding result: %w", err) + } + resultJSON, err := json.Marshal(valuePtr) + if err != nil { + return fmt.Errorf("failed marshaling result: %w", err) + } + return cctx.Printer.PrintStructured(struct { + Status string + Result json.RawMessage `cli:",cardOmitEmpty"` + }{ + Status: color.GreenString("COMPLETED"), + Result: resultJSON, + }, printer.StructuredOptions{}) +} + +func printActivityFailure(cctx *CommandContext, activityID, runID string, f *failure.Failure) error { + if cctx.JSONOutput { + failureJSON, err := cctx.MarshalProtoJSON(f) + if err != nil { + return fmt.Errorf("failed marshaling failure: %w", err) + } + _ = cctx.Printer.PrintStructured(struct { + ActivityId string `json:"activityId"` + RunId string `json:"runId"` + Status string `json:"status"` + Failure json.RawMessage `json:"failure"` + }{ + ActivityId: activityID, + RunId: runID, + Status: "FAILED", + Failure: failureJSON, + }, printer.StructuredOptions{}) + return nil + } + + cctx.Printer.Println(color.MagentaString("Results:")) + _ = cctx.Printer.PrintStructured(struct { + Status string + Failure string `cli:",cardOmitEmpty"` + }{ + Status: color.RedString("FAILED"), + Failure: cctx.MarshalFriendlyFailureBodyText(f, " "), + }, printer.StructuredOptions{}) + return nil +} + +func (c *TemporalActivityDescribeCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle := cl.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: c.ActivityId, + RunID: c.RunId, + }) + desc, err := handle.Describe(cctx, client.DescribeActivityOptions{}) + if err != nil { + return fmt.Errorf("failed describing activity: %w", err) + } + if c.Raw || cctx.JSONOutput { + return cctx.Printer.PrintStructured(desc.RawExecutionInfo, printer.StructuredOptions{}) + } + return printActivityDescription(cctx, desc.RawExecutionInfo) +} + +func printActivityDescription(cctx *CommandContext, info *activitypb.ActivityExecutionInfo) error { + statusShorthand := func(s enumspb.ActivityExecutionStatus) string { + for name, val := range enumspb.ActivityExecutionStatus_shorthandValue { + if int32(s) == val { + return name + } + } + return s.String() + } + runStateShorthand := func(s enumspb.PendingActivityState) string { + for name, val := range enumspb.PendingActivityState_shorthandValue { + if int32(s) == val && name != "Unspecified" { + return name + } + } + return "" + } + + d := struct { + ActivityId string + RunId string + Type string + Status string + RunState string `cli:",cardOmitEmpty"` + TaskQueue string + ScheduleToCloseTimeout time.Duration `cli:",cardOmitEmpty"` + ScheduleToStartTimeout time.Duration `cli:",cardOmitEmpty"` + StartToCloseTimeout time.Duration `cli:",cardOmitEmpty"` + HeartbeatTimeout time.Duration `cli:",cardOmitEmpty"` + LastStartedTime time.Time `cli:",cardOmitEmpty"` + Attempt int32 + ExecutionDuration time.Duration `cli:",cardOmitEmpty"` + ScheduleTime time.Time `cli:",cardOmitEmpty"` + CloseTime time.Time `cli:",cardOmitEmpty"` + LastFailure string `cli:",cardOmitEmpty"` + LastWorkerIdentity string `cli:",cardOmitEmpty"` + LastAttemptCompleteTime time.Time `cli:",cardOmitEmpty"` + StateTransitionCount int64 + }{ + ActivityId: info.GetActivityId(), + RunId: info.GetRunId(), + Type: info.GetActivityType().GetName(), + Status: statusShorthand(info.GetStatus()), + RunState: runStateShorthand(info.GetRunState()), + TaskQueue: info.GetTaskQueue(), + ScheduleToCloseTimeout: info.GetScheduleToCloseTimeout().AsDuration(), + ScheduleToStartTimeout: info.GetScheduleToStartTimeout().AsDuration(), + StartToCloseTimeout: info.GetStartToCloseTimeout().AsDuration(), + HeartbeatTimeout: info.GetHeartbeatTimeout().AsDuration(), + LastStartedTime: timestampToTime(info.GetLastStartedTime()), + Attempt: info.GetAttempt(), + ExecutionDuration: info.GetExecutionDuration().AsDuration(), + ScheduleTime: timestampToTime(info.GetScheduleTime()), + CloseTime: timestampToTime(info.GetCloseTime()), + LastWorkerIdentity: info.GetLastWorkerIdentity(), + LastAttemptCompleteTime: timestampToTime(info.GetLastAttemptCompleteTime()), + StateTransitionCount: info.GetStateTransitionCount(), + } + if f := info.GetLastFailure(); f != nil { + d.LastFailure = cctx.MarshalFriendlyFailureBodyText(f, " ") + } + return cctx.Printer.PrintStructured(d, printer.StructuredOptions{}) +} + +func (c *TemporalActivityListCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + if c.Limit > 0 && c.Limit < c.PageSize { + c.PageSize = c.Limit + } + + cctx.Printer.StartList() + defer cctx.Printer.EndList() + + var nextPageToken []byte + var execsProcessed int + for pageIndex := 0; ; pageIndex++ { + resp, err := cl.WorkflowService().ListActivityExecutions(cctx, &workflowservice.ListActivityExecutionsRequest{ + Namespace: c.Parent.Namespace, + PageSize: int32(c.PageSize), + NextPageToken: nextPageToken, + Query: c.Query, + }) + if err != nil { + return fmt.Errorf("failed listing activities: %w", err) + } + var textTable []map[string]any + for _, exec := range resp.Executions { + if c.Limit > 0 && execsProcessed >= c.Limit { + break + } + execsProcessed++ + if cctx.JSONOutput { + _ = cctx.Printer.PrintStructured(exec, printer.StructuredOptions{}) + } else { + textTable = append(textTable, map[string]any{ + "Status": exec.Status, + "ActivityId": exec.ActivityId, + "Type": exec.ActivityType.GetName(), + "StartTime": exec.ScheduleTime.AsTime(), + }) + } + } + if len(textTable) > 0 { + _ = cctx.Printer.PrintStructured(textTable, printer.StructuredOptions{ + Fields: []string{"Status", "ActivityId", "Type", "StartTime"}, + Table: &printer.TableOptions{NoHeader: pageIndex > 0}, + }) + } + nextPageToken = resp.NextPageToken + if len(nextPageToken) == 0 || (c.Limit > 0 && execsProcessed >= c.Limit) { + return nil + } + } +} + +func (c *TemporalActivityCountCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + resp, err := cl.WorkflowService().CountActivityExecutions(cctx, &workflowservice.CountActivityExecutionsRequest{ + Namespace: c.Parent.Namespace, + Query: c.Query, + }) + if err != nil { + return fmt.Errorf("failed counting activities: %w", err) + } + groups := make([]countGroup, len(resp.Groups)) + for i, g := range resp.Groups { + groups[i] = g + } + if cctx.JSONOutput { + stripCountGroupMetadataType(groups) + return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{}) + } + cctx.Printer.Printlnf("Total: %v", resp.Count) + printCountGroupsText(cctx, groups) + return nil +} + +func (c *TemporalActivityCancelCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + handle := cl.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: c.ActivityId, + RunID: c.RunId, + }) + if err := handle.Cancel(cctx, client.CancelActivityOptions{Reason: c.Reason}); err != nil { + return fmt.Errorf("failed to request activity cancellation: %w", err) + } + cctx.Printer.Println("Cancellation requested") + return nil +} + +func (c *TemporalActivityTerminateCommand) run(cctx *CommandContext, args []string) error { + cl, err := dialClient(cctx, &c.Parent.ClientOptions) + if err != nil { + return err + } + defer cl.Close() + + // The CLI adds a default for terminate but not cancel. + // This matches the behavior for workflows. + reason := c.Reason + if reason == "" { + reason = defaultReason() + } + handle := cl.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: c.ActivityId, + RunID: c.RunId, + }) + // Terminate may fail if the activity doesn't exist or has already completed. + if err := handle.Terminate(cctx, client.TerminateActivityOptions{Reason: reason}); err != nil { + return fmt.Errorf("failed to terminate activity: %w", err) + } + cctx.Printer.Println("Activity terminated") + return nil +} + func (c *TemporalActivityCompleteCommand) run(cctx *CommandContext, args []string) error { cl, err := dialClient(cctx, &c.Parent.ClientOptions) if err != nil { diff --git a/internal/temporalcli/commands.activity_test.go b/internal/temporalcli/commands.activity_test.go index 5b12f7495..eda985759 100644 --- a/internal/temporalcli/commands.activity_test.go +++ b/internal/temporalcli/commands.activity_test.go @@ -2,16 +2,20 @@ package temporalcli_test import ( "context" + "encoding/json" "fmt" + "strings" "sync" "sync/atomic" "time" + "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "google.golang.org/grpc" @@ -520,3 +524,814 @@ func (s *SharedServerSuite) TestResetActivity_BatchSuccess() { // unblock the activities to let them finish failActivity.Store(false) } + +func (s *SharedServerSuite) TestActivity_Start() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return "start-result", nil + }) + + res := s.Execute( + "activity", "start", + "--activity-id", "start-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.Contains(out, "Running execution:") + s.ContainsOnSameLine(out, "ActivityId", "start-test") + s.Contains(out, "RunId") + s.ContainsOnSameLine(out, "Namespace", "default") + + // JSON + res = s.Execute( + "activity", "start", + "-o", "json", + "--activity-id", "start-test-json", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("start-test-json", jsonOut["activityId"]) + s.NotEmpty(jsonOut["runId"]) + s.Equal("default", jsonOut["namespace"]) +} + +func (s *SharedServerSuite) TestActivity_Start_With_Headers() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return nil, nil + }) + + var capturedHeader *workflowservice.StartActivityExecutionRequest + var mu sync.Mutex + s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append( + s.CommandHarness.Options.AdditionalClientGRPCDialOptions, + grpc.WithChainUnaryInterceptor(func( + ctx context.Context, + method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, + ) error { + if startReq, ok := req.(*workflowservice.StartActivityExecutionRequest); ok { + mu.Lock() + capturedHeader = startReq + mu.Unlock() + } + return invoker(ctx, method, req, reply, cc, opts...) + }), + ) + + res := s.Execute( + "activity", "start", + "--activity-id", "header-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--headers", "id=123", + "--address", s.Address(), + ) + s.NoError(res.Err) + + mu.Lock() + defer mu.Unlock() + s.NotNil(capturedHeader) + payload := capturedHeader.Header.Fields["id"] + s.NotNil(payload) + var val int + s.NoError(converter.GetDefaultDataConverter().FromPayload(payload, &val)) + s.Equal(123, val) +} + +func (s *SharedServerSuite) TestActivity_Execute_Success() { + var receivedInput any + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + receivedInput = a + return map[string]string{"foo": "bar"}, nil + }) + + // Text + res := s.Execute( + "activity", "execute", + "--activity-id", "exec-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "-i", `"my-input"`, + "--address", s.Address(), + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.Contains(out, "Running execution:") + s.ContainsOnSameLine(out, "ActivityId", "exec-test") + s.Contains(out, "Results:") + s.ContainsOnSameLine(out, "Status", "COMPLETED") + s.ContainsOnSameLine(out, "Result", `{"foo":"bar"}`) + s.Equal("my-input", receivedInput) + + // JSON + res = s.Execute( + "activity", "execute", + "-o", "json", + "--activity-id", "exec-json-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("exec-json-test", jsonOut["activityId"]) + s.NotEmpty(jsonOut["runId"]) + s.Equal("COMPLETED", jsonOut["status"]) + s.Equal(map[string]any{"foo": "bar"}, jsonOut["result"]) +} + +func (s *SharedServerSuite) TestActivity_Execute_Failure() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return nil, fmt.Errorf("intentional failure") + }) + + // Text + res := s.Execute( + "activity", "execute", + "--activity-id", "exec-fail-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--retry-maximum-attempts", "1", + "--address", s.Address(), + ) + s.ErrorContains(res.Err, "activity failed") + out := res.Stdout.String() + s.Contains(out, "Running execution:") + s.Contains(out, "Results:") + s.Contains(out, "FAILED") + s.Contains(out, "intentional failure") + + // JSON + res = s.Execute( + "activity", "execute", + "-o", "json", + "--activity-id", "exec-fail-json-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--retry-maximum-attempts", "1", + "--address", s.Address(), + ) + s.Error(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("exec-fail-json-test", jsonOut["activityId"]) + s.NotEmpty(jsonOut["runId"]) + s.Equal("FAILED", jsonOut["status"]) + failureObj, ok := jsonOut["failure"].(map[string]any) + s.True(ok, "failure should be a structured object, got: %T", jsonOut["failure"]) + s.Contains(failureObj["message"], "intentional failure") + s.NotNil(failureObj["applicationFailureInfo"]) +} + +func (s *SharedServerSuite) TestActivity_Execute_NoJsonShorthandPayloads() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return map[string]string{"key": "val"}, nil + }) + + // With shorthand (default): result is decoded + res := s.Execute( + "activity", "execute", + "-o", "json", + "--activity-id", "shorthand-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal(map[string]any{"key": "val"}, jsonOut["result"]) + + // Without shorthand: result should be raw payloads with metadata/data + res = s.Execute( + "activity", "execute", + "-o", "json", + "--no-json-shorthand-payloads", + "--activity-id", "no-shorthand-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + resultMap, ok := jsonOut["result"].(map[string]any) + s.True(ok, "result should be a payloads object, got: %T", jsonOut["result"]) + payloads, ok := resultMap["payloads"].([]any) + s.True(ok, "result should have payloads array") + s.Len(payloads, 1) + payload := payloads[0].(map[string]any) + s.NotNil(payload["metadata"]) + s.NotNil(payload["data"]) +} + +func (s *SharedServerSuite) TestActivity_Execute_RetriesOnEmptyPollResponse() { + // Activity sleeps longer than the server's activity.longPollTimeout (2s), + // forcing at least one empty poll response before the result arrives. + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + time.Sleep(3 * time.Second) + return "standalone-result", nil + }) + + res := s.Execute( + "activity", "execute", + "--activity-id", "poll-retry-test", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "standalone-result") +} + +// startActivity starts an activity via the CLI and returns +// the parsed JSON response containing activityId and runId. +func (s *SharedServerSuite) startActivity(activityID string, extraArgs ...string) map[string]any { + args := []string{ + "activity", "start", + "-o", "json", + "--activity-id", activityID, + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--address", s.Address(), + } + args = append(args, extraArgs...) + res := s.Execute(args...) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + return jsonOut +} + +func (s *SharedServerSuite) TestActivity_Result() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return "result-value", nil + }) + + started := s.startActivity("result-test") + + res := s.Execute( + "activity", "result", + "--activity-id", "result-test", + "--run-id", started["runId"].(string), + "--address", s.Address(), + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "result-value") + + // JSON output without --run-id + res = s.Execute( + "activity", "result", + "-o", "json", + "--activity-id", "result-test", + "--address", s.Address(), + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("COMPLETED", jsonOut["status"]) + s.Equal("result-test", jsonOut["activityId"]) + s.Equal("result-value", jsonOut["result"]) +} + +func (s *SharedServerSuite) TestActivity_Result_NotFound() { + res := s.Execute( + "activity", "result", + "--activity-id", "nonexistent-activity-id", + "--address", s.Address(), + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "not found") + s.NotContains(res.Stdout.String(), "FAILED") +} + +func (s *SharedServerSuite) TestActivity_Describe() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + started := s.startActivity("describe-test", + "--schedule-to-close-timeout", "300s", + "--schedule-to-start-timeout", "60s", + "--heartbeat-timeout", "15s", + "--retry-maximum-attempts", "5", + "--retry-initial-interval", "2s", + "--retry-backoff-coefficient", "3", + "--retry-maximum-interval", "120s", + ) + runID := started["runId"].(string) + <-activityStarted + + // Text + res := s.Execute( + "activity", "describe", + "--activity-id", "describe-test", + "--run-id", runID, + "--address", s.Address(), + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "ActivityId", "describe-test") + s.ContainsOnSameLine(out, "Type", "DevActivity") + s.ContainsOnSameLine(out, "Status", "Running") + s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) + s.ContainsOnSameLine(out, "StartToCloseTimeout", "30s") + s.ContainsOnSameLine(out, "ScheduleToCloseTimeout", "5m0s") + s.ContainsOnSameLine(out, "ScheduleToStartTimeout", "1m0s") + s.ContainsOnSameLine(out, "HeartbeatTimeout", "15s") + s.ContainsOnSameLine(out, "Attempt", "1") + s.Contains(out, "LastWorkerIdentity") + s.NotContains(out, `{"name":`) + + // JSON + res = s.Execute( + "activity", "describe", + "-o", "json", + "--activity-id", "describe-test", + "--run-id", runID, + "--address", s.Address(), + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("describe-test", jsonOut["activityId"]) + s.NotNil(jsonOut["activityType"]) + s.NotNil(jsonOut["taskQueue"]) + s.Equal("300s", jsonOut["scheduleToCloseTimeout"]) + s.Equal("60s", jsonOut["scheduleToStartTimeout"]) + s.Equal("30s", jsonOut["startToCloseTimeout"]) + s.Equal("15s", jsonOut["heartbeatTimeout"]) + retryPolicy, ok := jsonOut["retryPolicy"].(map[string]any) + s.True(ok, "retryPolicy should be present in JSON describe") + s.Equal(float64(5), retryPolicy["maximumAttempts"]) + s.Equal("2s", retryPolicy["initialInterval"]) + s.Equal(float64(3), retryPolicy["backoffCoefficient"]) + s.Equal("120s", retryPolicy["maximumInterval"]) + + // Raw: should contain proto JSON format + res = s.Execute( + "activity", "describe", + "--raw", + "--activity-id", "describe-test", + "--run-id", runID, + "--address", s.Address(), + ) + s.NoError(res.Err) + rawOut := res.Stdout.String() + s.Contains(rawOut, "describe-test") + s.Contains(rawOut, `{"name":"DevActivity"}`) +} + +// Text-only: verifies LastFailure is rendered as text not JSON. +func (s *SharedServerSuite) TestActivity_Describe_FailedLastFailure() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return nil, fmt.Errorf("describe-failure-msg") + }) + + started := s.startActivity("describe-fail-test", "--retry-maximum-attempts", "1") + + // Wait for the activity to fail + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: "describe-fail-test", + RunID: started["runId"].(string), + }) + _ = handle.Get(s.Context, nil) + + res := s.Execute( + "activity", "describe", + "--activity-id", "describe-fail-test", + "--run-id", started["runId"].(string), + "--address", s.Address(), + ) + s.NoError(res.Err) + out := res.Stdout.String() + // LastFailure should be human-readable, not raw JSON + s.Contains(out, "describe-failure-msg") + s.NotContains(out, `"message":"describe-failure-msg"`) +} + +func (s *SharedServerSuite) TestActivity_List() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return "listed", nil + }) + + s.startActivity("list-test-1") + s.startActivity("list-test-2") + s.startActivity("list-test-3") + + // Wait for all three to be visible + s.Eventually(func() bool { + res := s.Execute( + "activity", "list", + "--address", s.Address(), + ) + out := res.Stdout.String() + return res.Err == nil && + strings.Contains(out, "list-test-1") && + strings.Contains(out, "list-test-2") && + strings.Contains(out, "list-test-3") + }, 5*time.Second, 200*time.Millisecond) + + // --limit should cap the number of results + res := s.Execute( + "activity", "list", + "--limit", "2", + "--address", s.Address(), + ) + s.NoError(res.Err) + lines := strings.Split(strings.TrimSpace(res.Stdout.String()), "\n") + s.Equal(3, len(lines), "expected header + 2 rows with --limit 2, got: %s", res.Stdout.String()) + + // JSON + res = s.Execute( + "activity", "list", + "-o", "json", + "--address", s.Address(), + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "activityId", "list-test-1") + s.ContainsOnSameLine(out, "status", "ACTIVITY_EXECUTION_STATUS_COMPLETED") + + // JSONL + res = s.Execute( + "activity", "list", + "-o", "jsonl", + "--address", s.Address(), + ) + s.NoError(res.Err) + jsonlLines := strings.Split(strings.TrimSpace(res.Stdout.String()), "\n") + s.GreaterOrEqual(len(jsonlLines), 3) + seen := map[string]bool{} + for _, line := range jsonlLines { + var exec struct { + ActivityId string `json:"activityId"` + } + s.NoError(json.Unmarshal([]byte(line), &exec)) + seen[exec.ActivityId] = true + } + s.True(seen["list-test-1"]) + s.True(seen["list-test-2"]) + s.True(seen["list-test-3"]) +} + +func (s *SharedServerSuite) TestActivity_Count() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return "counted", nil + }) + + s.startActivity("count-test") + + // Text + s.Eventually(func() bool { + res := s.Execute( + "activity", "count", + "--address", s.Address(), + ) + return res.Err == nil && strings.Contains(res.Stdout.String(), "Total:") + }, 5*time.Second, 200*time.Millisecond) + + // Grouped text + s.Eventually(func() bool { + res := s.Execute( + "activity", "count", + "--address", s.Address(), + "--query", "GROUP BY ExecutionStatus", + ) + if res.Err != nil { + return false + } + out := res.Stdout.String() + return strings.Contains(out, "Total:") && strings.Contains(out, "Group total:") + }, 5*time.Second, 200*time.Millisecond) + + // JSON + res := s.Execute( + "activity", "count", + "--address", s.Address(), + "-o", "json", + ) + s.NoError(res.Err) + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + _, ok := jsonOut["count"] + s.True(ok) +} + +// No JSON variant: command produces no output on success in any mode. +func (s *SharedServerSuite) TestActivity_Complete_ByRunId() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + started := s.startActivity("sa-complete-test") + runID := started["runId"].(string) + <-activityStarted + + res := s.Execute( + "activity", "complete", + "--activity-id", "sa-complete-test", + "--run-id", runID, + "--result", `"completed-externally"`, + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: "sa-complete-test", + RunID: runID, + }) + var actual string + s.NoError(handle.Get(s.Context, &actual)) + s.Equal("completed-externally", actual) +} + +// No JSON variant: command produces no output on success in any mode. +func (s *SharedServerSuite) TestActivity_Fail_ByRunId() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + started := s.startActivity("sa-fail-test") + runID := started["runId"].(string) + <-activityStarted + + res := s.Execute( + "activity", "fail", + "--activity-id", "sa-fail-test", + "--run-id", runID, + "--reason", "external-failure", + "--identity", identity, + "--address", s.Address(), + ) + s.NoError(res.Err) + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: "sa-fail-test", + RunID: runID, + }) + err := handle.Get(s.Context, nil) + s.Error(err) + s.Contains(err.Error(), "external-failure") +} + +// No JSON variant: Println outputs the same text regardless of -o json (matches workflow cancel). +func (s *SharedServerSuite) TestActivity_Cancel() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + started := s.startActivity("cancel-test") + runID := started["runId"].(string) + <-activityStarted + + res := s.Execute( + "activity", "cancel", + "--activity-id", "cancel-test", + "--run-id", runID, + "--reason", "test-cancel", + "--address", s.Address(), + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Cancellation requested") + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: "cancel-test", + RunID: runID, + }) + s.Eventually(func() bool { + desc, err := handle.Describe(s.Context, client.DescribeActivityOptions{}) + return err == nil && desc.RunState.String() == "CancelRequested" + }, 5*time.Second, 100*time.Millisecond) +} + +// No JSON variant: Println outputs the same text regardless of -o json (matches workflow terminate). +func (s *SharedServerSuite) TestActivity_Terminate() { + activityStarted := make(chan struct{}) + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + close(activityStarted) + <-ctx.Done() + return nil, ctx.Err() + }) + + started := s.startActivity("terminate-test") + runID := started["runId"].(string) + <-activityStarted + + res := s.Execute( + "activity", "terminate", + "--activity-id", "terminate-test", + "--run-id", runID, + "--reason", "test-terminate", + "--address", s.Address(), + ) + s.NoError(res.Err) + s.Contains(res.Stdout.String(), "Activity terminated") + + handle := s.Client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: "terminate-test", + RunID: runID, + }) + err := handle.Get(s.Context, nil) + s.Error(err) + s.Contains(err.Error(), "terminated") +} + +func (s *SharedServerSuite) TestActivity_SearchAttributes() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return nil, nil + }) + + for _, sa := range []struct{ name, typ string }{ + {"SATestBool", "Bool"}, + {"SATestInt", "Int"}, + {"SATestDouble", "Double"}, + {"SATestKeyword", "Keyword"}, + {"SATestText", "Text"}, + {"SATestKeywordList", "KeywordList"}, + } { + res := s.Execute( + "operator", "search-attribute", "create", + "--address", s.Address(), + "--name", sa.name, + "--type", sa.typ, + ) + s.NoError(res.Err) + } + + unique := uuid.NewString()[:8] + + // Bool (JSON bool → NewSearchAttributeKeyBool) + s.startActivity("sa-bool-"+unique, + "--search-attribute", `SATestBool=true`, + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", `SATestBool = true`) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-bool-"+unique) + }, 5*time.Second, 200*time.Millisecond) + + // Int (JSON number → float64 → sent as Float64; server decodes via schema) + s.startActivity("sa-int-"+unique, + "--search-attribute", `SATestInt=42`, + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", `SATestInt = 42`) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-int-"+unique) + }, 5*time.Second, 200*time.Millisecond) + + // Double (JSON number → float64 → NewSearchAttributeKeyFloat64) + s.startActivity("sa-double-"+unique, + "--search-attribute", `SATestDouble=3.14`, + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", `SATestDouble = 3.14`) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-double-"+unique) + }, 5*time.Second, 200*time.Millisecond) + + // Keyword (JSON string → NewSearchAttributeKeyKeyword) + s.startActivity("sa-keyword-"+unique, + "--search-attribute", fmt.Sprintf(`SATestKeyword="kw-%s"`, unique), + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", fmt.Sprintf(`SATestKeyword = "kw-%s"`, unique)) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-keyword-"+unique) + }, 5*time.Second, 200*time.Millisecond) + + // Text (JSON string → sent as Keyword; server decodes via schema) + s.startActivity("sa-text-"+unique, + "--search-attribute", fmt.Sprintf(`SATestText="text value %s"`, unique), + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", fmt.Sprintf(`SATestText = "text value %s"`, unique)) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-text-"+unique) + }, 5*time.Second, 200*time.Millisecond) + + // KeywordList (JSON array → []any → NewSearchAttributeKeyKeywordList) + s.startActivity("sa-kwlist-"+unique, + "--search-attribute", `SATestKeywordList=["alpha","beta"]`, + ) + s.Eventually(func() bool { + res := s.Execute("activity", "list", "--address", s.Address(), + "--query", `SATestKeywordList = "alpha"`) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-kwlist-"+unique) + }, 5*time.Second, 200*time.Millisecond) +} + +func (s *SharedServerSuite) TestActivity_SearchAttributes_Datetime() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return nil, nil + }) + + res := s.Execute( + "operator", "search-attribute", "create", + "--address", s.Address(), + "--name", "SATestDatetime", + "--type", "Datetime", + ) + s.NoError(res.Err) + + s.startActivity("sa-datetime-test", + "--search-attribute", `SATestDatetime="2024-01-15T00:00:00Z"`, + ) + s.Eventually(func() bool { + res = s.Execute( + "activity", "list", + "--address", s.Address(), + "--query", `SATestDatetime > "2024-01-14T00:00:00Z"`, + ) + return res.Err == nil && strings.Contains(res.Stdout.String(), "sa-datetime-test") + }, 5*time.Second, 200*time.Millisecond) +} + +func (s *SharedServerSuite) TestActivity_SearchAttributes_InvalidKeywordList() { + res := s.Execute( + "activity", "start", + "--activity-id", "sa-invalid-kwlist", + "--type", "DevActivity", + "--task-queue", s.Worker().Options.TaskQueue, + "--start-to-close-timeout", "30s", + "--search-attribute", `Foo=[1,"a"]`, + "--address", s.Address(), + ) + s.Error(res.Err) + s.Contains(res.Err.Error(), "array element 0 is float64, not string") +} + +func (s *SharedServerSuite) TestActivity_List_Pagination() { + s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) { + return "paginated", nil + }) + + uniqueKW := "page-" + uuid.NewString()[:8] + for i := 0; i < 5; i++ { + s.startActivity(fmt.Sprintf("page-test-%d", i), + "--search-attribute", fmt.Sprintf(`CustomKeywordField="%s"`, uniqueKW), + ) + } + + // Wait for all 5 to be visible + s.Eventually(func() bool { + res := s.Execute( + "activity", "list", + "--address", s.Address(), + "--query", fmt.Sprintf(`CustomKeywordField = "%s"`, uniqueKW), + ) + return res.Err == nil && strings.Count(res.Stdout.String(), "page-test-") >= 5 + }, 5*time.Second, 200*time.Millisecond) + + // Small page size forces multi-page fetching; verify all 5 appear + res := s.Execute( + "activity", "list", + "--page-size", "2", + "--address", s.Address(), + "--query", fmt.Sprintf(`CustomKeywordField = "%s"`, uniqueKW), + ) + s.NoError(res.Err) + s.Equal(5, strings.Count(res.Stdout.String(), "page-test-")) + + // --limit 3 with page-size 2 should return exactly 3 + res = s.Execute( + "activity", "list", + "--page-size", "2", + "--limit", "3", + "--address", s.Address(), + "--query", fmt.Sprintf(`CustomKeywordField = "%s"`, uniqueKW), + ) + s.NoError(res.Err) + s.Equal(3, strings.Count(res.Stdout.String(), "page-test-")) +} diff --git a/internal/temporalcli/commands.gen.go b/internal/temporalcli/commands.gen.go index a06561320..87cdc5c8f 100644 --- a/internal/temporalcli/commands.gen.go +++ b/internal/temporalcli/commands.gen.go @@ -370,6 +370,78 @@ func (v *WorkflowUpdateOptionsOptions) BuildFlags(f *pflag.FlagSet) { f.StringVar(&v.VersioningOverrideBuildId, "versioning-override-build-id", "", "When overriding to a `pinned` behavior, specifies the Build ID of the version to target.") } +type ActivityReferenceOptions struct { + ActivityId string + RunId string + FlagSet *pflag.FlagSet +} + +func (v *ActivityReferenceOptions) BuildFlags(f *pflag.FlagSet) { + v.FlagSet = f + f.StringVarP(&v.ActivityId, "activity-id", "a", "", "Activity ID. Required.") + _ = cobra.MarkFlagRequired(f, "activity-id") + f.StringVarP(&v.RunId, "run-id", "r", "", "Activity Run ID. If not set, targets the latest run.") +} + +type ActivityStartOptions struct { + ActivityId string + Type string + TaskQueue string + ScheduleToCloseTimeout cliext.FlagDuration + ScheduleToStartTimeout cliext.FlagDuration + StartToCloseTimeout cliext.FlagDuration + HeartbeatTimeout cliext.FlagDuration + RetryInitialInterval cliext.FlagDuration + RetryMaximumInterval cliext.FlagDuration + RetryBackoffCoefficient float32 + RetryMaximumAttempts int + IdReusePolicy cliext.FlagStringEnum + IdConflictPolicy cliext.FlagStringEnum + SearchAttribute []string + Headers []string + StaticSummary string + StaticDetails string + PriorityKey int + FairnessKey string + FairnessWeight float32 + FlagSet *pflag.FlagSet +} + +func (v *ActivityStartOptions) BuildFlags(f *pflag.FlagSet) { + v.FlagSet = f + f.StringVarP(&v.ActivityId, "activity-id", "a", "", "Activity ID. Required.") + _ = cobra.MarkFlagRequired(f, "activity-id") + f.StringVar(&v.Type, "type", "", "Activity Type name. Required.") + _ = cobra.MarkFlagRequired(f, "type") + f.StringVarP(&v.TaskQueue, "task-queue", "t", "", "Activity task queue. Required.") + _ = cobra.MarkFlagRequired(f, "task-queue") + v.ScheduleToCloseTimeout = 0 + f.Var(&v.ScheduleToCloseTimeout, "schedule-to-close-timeout", "Maximum time for the Activity Execution, including all retries. Either this or \"start-to-close-timeout\" is required.") + v.ScheduleToStartTimeout = 0 + f.Var(&v.ScheduleToStartTimeout, "schedule-to-start-timeout", "Maximum time an Activity task can stay in a task queue before a Worker picks it up. On expiry it results in a non-retryable failure and no further attempts are scheduled.") + v.StartToCloseTimeout = 0 + f.Var(&v.StartToCloseTimeout, "start-to-close-timeout", "Maximum time for a single Activity attempt. On expiry a new attempt may be scheduled if permitted by the retry policy and schedule-to-close timeout. Either this or \"schedule-to-close-timeout\" is required.") + v.HeartbeatTimeout = 0 + f.Var(&v.HeartbeatTimeout, "heartbeat-timeout", "Maximum time between successful Worker heartbeats. On expiry the current activity attempt fails.") + v.RetryInitialInterval = 0 + f.Var(&v.RetryInitialInterval, "retry-initial-interval", "Interval of the first retry. If \"retry-backoff-coefficient\" is 1.0, it is used for all retries.") + v.RetryMaximumInterval = 0 + f.Var(&v.RetryMaximumInterval, "retry-maximum-interval", "Maximum interval between retries.") + f.Float32Var(&v.RetryBackoffCoefficient, "retry-backoff-coefficient", 0, "Coefficient for calculating the next retry interval. Must be 1 or larger.") + f.IntVar(&v.RetryMaximumAttempts, "retry-maximum-attempts", 0, "Maximum number of attempts. Setting to 1 disables retries. Setting to 0 means unlimited attempts.") + v.IdReusePolicy = cliext.NewFlagStringEnum([]string{"AllowDuplicate", "AllowDuplicateFailedOnly", "RejectDuplicate"}, "") + f.Var(&v.IdReusePolicy, "id-reuse-policy", "Policy for handling activity start when an Activity with the same ID exists and has completed. Accepted values: AllowDuplicate, AllowDuplicateFailedOnly, RejectDuplicate.") + v.IdConflictPolicy = cliext.NewFlagStringEnum([]string{"Fail", "UseExisting"}, "") + f.Var(&v.IdConflictPolicy, "id-conflict-policy", "Policy for handling activity start when an Activity with the same ID is currently running. Accepted values: Fail, UseExisting.") + f.StringArrayVar(&v.SearchAttribute, "search-attribute", nil, "Search Attribute in `KEY=VALUE` format. Keys must be identifiers, and values must be JSON values. Can be passed multiple times. See https://docs.temporal.io/visibility.") + f.StringArrayVar(&v.Headers, "headers", nil, "Temporal activity headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times.") + f.StringVar(&v.StaticSummary, "static-summary", "", "Static Activity summary for human consumption in UIs. Uses standard Markdown formatting excluding images, HTML, and script tags. EXPERIMENTAL.") + f.StringVar(&v.StaticDetails, "static-details", "", "Static Activity details for human consumption in UIs. Uses standard Markdown formatting excluding images, HTML, and script tags. EXPERIMENTAL.") + f.IntVar(&v.PriorityKey, "priority-key", 0, "Priority key (1-5, lower = higher priority). Default is 3 when not specified.") + f.StringVar(&v.FairnessKey, "fairness-key", "", "Fairness key (max 64 bytes) for proportional task dispatch.") + f.Float32Var(&v.FairnessWeight, "fairness-weight", 0, "Weight [0.001-1000] for this fairness key.") +} + type TemporalCommand struct { Command cobra.Command cliext.CommonOptions @@ -410,28 +482,62 @@ func NewTemporalActivityCommand(cctx *CommandContext, parent *TemporalCommand) * var s TemporalActivityCommand s.Parent = parent s.Command.Use = "activity" - s.Command.Short = "Complete, update, pause, unpause, reset or fail an Activity" - if hasHighlighting { - s.Command.Long = "Update an Activity's options, manage activity lifecycle or update\nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n\x1b[1mtemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\x1b[0m" - } else { - s.Command.Long = "Update an Activity's options, manage activity lifecycle or update\nan Activity's state to completed or failed.\n\nUpdating activity state marks an Activity as successfully finished or as\nhaving encountered an error.\n\n```\ntemporal activity complete \\\n --activity-id=YourActivityId \\\n --workflow-id=YourWorkflowId \\\n --result='{\"YourResultKey\": \"YourResultValue\"}'\n```" - } + s.Command.Short = "Operate on Activity Executions" + s.Command.Long = "Perform operations on Activity Executions." s.Command.Args = cobra.NoArgs + s.Command.AddCommand(&NewTemporalActivityCancelCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityCompleteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityCountCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityDescribeCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityExecuteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityFailCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityListCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityPauseCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityResetCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityResultCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityStartCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalActivityTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityUnpauseCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalActivityUpdateOptionsCommand(cctx, &s).Command) s.ClientOptions.BuildFlags(s.Command.PersistentFlags()) return &s } -type TemporalActivityCompleteCommand struct { +type TemporalActivityCancelCommand struct { Parent *TemporalActivityCommand Command cobra.Command - WorkflowReferenceOptions + ActivityReferenceOptions + Reason string +} + +func NewTemporalActivityCancelCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityCancelCommand { + var s TemporalActivityCancelCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "cancel [flags]" + s.Command.Short = "Request cancellation of a Standalone Activity (Experimental)" + if hasHighlighting { + s.Command.Long = "Request cancellation of a Standalone Activity.\n\n\x1b[1mtemporal activity cancel \\\n --activity-id YourActivityId\x1b[0m\n\nRequesting cancellation transitions the Activity's run state\nto CancelRequested. If the Activity is heartbeating, a\ncancellation error will be raised when the next heartbeat\nresponse is received; if the Activity allows this error to\npropagate, the Activity transitions to canceled status." + } else { + s.Command.Long = "Request cancellation of a Standalone Activity.\n\n```\ntemporal activity cancel \\\n --activity-id YourActivityId\n```\n\nRequesting cancellation transitions the Activity's run state\nto CancelRequested. If the Activity is heartbeating, a\ncancellation error will be raised when the next heartbeat\nresponse is received; if the Activity allows this error to\npropagate, the Activity transitions to canceled status." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for cancellation.") + s.ActivityReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityCompleteCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command ActivityId string + WorkflowId string + RunId string Result string } @@ -440,18 +546,19 @@ func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalAc s.Parent = parent s.Command.DisableFlagsInUseLine = true s.Command.Use = "complete [flags]" - s.Command.Short = "Complete an Activity" + s.Command.Short = "Mark an activity as completed successfully with a result" if hasHighlighting { s.Command.Long = "Complete an Activity, marking it as successfully finished. Specify the\nActivity ID and include a JSON result for the returned value:\n\n\x1b[1mtemporal activity complete \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --result '{\"YourResultKey\": \"YourResultVal\"}'\x1b[0m" } else { s.Command.Long = "Complete an Activity, marking it as successfully finished. Specify the\nActivity ID and include a JSON result for the returned value:\n\n```\ntemporal activity complete \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --result '{\"YourResultKey\": \"YourResultVal\"}'\n```" } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to complete. Required.") + s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID. This may be the ID of an Activity invoked by a Workflow, or of a Standalone Activity. Required.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") + s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for workflow Activities. Omit for Standalone Activities.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. For workflow Activities (when --workflow-id is provided), this is the Workflow Run ID. For Standalone Activities, this is the Activity Run ID.") s.Command.Flags().StringVar(&s.Result, "result", "", "Result `JSON` to return. Required.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "result") - s.WorkflowReferenceOptions.BuildFlags(s.Command.Flags()) s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -460,11 +567,97 @@ func NewTemporalActivityCompleteCommand(cctx *CommandContext, parent *TemporalAc return &s } -type TemporalActivityFailCommand struct { +type TemporalActivityCountCommand struct { Parent *TemporalActivityCommand Command cobra.Command - WorkflowReferenceOptions + Query string +} + +func NewTemporalActivityCountCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityCountCommand { + var s TemporalActivityCountCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "count [flags]" + s.Command.Short = "Count Standalone Activities matching a query (Experimental)" + if hasHighlighting { + s.Command.Long = "Return a count of Standalone Activities. Use \x1b[1m--query\x1b[0m to filter\nthe activities to be counted.\n\n\x1b[1mtemporal activity count \\\n --query 'ActivityType=\"YourActivity\"'\x1b[0m\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } else { + s.Command.Long = "Return a count of Standalone Activities. Use `--query` to filter\nthe activities to be counted.\n\n```\ntemporal activity count \\\n --query 'ActivityType=\"YourActivity\"'\n```\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Query to filter Activity Executions to count.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityDescribeCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + ActivityReferenceOptions + Raw bool +} + +func NewTemporalActivityDescribeCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityDescribeCommand { + var s TemporalActivityDescribeCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "describe [flags]" + s.Command.Short = "Show detailed info for a Standalone Activity (Experimental)" + if hasHighlighting { + s.Command.Long = "Display information about a Standalone Activity.\n\n\x1b[1mtemporal activity describe \\\n --activity-id YourActivityId\x1b[0m" + } else { + s.Command.Long = "Display information about a Standalone Activity.\n\n```\ntemporal activity describe \\\n --activity-id YourActivityId\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().BoolVar(&s.Raw, "raw", false, "Print properties without changing their format.") + s.ActivityReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityExecuteCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + ActivityStartOptions + PayloadInputOptions +} + +func NewTemporalActivityExecuteCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityExecuteCommand { + var s TemporalActivityExecuteCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "execute [flags]" + s.Command.Short = "Start a new Standalone Activity and wait for its result (Experimental)" + if hasHighlighting { + s.Command.Long = "Start a new Standalone Activity and block until it completes.\nThe result is output to stdout.\n\n\x1b[1mtemporal activity execute \\\n --activity-id YourActivityId \\\n --type YourActivity \\\n --task-queue YourTaskQueue \\\n --start-to-close-timeout 30s \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Start a new Standalone Activity and block until it completes.\nThe result is output to stdout.\n\n```\ntemporal activity execute \\\n --activity-id YourActivityId \\\n --type YourActivity \\\n --task-queue YourTaskQueue \\\n --start-to-close-timeout 30s \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.ActivityStartOptions.BuildFlags(s.Command.Flags()) + s.PayloadInputOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityFailCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command ActivityId string + WorkflowId string + RunId string Detail string Reason string } @@ -474,18 +667,50 @@ func NewTemporalActivityFailCommand(cctx *CommandContext, parent *TemporalActivi s.Parent = parent s.Command.DisableFlagsInUseLine = true s.Command.Use = "fail [flags]" - s.Command.Short = "Fail an Activity" + s.Command.Short = "Mark an Activity as completed unsuccessfully with an error" if hasHighlighting { - s.Command.Long = "Fail an Activity, marking it as having encountered an error. Specify the\nActivity and Workflow IDs:\n\n\x1b[1mtemporal activity fail \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\x1b[0m" + s.Command.Long = "Fail an Activity, marking it as having encountered an error:\n\n\x1b[1mtemporal activity fail \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\x1b[0m" } else { - s.Command.Long = "Fail an Activity, marking it as having encountered an error. Specify the\nActivity and Workflow IDs:\n\n```\ntemporal activity fail \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```" + s.Command.Long = "Fail an Activity, marking it as having encountered an error:\n\n```\ntemporal activity fail \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```" } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to fail. Required.") + s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID. This may be the ID of an Activity invoked by a Workflow, or of a Standalone Activity. Required.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id") - s.Command.Flags().StringVar(&s.Detail, "detail", "", "Reason for failing the Activity (JSON).") - s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for failing the Activity.") - s.WorkflowReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required for workflow Activities. Omit for Standalone Activities.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. For workflow Activities (when --workflow-id is provided), this is the Workflow Run ID. For Standalone Activities, this is the Activity Run ID.") + s.Command.Flags().StringVar(&s.Detail, "detail", "", "Failure detail (JSON). Attached as the failure details payload.") + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Failure reason. Attached as the failure message.") + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityListCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + Query string + Limit int + PageSize int +} + +func NewTemporalActivityListCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityListCommand { + var s TemporalActivityListCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "list [flags]" + s.Command.Short = "List Standalone Activities matching a query (Experimental)" + if hasHighlighting { + s.Command.Long = "List Standalone Activities. Use \x1b[1m--query\x1b[0m to filter results.\n\n\x1b[1mtemporal activity list \\\n --query 'ActivityType=\"YourActivity\"'\x1b[0m\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } else { + s.Command.Long = "List Standalone Activities. Use `--query` to filter results.\n\n```\ntemporal activity list \\\n --query 'ActivityType=\"YourActivity\"'\n```\n\nVisit https://docs.temporal.io/visibility to read more about\nSearch Attributes and queries." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Query to filter the Activity Executions to list.") + s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Activity Executions to display.") + s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Activity Executions to fetch at a time from the server.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) @@ -510,9 +735,9 @@ func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActiv s.Command.Use = "pause [flags]" s.Command.Short = "Pause an Activity" if hasHighlighting { - s.Command.Long = "Pause an Activity.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run until the next\ntime it fails, completes, or times out, at which point the pause will kick in.\n\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\x1b[0m\n\nTo later unpause the activity, see unpause. You may also want to\nreset the activity to unpause it while also starting it from the beginning." + s.Command.Long = "Pause an Activity. Not supported for Standalone Activities.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run until the next\ntime it fails, completes, or times out, at which point the pause will kick in.\n\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\x1b[0m\n\nTo later unpause the activity, see unpause. You may also want to\nreset the activity to unpause it while also starting it from the beginning." } else { - s.Command.Long = "Pause an Activity.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run until the next\ntime it fails, completes, or times out, at which point the pause will kick in.\n\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```\n\nTo later unpause the activity, see unpause. You may also want to\nreset the activity to unpause it while also starting it from the beginning." + s.Command.Long = "Pause an Activity. Not supported for Standalone Activities.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run until the next\ntime it fails, completes, or times out, at which point the pause will kick in.\n\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```\n\nTo later unpause the activity, see unpause. You may also want to\nreset the activity to unpause it while also starting it from the beginning." } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "The Activity ID to pause. Required.") @@ -546,9 +771,9 @@ func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActiv s.Command.Use = "reset [flags]" s.Command.Short = "Reset an Activity" if hasHighlighting { - s.Command.Long = "Reset an activity. This restarts the activity as if it were first being\nscheduled. That is, it will reset both the number of attempts and the\nactivity timeout, as well as, optionally, the\nheartbeat details.\n\nIf the activity may be executing (i.e. it has not yet timed out), the\nreset will take effect the next time it fails, heartbeats, or times out.\nIf is waiting for a retry (i.e. has failed or timed out), the reset\nwill apply immediately.\n\nIf the activity is already paused, it will be unpaused by default.\nYou can specify \x1b[1mkeep_paused\x1b[0m to prevent this.\n\nIf the activity is paused and the \x1b[1mkeep_paused\x1b[0m flag is not provided,\nit will be unpaused. If the activity is paused and \x1b[1mkeep_paused\x1b[0m flag\nis provided - it will stay paused.\n\nEither \x1b[1m--activity-id\x1b[0m (with \x1b[1m--workflow-id\x1b[0m) or \x1b[1m--query\x1b[0m must be specified.\n\n### Resetting activities that heartbeat {#reset-heartbeats}\n\nActivities that heartbeat will receive a Canceled failure\nthe next time they heartbeat after a reset.\n\nIf, in your Activity, you need to do any cleanup when an Activity is\nreset, handle this error and then re-throw it when you've cleaned up.\n\nIf the \x1b[1mreset_heartbeats\x1b[0m flag is set, the heartbeat details will also be cleared.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --keep-paused\n --reset-heartbeats\x1b[0m\n\nActivities can be reset in bulk with a visibility query list filter:\n\n\x1b[1mtemporal activity reset \\\n --query 'WorkflowType=\"YourWorkflow\"'\x1b[0m" + s.Command.Long = "Reset an activity. Not supported for Standalone Activities.\nThis restarts the activity as if it were first being\nscheduled. That is, it will reset both the number of attempts and the\nactivity timeout, as well as, optionally, the\nheartbeat details.\n\nIf the activity may be executing (i.e. it has not yet timed out), the\nreset will take effect the next time it fails, heartbeats, or times out.\nIf is waiting for a retry (i.e. has failed or timed out), the reset\nwill apply immediately.\n\nIf the activity is already paused, it will be unpaused by default.\nYou can specify \x1b[1mkeep_paused\x1b[0m to prevent this.\n\nIf the activity is paused and the \x1b[1mkeep_paused\x1b[0m flag is not provided,\nit will be unpaused. If the activity is paused and \x1b[1mkeep_paused\x1b[0m flag\nis provided - it will stay paused.\n\nEither \x1b[1m--activity-id\x1b[0m (with \x1b[1m--workflow-id\x1b[0m) or \x1b[1m--query\x1b[0m must be specified.\n\n### Resetting activities that heartbeat {#reset-heartbeats}\n\nActivities that heartbeat will receive a Canceled failure\nthe next time they heartbeat after a reset.\n\nIf, in your Activity, you need to do any cleanup when an Activity is\nreset, handle this error and then re-throw it when you've cleaned up.\n\nIf the \x1b[1mreset_heartbeats\x1b[0m flag is set, the heartbeat details will also be cleared.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --keep-paused\n --reset-heartbeats\x1b[0m\n\nActivities can be reset in bulk with a visibility query list filter:\n\n\x1b[1mtemporal activity reset \\\n --query 'WorkflowType=\"YourWorkflow\"'\x1b[0m" } else { - s.Command.Long = "Reset an activity. This restarts the activity as if it were first being\nscheduled. That is, it will reset both the number of attempts and the\nactivity timeout, as well as, optionally, the\nheartbeat details.\n\nIf the activity may be executing (i.e. it has not yet timed out), the\nreset will take effect the next time it fails, heartbeats, or times out.\nIf is waiting for a retry (i.e. has failed or timed out), the reset\nwill apply immediately.\n\nIf the activity is already paused, it will be unpaused by default.\nYou can specify `keep_paused` to prevent this.\n\nIf the activity is paused and the `keep_paused` flag is not provided,\nit will be unpaused. If the activity is paused and `keep_paused` flag\nis provided - it will stay paused.\n\nEither `--activity-id` (with `--workflow-id`) or `--query` must be specified.\n\n### Resetting activities that heartbeat {#reset-heartbeats}\n\nActivities that heartbeat will receive a Canceled failure\nthe next time they heartbeat after a reset.\n\nIf, in your Activity, you need to do any cleanup when an Activity is\nreset, handle this error and then re-throw it when you've cleaned up.\n\nIf the `reset_heartbeats` flag is set, the heartbeat details will also be cleared.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --keep-paused\n --reset-heartbeats\n```\n\nActivities can be reset in bulk with a visibility query list filter:\n\n```\ntemporal activity reset \\\n --query 'WorkflowType=\"YourWorkflow\"'\n```" + s.Command.Long = "Reset an activity. Not supported for Standalone Activities.\nThis restarts the activity as if it were first being\nscheduled. That is, it will reset both the number of attempts and the\nactivity timeout, as well as, optionally, the\nheartbeat details.\n\nIf the activity may be executing (i.e. it has not yet timed out), the\nreset will take effect the next time it fails, heartbeats, or times out.\nIf is waiting for a retry (i.e. has failed or timed out), the reset\nwill apply immediately.\n\nIf the activity is already paused, it will be unpaused by default.\nYou can specify `keep_paused` to prevent this.\n\nIf the activity is paused and the `keep_paused` flag is not provided,\nit will be unpaused. If the activity is paused and `keep_paused` flag\nis provided - it will stay paused.\n\nEither `--activity-id` (with `--workflow-id`) or `--query` must be specified.\n\n### Resetting activities that heartbeat {#reset-heartbeats}\n\nActivities that heartbeat will receive a Canceled failure\nthe next time they heartbeat after a reset.\n\nIf, in your Activity, you need to do any cleanup when an Activity is\nreset, handle this error and then re-throw it when you've cleaned up.\n\nIf the `reset_heartbeats` flag is set, the heartbeat details will also be cleared.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --keep-paused\n --reset-heartbeats\n```\n\nActivities can be reset in bulk with a visibility query list filter:\n\n```\ntemporal activity reset \\\n --query 'WorkflowType=\"YourWorkflow\"'\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "The Activity ID to reset. Mutually exclusive with `--query`. Requires `--workflow-id` to be specified.") @@ -567,6 +792,91 @@ func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActiv return &s } +type TemporalActivityResultCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + ActivityReferenceOptions +} + +func NewTemporalActivityResultCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityResultCommand { + var s TemporalActivityResultCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "result [flags]" + s.Command.Short = "Wait for and output the result of a Standalone Activity (Experimental)" + if hasHighlighting { + s.Command.Long = "Wait for a Standalone Activity to complete and output the\nresult.\n\n\x1b[1mtemporal activity result \\\n --activity-id YourActivityId\x1b[0m" + } else { + s.Command.Long = "Wait for a Standalone Activity to complete and output the\nresult.\n\n```\ntemporal activity result \\\n --activity-id YourActivityId\n```" + } + s.Command.Args = cobra.NoArgs + s.ActivityReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityStartCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + ActivityStartOptions + PayloadInputOptions +} + +func NewTemporalActivityStartCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityStartCommand { + var s TemporalActivityStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "start [flags]" + s.Command.Short = "Start a new Standalone Activity (Experimental)" + if hasHighlighting { + s.Command.Long = "Start a new Standalone Activity. Outputs the Activity ID and\nRun ID.\n\n\x1b[1mtemporal activity start \\\n --activity-id YourActivityId \\\n --type YourActivity \\\n --task-queue YourTaskQueue \\\n --start-to-close-timeout 5m \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + } else { + s.Command.Long = "Start a new Standalone Activity. Outputs the Activity ID and\nRun ID.\n\n```\ntemporal activity start \\\n --activity-id YourActivityId \\\n --type YourActivity \\\n --task-queue YourTaskQueue \\\n --start-to-close-timeout 5m \\\n --input '{\"some-key\": \"some-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.ActivityStartOptions.BuildFlags(s.Command.Flags()) + s.PayloadInputOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + +type TemporalActivityTerminateCommand struct { + Parent *TemporalActivityCommand + Command cobra.Command + ActivityReferenceOptions + Reason string +} + +func NewTemporalActivityTerminateCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityTerminateCommand { + var s TemporalActivityTerminateCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "terminate [flags]" + s.Command.Short = "Forcefully end a Standalone Activity (Experimental)" + if hasHighlighting { + s.Command.Long = "Terminate a Standalone Activity.\n\n\x1b[1mtemporal activity terminate \\\n --activity-id YourActivityId \\\n --reason YourReason\x1b[0m\n\nActivity code cannot see or respond to terminations." + } else { + s.Command.Long = "Terminate a Standalone Activity.\n\n```\ntemporal activity terminate \\\n --activity-id YourActivityId \\\n --reason YourReason\n```\n\nActivity code cannot see or respond to terminations." + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to a message with the current user's name.") + s.ActivityReferenceOptions.BuildFlags(s.Command.Flags()) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalActivityUnpauseCommand struct { Parent *TemporalActivityCommand Command cobra.Command @@ -584,9 +894,9 @@ func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalAct s.Command.Use = "unpause [flags]" s.Command.Short = "Unpause an Activity" if hasHighlighting { - s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires.\n\nUse \x1b[1m--reset-attempts\x1b[0m to reset the number of previous run attempts to\nzero. For example, if an Activity is near the maximum number of attempts\nN specified in its retry policy, \x1b[1m--reset-attempts\x1b[0m will allow the\nActivity to be retried another N times after unpausing.\n\nUse \x1b[1m--reset-heartbeat\x1b[0m to reset the Activity's heartbeats.\n\nEither \x1b[1m--activity-id\x1b[0m (with \x1b[1m--workflow-id\x1b[0m) or \x1b[1m--query\x1b[0m must be specified.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\x1b[0m\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n\x1b[1mtemporal activity unpause \\\n --query 'TemporalPauseInfo IS NOT NULL'\x1b[0m" + s.Command.Long = "Re-schedule a previously-paused Activity for execution.\nNot supported for Standalone Activities.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires.\n\nUse \x1b[1m--reset-attempts\x1b[0m to reset the number of previous run attempts to\nzero. For example, if an Activity is near the maximum number of attempts\nN specified in its retry policy, \x1b[1m--reset-attempts\x1b[0m will allow the\nActivity to be retried another N times after unpausing.\n\nUse \x1b[1m--reset-heartbeat\x1b[0m to reset the Activity's heartbeats.\n\nEither \x1b[1m--activity-id\x1b[0m (with \x1b[1m--workflow-id\x1b[0m) or \x1b[1m--query\x1b[0m must be specified.\n\nSpecify the Activity and Workflow IDs:\n\n\x1b[1mtemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\x1b[0m\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n\x1b[1mtemporal activity unpause \\\n --query 'TemporalPauseInfo IS NOT NULL'\x1b[0m" } else { - s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires.\n\nUse `--reset-attempts` to reset the number of previous run attempts to\nzero. For example, if an Activity is near the maximum number of attempts\nN specified in its retry policy, `--reset-attempts` will allow the\nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats.\n\nEither `--activity-id` (with `--workflow-id`) or `--query` must be specified.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n```\ntemporal activity unpause \\\n --query 'TemporalPauseInfo IS NOT NULL'\n```" + s.Command.Long = "Re-schedule a previously-paused Activity for execution.\nNot supported for Standalone Activities.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires.\n\nUse `--reset-attempts` to reset the number of previous run attempts to\nzero. For example, if an Activity is near the maximum number of attempts\nN specified in its retry policy, `--reset-attempts` will allow the\nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats.\n\nEither `--activity-id` (with `--workflow-id`) or `--query` must be specified.\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n```\ntemporal activity unpause \\\n --query 'TemporalPauseInfo IS NOT NULL'\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "The Activity ID to unpause. Mutually exclusive with `--query`. Requires `--workflow-id` to be specified.") @@ -625,11 +935,11 @@ func NewTemporalActivityUpdateOptionsCommand(cctx *CommandContext, parent *Tempo s.Parent = parent s.Command.DisableFlagsInUseLine = true s.Command.Use = "update-options [flags]" - s.Command.Short = "Update Activity options" + s.Command.Short = "Change the values of options affecting a running Activity" if hasHighlighting { - s.Command.Long = "Update the options of a running Activity that were passed into it from\na Workflow. Updates are incremental, only changing the specified options.\n\nFor example:\n\n\x1b[1mtemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\x1b[0m\n\nYou may follow this command with \x1b[1mtemporal activity reset\x1b[0m, and the new values will apply after the reset.\n\nEither \x1b[1m--activity-id\x1b[0m or \x1b[1m--query\x1b[0m must be specified.\n\nActivity options can be updated in bulk with a visibility query list filter:\n\n\x1b[1mtemporal activity update-options \\\n --query 'WorkflowType=\"YourWorkflow\"' \\\n --task-queue NewTaskQueueName\x1b[0m" + s.Command.Long = "Update the options of a running Activity that were passed into it from\na Workflow. Updates are incremental, only changing the specified options.\nNot supported for Standalone Activities.\n\nFor example:\n\n\x1b[1mtemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\x1b[0m\n\nYou may follow this command with \x1b[1mtemporal activity reset\x1b[0m, and the new values will apply after the reset.\n\nEither \x1b[1m--activity-id\x1b[0m or \x1b[1m--query\x1b[0m must be specified.\n\nActivity options can be updated in bulk with a visibility query list filter:\n\n\x1b[1mtemporal activity update-options \\\n --query 'WorkflowType=\"YourWorkflow\"' \\\n --task-queue NewTaskQueueName\x1b[0m" } else { - s.Command.Long = "Update the options of a running Activity that were passed into it from\na Workflow. Updates are incremental, only changing the specified options.\n\nFor example:\n\n```\ntemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\n```\n\nYou may follow this command with `temporal activity reset`, and the new values will apply after the reset.\n\nEither `--activity-id` or `--query` must be specified.\n\nActivity options can be updated in bulk with a visibility query list filter:\n\n```\ntemporal activity update-options \\\n --query 'WorkflowType=\"YourWorkflow\"' \\\n --task-queue NewTaskQueueName\n```" + s.Command.Long = "Update the options of a running Activity that were passed into it from\na Workflow. Updates are incremental, only changing the specified options.\nNot supported for Standalone Activities.\n\nFor example:\n\n```\ntemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\n```\n\nYou may follow this command with `temporal activity reset`, and the new values will apply after the reset.\n\nEither `--activity-id` or `--query` must be specified.\n\nActivity options can be updated in bulk with a visibility query list filter:\n\n```\ntemporal activity update-options \\\n --query 'WorkflowType=\"YourWorkflow\"' \\\n --task-queue NewTaskQueueName\n```" } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "The Activity ID to update options. Mutually exclusive with `--query`. Requires `--workflow-id` to be specified.") @@ -2283,9 +2593,9 @@ func NewTemporalTaskQueueDescribeCommand(cctx *CommandContext, parent *TemporalT s.Command.Use = "describe [flags]" s.Command.Short = "Show active Workers" if hasHighlighting { - s.Command.Long = "Display a list of active Workers that have recently polled a Task Queue. The\nTemporal Server records each poll request time. A \x1b[1mLastAccessTime\x1b[0m over one\nminute may indicate the Worker is at capacity or has shut down. Temporal\nWorkers are removed if 5 minutes have passed since the last poll request.\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue\x1b[0m\n\nThis command provides poller information for a given Task Queue.\nWorkflow and Activity polling use separate Task Queues:\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --task-queue-type \"activity\"\x1b[0m\n\nThis command provides the following task queue statistics:\n- \x1b[1mApproximateBacklogCount\x1b[0m: The approximate number of tasks backlogged in this\n task queue. May count expired tasks but eventually converges to the right\n value.\n- \x1b[1mApproximateBacklogAge\x1b[0m: Approximate age of the oldest task in the backlog,\n based on its creation time, measured in seconds.\n- \x1b[1mTasksAddRate\x1b[0m: Approximate rate at which tasks are being added to the task\n queue, measured in tasks per second, averaged over the last 30 seconds.\n Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- \x1b[1mTasksDispatchRate\x1b[0m: Approximate rate at which tasks are being dispatched from\n the task queue, measured in tasks per second, averaged over the last 30\n seconds. Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- \x1b[1mBacklogIncreaseRate\x1b[0m: Approximate rate at which the backlog size is\n increasing (if positive) or decreasing (if negative), measured in tasks per\n second, averaged over the last 30 seconds. This is roughly equivalent to:\n \x1b[1mTasksAddRate\x1b[0m - \x1b[1mTasksDispatchRate\x1b[0m.\n\nNOTE: The \x1b[1mTasksAddRate\x1b[0m and \x1b[1mTasksDispatchRate\x1b[0m metrics may differ from the\nactual rate of add/dispatch, because tasks may be dispatched eagerly to an\navailable worker, or may apply only to specific workers (they are \"sticky\").\nSuch tasks are not counted by these metrics. Despite the inaccuracy of\nthese two metrics, the derived metric of \x1b[1mBacklogIncreaseRate\x1b[0m is accurate\nfor backlogs older than a few seconds.\n\nSafely retire Workers assigned a Build ID by checking reachability across\nall task types. Use the flag \x1b[1m--report-reachability\x1b[0m:\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --select-build-id \"YourBuildId\" \\\n --report-reachability\x1b[0m\n\nTask reachability information is returned for the requested versions and all\ntask types, which can be used to safely retire Workers with old code versions,\nprovided that they were assigned a Build ID.\n\nNote that task reachability status is deprecated in favor of Drainage Status\n(ie. of a Drained or Draining Worker Deployment Version) and will be removed \nin a future release. Also, determining task reachability incurs a non-trivial \ncomputing cost.\n\nTask reachability states are reported per build ID. The state may be one of the\nfollowing:\n\n- \x1b[1mReachable\x1b[0m: using the current versioning rules, the Build ID may be used\n by new Workflow Executions or Activities OR there are currently open\n Workflow or backlogged Activity tasks assigned to the queue.\n- \x1b[1mClosedWorkflowsOnly\x1b[0m: the Build ID does not have open Workflow Executions\n and can't be reached by new Workflow Executions. It MAY have closed\n Workflow Executions within the Namespace retention period.\n- \x1b[1mUnreachable\x1b[0m: this Build ID is not used for new Workflow Executions and\n isn't used by any existing Workflow Execution within the retention period.\n\nTask reachability is eventually consistent. You may experience a delay until\nreachability converges to the most accurate value. This is designed to act\nin the most conservative way until convergence. For example, \x1b[1mReachable\x1b[0m is\nmore conservative than \x1b[1mClosedWorkflowsOnly\x1b[0m." + s.Command.Long = "Display a list of active Workers that have recently polled a Task Queue. The\nTemporal Server records each poll request time. A \x1b[1mLastAccessTime\x1b[0m over one\nminute may indicate the Worker is at capacity or has shut down. Temporal\nWorkers are removed if 5 minutes have passed since the last poll request.\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue\x1b[0m\n\nThis command provides poller information for a given Task Queue.\nWorkflow and Activity polling use separate Task Queues:\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --task-queue-type \"activity\"\x1b[0m\n\nThis command provides the following task queue statistics:\n- \x1b[1mApproximateBacklogCount\x1b[0m: The approximate number of tasks backlogged in this\n task queue. May count expired tasks but eventually converges to the right\n value.\n- \x1b[1mApproximateBacklogAge\x1b[0m: Approximate age of the oldest task in the backlog,\n based on its creation time, measured in seconds.\n- \x1b[1mTasksAddRate\x1b[0m: Approximate rate at which tasks are being added to the task\n queue, measured in tasks per second, averaged over the last 30 seconds.\n Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- \x1b[1mTasksDispatchRate\x1b[0m: Approximate rate at which tasks are being dispatched from\n the task queue, measured in tasks per second, averaged over the last 30\n seconds. Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- \x1b[1mBacklogIncreaseRate\x1b[0m: Approximate rate at which the backlog size is\n increasing (if positive) or decreasing (if negative), measured in tasks per\n second, averaged over the last 30 seconds. This is roughly equivalent to:\n \x1b[1mTasksAddRate\x1b[0m - \x1b[1mTasksDispatchRate\x1b[0m.\n\nNOTE: The \x1b[1mTasksAddRate\x1b[0m and \x1b[1mTasksDispatchRate\x1b[0m metrics may differ from the\nactual rate of add/dispatch, because tasks may be dispatched eagerly to an\navailable worker, or may apply only to specific workers (they are \"sticky\").\nSuch tasks are not counted by these metrics. Despite the inaccuracy of\nthese two metrics, the derived metric of \x1b[1mBacklogIncreaseRate\x1b[0m is accurate\nfor backlogs older than a few seconds.\n\nSafely retire Workers assigned a Build ID by checking reachability across\nall task types. Use the flag \x1b[1m--report-reachability\x1b[0m:\n\n\x1b[1mtemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --select-build-id \"YourBuildId\" \\\n --report-reachability\x1b[0m\n\nTask reachability information is returned for the requested versions and all\ntask types, which can be used to safely retire Workers with old code versions,\nprovided that they were assigned a Build ID.\n\nNote that task reachability status is deprecated in favor of Drainage Status\n(ie. of a Drained or Draining Worker Deployment Version) and will be removed\nin a future release. Also, determining task reachability incurs a non-trivial\ncomputing cost.\n\nTask reachability states are reported per build ID. The state may be one of the\nfollowing:\n\n- \x1b[1mReachable\x1b[0m: using the current versioning rules, the Build ID may be used\n by new Workflow Executions or Activities OR there are currently open\n Workflow or backlogged Activity tasks assigned to the queue.\n- \x1b[1mClosedWorkflowsOnly\x1b[0m: the Build ID does not have open Workflow Executions\n and can't be reached by new Workflow Executions. It MAY have closed\n Workflow Executions within the Namespace retention period.\n- \x1b[1mUnreachable\x1b[0m: this Build ID is not used for new Workflow Executions and\n isn't used by any existing Workflow Execution within the retention period.\n\nTask reachability is eventually consistent. You may experience a delay until\nreachability converges to the most accurate value. This is designed to act\nin the most conservative way until convergence. For example, \x1b[1mReachable\x1b[0m is\nmore conservative than \x1b[1mClosedWorkflowsOnly\x1b[0m." } else { - s.Command.Long = "Display a list of active Workers that have recently polled a Task Queue. The\nTemporal Server records each poll request time. A `LastAccessTime` over one\nminute may indicate the Worker is at capacity or has shut down. Temporal\nWorkers are removed if 5 minutes have passed since the last poll request.\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue\n```\n\nThis command provides poller information for a given Task Queue.\nWorkflow and Activity polling use separate Task Queues:\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --task-queue-type \"activity\"\n```\n\nThis command provides the following task queue statistics:\n- `ApproximateBacklogCount`: The approximate number of tasks backlogged in this\n task queue. May count expired tasks but eventually converges to the right\n value.\n- `ApproximateBacklogAge`: Approximate age of the oldest task in the backlog,\n based on its creation time, measured in seconds.\n- `TasksAddRate`: Approximate rate at which tasks are being added to the task\n queue, measured in tasks per second, averaged over the last 30 seconds.\n Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- `TasksDispatchRate`: Approximate rate at which tasks are being dispatched from\n the task queue, measured in tasks per second, averaged over the last 30\n seconds. Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- `BacklogIncreaseRate`: Approximate rate at which the backlog size is\n increasing (if positive) or decreasing (if negative), measured in tasks per\n second, averaged over the last 30 seconds. This is roughly equivalent to:\n `TasksAddRate` - `TasksDispatchRate`.\n\nNOTE: The `TasksAddRate` and `TasksDispatchRate` metrics may differ from the\nactual rate of add/dispatch, because tasks may be dispatched eagerly to an\navailable worker, or may apply only to specific workers (they are \"sticky\").\nSuch tasks are not counted by these metrics. Despite the inaccuracy of\nthese two metrics, the derived metric of `BacklogIncreaseRate` is accurate\nfor backlogs older than a few seconds.\n\nSafely retire Workers assigned a Build ID by checking reachability across\nall task types. Use the flag `--report-reachability`:\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --select-build-id \"YourBuildId\" \\\n --report-reachability\n```\n\nTask reachability information is returned for the requested versions and all\ntask types, which can be used to safely retire Workers with old code versions,\nprovided that they were assigned a Build ID.\n\nNote that task reachability status is deprecated in favor of Drainage Status\n(ie. of a Drained or Draining Worker Deployment Version) and will be removed \nin a future release. Also, determining task reachability incurs a non-trivial \ncomputing cost.\n\nTask reachability states are reported per build ID. The state may be one of the\nfollowing:\n\n- `Reachable`: using the current versioning rules, the Build ID may be used\n by new Workflow Executions or Activities OR there are currently open\n Workflow or backlogged Activity tasks assigned to the queue.\n- `ClosedWorkflowsOnly`: the Build ID does not have open Workflow Executions\n and can't be reached by new Workflow Executions. It MAY have closed\n Workflow Executions within the Namespace retention period.\n- `Unreachable`: this Build ID is not used for new Workflow Executions and\n isn't used by any existing Workflow Execution within the retention period.\n\nTask reachability is eventually consistent. You may experience a delay until\nreachability converges to the most accurate value. This is designed to act\nin the most conservative way until convergence. For example, `Reachable` is\nmore conservative than `ClosedWorkflowsOnly`." + s.Command.Long = "Display a list of active Workers that have recently polled a Task Queue. The\nTemporal Server records each poll request time. A `LastAccessTime` over one\nminute may indicate the Worker is at capacity or has shut down. Temporal\nWorkers are removed if 5 minutes have passed since the last poll request.\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue\n```\n\nThis command provides poller information for a given Task Queue.\nWorkflow and Activity polling use separate Task Queues:\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --task-queue-type \"activity\"\n```\n\nThis command provides the following task queue statistics:\n- `ApproximateBacklogCount`: The approximate number of tasks backlogged in this\n task queue. May count expired tasks but eventually converges to the right\n value.\n- `ApproximateBacklogAge`: Approximate age of the oldest task in the backlog,\n based on its creation time, measured in seconds.\n- `TasksAddRate`: Approximate rate at which tasks are being added to the task\n queue, measured in tasks per second, averaged over the last 30 seconds.\n Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- `TasksDispatchRate`: Approximate rate at which tasks are being dispatched from\n the task queue, measured in tasks per second, averaged over the last 30\n seconds. Includes tasks dispatched immediately without going to the backlog\n (sync-matched tasks), as well as tasks added to the backlog. (See note below.)\n- `BacklogIncreaseRate`: Approximate rate at which the backlog size is\n increasing (if positive) or decreasing (if negative), measured in tasks per\n second, averaged over the last 30 seconds. This is roughly equivalent to:\n `TasksAddRate` - `TasksDispatchRate`.\n\nNOTE: The `TasksAddRate` and `TasksDispatchRate` metrics may differ from the\nactual rate of add/dispatch, because tasks may be dispatched eagerly to an\navailable worker, or may apply only to specific workers (they are \"sticky\").\nSuch tasks are not counted by these metrics. Despite the inaccuracy of\nthese two metrics, the derived metric of `BacklogIncreaseRate` is accurate\nfor backlogs older than a few seconds.\n\nSafely retire Workers assigned a Build ID by checking reachability across\nall task types. Use the flag `--report-reachability`:\n\n```\ntemporal task-queue describe \\\n --task-queue YourTaskQueue \\\n --select-build-id \"YourBuildId\" \\\n --report-reachability\n```\n\nTask reachability information is returned for the requested versions and all\ntask types, which can be used to safely retire Workers with old code versions,\nprovided that they were assigned a Build ID.\n\nNote that task reachability status is deprecated in favor of Drainage Status\n(ie. of a Drained or Draining Worker Deployment Version) and will be removed\nin a future release. Also, determining task reachability incurs a non-trivial\ncomputing cost.\n\nTask reachability states are reported per build ID. The state may be one of the\nfollowing:\n\n- `Reachable`: using the current versioning rules, the Build ID may be used\n by new Workflow Executions or Activities OR there are currently open\n Workflow or backlogged Activity tasks assigned to the queue.\n- `ClosedWorkflowsOnly`: the Build ID does not have open Workflow Executions\n and can't be reached by new Workflow Executions. It MAY have closed\n Workflow Executions within the Namespace retention period.\n- `Unreachable`: this Build ID is not used for new Workflow Executions and\n isn't used by any existing Workflow Execution within the retention period.\n\nTask reachability is eventually consistent. You may experience a delay until\nreachability converges to the most accurate value. This is designed to act\nin the most conservative way until convergence. For example, `Reachable` is\nmore conservative than `ClosedWorkflowsOnly`." } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.") @@ -3100,9 +3410,9 @@ func NewTemporalWorkerDeploymentManagerIdentityCommand(cctx *CommandContext, par s.Command.Use = "manager-identity" s.Command.Short = "Manager Identity commands change the `ManagerIdentity` of a Worker Deployment" if hasHighlighting { - s.Command.Long = "Manager Identity commands change the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment:\n\n\x1b[1mtemporal worker deployment manager-identity [command] [options]\x1b[0m\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\nThe current Manager Identity is returned with \x1b[1mdescribe\x1b[0m:\n\x1b[1m temporal worker deployment describe \\\n --deployment-name YourDeploymentName\x1b[0m" + s.Command.Long = "Manager Identity commands change the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment:\n\n\x1b[1mtemporal worker deployment manager-identity [command] [options]\x1b[0m\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\nThe current Manager Identity is returned with \x1b[1mdescribe\x1b[0m:\n\x1b[1m temporal worker deployment describe \\\n --deployment-name YourDeploymentName\x1b[0m" } else { - s.Command.Long = "Manager Identity commands change the `ManagerIdentity` of a Worker Deployment:\n\n```\ntemporal worker deployment manager-identity [command] [options]\n```\n\nWhen present, `ManagerIdentity` is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\nThe current Manager Identity is returned with `describe`:\n```\n temporal worker deployment describe \\\n --deployment-name YourDeploymentName\n```" + s.Command.Long = "Manager Identity commands change the `ManagerIdentity` of a Worker Deployment:\n\n```\ntemporal worker deployment manager-identity [command] [options]\n```\n\nWhen present, `ManagerIdentity` is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\nThe current Manager Identity is returned with `describe`:\n```\n temporal worker deployment describe \\\n --deployment-name YourDeploymentName\n```" } s.Command.Args = cobra.NoArgs s.Command.AddCommand(&NewTemporalWorkerDeploymentManagerIdentitySetCommand(cctx, &s).Command) @@ -3126,9 +3436,9 @@ func NewTemporalWorkerDeploymentManagerIdentitySetCommand(cctx *CommandContext, s.Command.Use = "set [flags]" s.Command.Short = "Set the Manager Identity of a Worker Deployment" if hasHighlighting { - s.Command.Long = "Set the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment given its Deployment Name.\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n\x1b[1mtemporal worker deployment manager-identity set [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --self \\\n --identity YourUserIdentity # optional, populated by CLI if not provided\x1b[0m\n\nSets the Manager Identity of the Deployment to the identity of the user making \nthis request. If you don't specifically pass an identity field, the CLI will \ngenerate your identity for you.\n\nFor example:\n\x1b[1mtemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --manager-identity NewManagerIdentity\x1b[0m\n\nSets the Manager Identity of the Deployment to any string." + s.Command.Long = "Set the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment given its Deployment Name.\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n\x1b[1mtemporal worker deployment manager-identity set [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --self \\\n --identity YourUserIdentity # optional, populated by CLI if not provided\x1b[0m\n\nSets the Manager Identity of the Deployment to the identity of the user making\nthis request. If you don't specifically pass an identity field, the CLI will\ngenerate your identity for you.\n\nFor example:\n\x1b[1mtemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --manager-identity NewManagerIdentity\x1b[0m\n\nSets the Manager Identity of the Deployment to any string." } else { - s.Command.Long = "Set the `ManagerIdentity` of a Worker Deployment given its Deployment Name.\n\nWhen present, `ManagerIdentity` is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n```\ntemporal worker deployment manager-identity set [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --self \\\n --identity YourUserIdentity # optional, populated by CLI if not provided\n```\n\nSets the Manager Identity of the Deployment to the identity of the user making \nthis request. If you don't specifically pass an identity field, the CLI will \ngenerate your identity for you.\n\nFor example:\n```\ntemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --manager-identity NewManagerIdentity\n```\n\nSets the Manager Identity of the Deployment to any string." + s.Command.Long = "Set the `ManagerIdentity` of a Worker Deployment given its Deployment Name.\n\nWhen present, `ManagerIdentity` is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n```\ntemporal worker deployment manager-identity set [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --self \\\n --identity YourUserIdentity # optional, populated by CLI if not provided\n```\n\nSets the Manager Identity of the Deployment to the identity of the user making\nthis request. If you don't specifically pass an identity field, the CLI will\ngenerate your identity for you.\n\nFor example:\n```\ntemporal worker deployment manager-identity set \\\n --deployment-name DeploymentName \\\n --manager-identity NewManagerIdentity\n```\n\nSets the Manager Identity of the Deployment to any string." } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.ManagerIdentity, "manager-identity", "", "New Manager Identity. Required unless --self is specified.") @@ -3157,9 +3467,9 @@ func NewTemporalWorkerDeploymentManagerIdentityUnsetCommand(cctx *CommandContext s.Command.Use = "unset [flags]" s.Command.Short = "Unset the Manager Identity of a Worker Deployment" if hasHighlighting { - s.Command.Long = "Unset the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment given its Deployment Name.\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n\x1b[1mtemporal worker deployment manager-identity unset [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment manager-identity unset \\\n --deployment-name YourDeploymentName\x1b[0m\n\nClears the Manager Identity field for a given Deployment." + s.Command.Long = "Unset the \x1b[1mManagerIdentity\x1b[0m of a Worker Deployment given its Deployment Name.\n\nWhen present, \x1b[1mManagerIdentity\x1b[0m is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the \x1b[1mManagerIdentity\x1b[0m will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n\x1b[1mManagerIdentity\x1b[0m allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n\x1b[1mtemporal worker deployment manager-identity unset [options]\x1b[0m\n\nFor example:\n\n\x1b[1mtemporal worker deployment manager-identity unset \\\n --deployment-name YourDeploymentName\x1b[0m\n\nClears the Manager Identity field for a given Deployment." } else { - s.Command.Long = "Unset the `ManagerIdentity` of a Worker Deployment given its Deployment Name.\n\nWhen present, `ManagerIdentity` is the identity of the user that has the \nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n```\ntemporal worker deployment manager-identity unset [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment manager-identity unset \\\n --deployment-name YourDeploymentName\n```\n\nClears the Manager Identity field for a given Deployment." + s.Command.Long = "Unset the `ManagerIdentity` of a Worker Deployment given its Deployment Name.\n\nWhen present, `ManagerIdentity` is the identity of the user that has the\nexclusive right to make changes to this Worker Deployment. Empty by default.\nWhen set, users whose identity does not match the `ManagerIdentity` will not\nbe able to change the Worker Deployment.\n\nThis is especially useful in environments where multiple users (such as CLI\nusers and automated controllers) may interact with the same Worker Deployment.\n`ManagerIdentity` allows different users to communicate with one another about\nwho is expected to make changes to the Worker Deployment.\n\n```\ntemporal worker deployment manager-identity unset [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment manager-identity unset \\\n --deployment-name YourDeploymentName\n```\n\nClears the Manager Identity field for a given Deployment." } s.Command.Args = cobra.NoArgs s.Command.Flags().StringVar(&s.DeploymentName, "deployment-name", "", "Name for a Worker Deployment. Required.") diff --git a/internal/temporalcli/commands.go b/internal/temporalcli/commands.go index fb76546e2..116de4800 100644 --- a/internal/temporalcli/commands.go +++ b/internal/temporalcli/commands.go @@ -232,6 +232,36 @@ func (c *CommandContext) MarshalFriendlyFailureBodyText(f *failure.Failure, inde return } +type countGroup interface { + GetGroupValues() []*commonpb.Payload + GetCount() int64 +} + +func printCountGroupsText(cctx *CommandContext, groups []countGroup) { + for _, group := range groups { + var valueStr string + for _, payload := range group.GetGroupValues() { + var value any + if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil { + value = fmt.Sprintf("", err) + } + if valueStr != "" { + valueStr += ", " + } + valueStr += fmt.Sprintf("%v", value) + } + cctx.Printer.Printlnf("Group total: %v, values: %v", group.GetCount(), valueStr) + } +} + +func stripCountGroupMetadataType(groups []countGroup) { + for _, group := range groups { + for _, payload := range group.GetGroupValues() { + delete(payload.GetMetadata(), "type") + } + } +} + // Takes payload shorthand into account, can use // MarshalProtoJSONNoPayloadShorthand if needed func (c *CommandContext) MarshalProtoJSON(m proto.Message) ([]byte, error) { diff --git a/internal/temporalcli/commands.workflow_view.go b/internal/temporalcli/commands.workflow_view.go index 278fcad35..2eb749753 100644 --- a/internal/temporalcli/commands.workflow_view.go +++ b/internal/temporalcli/commands.workflow_view.go @@ -15,7 +15,7 @@ import ( "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/temporalnexus" ) @@ -488,35 +488,16 @@ func (c *TemporalWorkflowCountCommand) run(cctx *CommandContext, args []string) return err } - // Just dump response on JSON, otherwise print total and groups + groups := make([]countGroup, len(resp.Groups)) + for i, g := range resp.Groups { + groups[i] = g + } if cctx.JSONOutput { - // Shorthand does not apply to search attributes currently, so we're going - // to remove the "type" from the metadata encoding on group values to make - // it apply - for _, group := range resp.Groups { - for _, payload := range group.GroupValues { - delete(payload.GetMetadata(), "type") - } - } + stripCountGroupMetadataType(groups) return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{}) } - cctx.Printer.Printlnf("Total: %v", resp.Count) - for _, group := range resp.Groups { - // Payload values are search attributes, so we can use the default converter - var valueStr string - for _, payload := range group.GroupValues { - var value any - if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil { - value = fmt.Sprintf("", err) - } - if valueStr != "" { - valueStr += ", " - } - valueStr += fmt.Sprintf("%v", value) - } - cctx.Printer.Printlnf("Group total: %v, values: %v", group.Count, valueStr) - } + printCountGroupsText(cctx, groups) return nil } diff --git a/internal/temporalcli/commands.yaml b/internal/temporalcli/commands.yaml index 87da2a4b4..4f7773c35 100644 --- a/internal/temporalcli/commands.yaml +++ b/internal/temporalcli/commands.yaml @@ -146,36 +146,30 @@ commands: - common - name: temporal activity - summary: Complete, update, pause, unpause, reset or fail an Activity + summary: Operate on Activity Executions description: | - Update an Activity's options, manage activity lifecycle or update - an Activity's state to completed or failed. - - Updating activity state marks an Activity as successfully finished or as - having encountered an error. - - ``` - temporal activity complete \ - --activity-id=YourActivityId \ - --workflow-id=YourWorkflowId \ - --result='{"YourResultKey": "YourResultValue"}' - ``` + Perform operations on Activity Executions. option-sets: - client docs: description-header: >- - Learn how to use Temporal Activity commands for completing or failing - Activity Executions in your Workflow. Optimize your Temporal Workflow - management effectively. + Learn how to use Temporal Activity commands to perform operations on Activity Executions. keywords: - activity + - activity start + - activity execute + - activity describe + - activity list + - activity count + - activity cancel + - activity terminate + - activity execution - activity complete - activity update-options + - activity fail - activity pause - activity unpause - activity reset - - activity execution - - activity fail - cli reference - cli-feature - command-line-interface-cli @@ -184,8 +178,30 @@ commands: - Activities - Temporal CLI + - name: temporal activity cancel + summary: Request cancellation of a Standalone Activity (Experimental) + description: | + Request cancellation of a Standalone Activity. + + ``` + temporal activity cancel \ + --activity-id YourActivityId + ``` + + Requesting cancellation transitions the Activity's run state + to CancelRequested. If the Activity is heartbeating, a + cancellation error will be raised when the next heartbeat + response is received; if the Activity allows this error to + propagate, the Activity transitions to canceled status. + option-sets: + - activity-reference + options: + - name: reason + type: string + description: Reason for cancellation. + - name: temporal activity complete - summary: Complete an Activity + summary: Mark an activity as completed successfully with a result description: | Complete an Activity, marking it as successfully finished. Specify the Activity ID and include a JSON result for the returned value: @@ -200,20 +216,86 @@ commands: - name: activity-id short: a type: string - description: Activity ID to complete. + description: | + Activity ID. This may be the ID of an Activity + invoked by a Workflow, or of a Standalone Activity. required: true + - name: workflow-id + type: string + short: w + description: | + Workflow ID. Required for workflow Activities. + Omit for Standalone Activities. + - name: run-id + type: string + short: r + description: | + Run ID. For workflow Activities (when --workflow-id is + provided), this is the Workflow Run ID. For Standalone + Activities, this is the Activity Run ID. - name: result type: string description: Result `JSON` to return. required: true + + - name: temporal activity count + summary: Count Standalone Activities matching a query (Experimental) + description: | + Return a count of Standalone Activities. Use `--query` to filter + the activities to be counted. + + ``` + temporal activity count \ + --query 'ActivityType="YourActivity"' + ``` + + Visit https://docs.temporal.io/visibility to read more about + Search Attributes and queries. + options: + - name: query + type: string + short: q + description: | + Query to filter Activity Executions to count. + + - name: temporal activity describe + summary: Show detailed info for a Standalone Activity (Experimental) + description: | + Display information about a Standalone Activity. + + ``` + temporal activity describe \ + --activity-id YourActivityId + ``` option-sets: - - workflow-reference + - activity-reference + options: + - name: raw + type: bool + description: Print properties without changing their format. + + - name: temporal activity execute + summary: Start a new Standalone Activity and wait for its result (Experimental) + description: | + Start a new Standalone Activity and block until it completes. + The result is output to stdout. + + ``` + temporal activity execute \ + --activity-id YourActivityId \ + --type YourActivity \ + --task-queue YourTaskQueue \ + --start-to-close-timeout 30s \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + - activity-start + - payload-input - name: temporal activity fail - summary: Fail an Activity + summary: Mark an Activity as completed unsuccessfully with an error description: | - Fail an Activity, marking it as having encountered an error. Specify the - Activity and Workflow IDs: + Fail an Activity, marking it as having encountered an error: ``` temporal activity fail \ @@ -224,22 +306,67 @@ commands: - name: activity-id short: a type: string - description: Activity ID to fail. + description: | + Activity ID. This may be the ID of an Activity + invoked by a Workflow, or of a Standalone Activity. required: true + - name: workflow-id + type: string + short: w + description: | + Workflow ID. Required for workflow Activities. + Omit for Standalone Activities. + - name: run-id + type: string + short: r + description: | + Run ID. For workflow Activities (when --workflow-id is + provided), this is the Workflow Run ID. For Standalone + Activities, this is the Activity Run ID. - name: detail type: string - description: Reason for failing the Activity (JSON). + description: | + Failure detail (JSON). Attached as the failure details + payload. - name: reason type: string - description: Reason for failing the Activity. - option-sets: - - workflow-reference + description: | + Failure reason. Attached as the failure message. + + - name: temporal activity list + summary: List Standalone Activities matching a query (Experimental) + description: | + List Standalone Activities. Use `--query` to filter results. + + ``` + temporal activity list \ + --query 'ActivityType="YourActivity"' + ``` + + Visit https://docs.temporal.io/visibility to read more about + Search Attributes and queries. + options: + - name: query + short: q + type: string + description: | + Query to filter the Activity Executions to list. + - name: limit + type: int + description: | + Maximum number of Activity Executions to display. + - name: page-size + type: int + description: | + Maximum number of Activity Executions to fetch + at a time from the server. - name: temporal activity update-options - summary: Update Activity options + summary: Change the values of options affecting a running Activity description: | Update the options of a running Activity that were passed into it from a Workflow. Updates are incremental, only changing the specified options. + Not supported for Standalone Activities. For example: @@ -336,7 +463,7 @@ commands: - name: temporal activity pause summary: Pause an Activity description: | - Pause an Activity. + Pause an Activity. Not supported for Standalone Activities. If the Activity is not currently running (e.g. because it previously failed), it will not be run again until it is unpaused. @@ -375,6 +502,7 @@ commands: summary: Unpause an Activity description: | Re-schedule a previously-paused Activity for execution. + Not supported for Standalone Activities. If the Activity is not running and is past its retry timeout, it will be scheduled immediately. Otherwise, it will be scheduled after its retry @@ -428,7 +556,8 @@ commands: - name: temporal activity reset summary: Reset an Activity description: | - Reset an activity. This restarts the activity as if it were first being + Reset an activity. Not supported for Standalone Activities. + This restarts the activity as if it were first being scheduled. That is, it will reset both the number of attempts and the activity timeout, as well as, optionally, the [heartbeat details](#reset-heartbeats). @@ -499,6 +628,58 @@ commands: option-sets: - single-activity-or-batch + - name: temporal activity result + summary: Wait for and output the result of a Standalone Activity (Experimental) + description: | + Wait for a Standalone Activity to complete and output the + result. + + ``` + temporal activity result \ + --activity-id YourActivityId + ``` + option-sets: + - activity-reference + + - name: temporal activity start + summary: Start a new Standalone Activity (Experimental) + description: | + Start a new Standalone Activity. Outputs the Activity ID and + Run ID. + + ``` + temporal activity start \ + --activity-id YourActivityId \ + --type YourActivity \ + --task-queue YourTaskQueue \ + --start-to-close-timeout 5m \ + --input '{"some-key": "some-value"}' + ``` + option-sets: + - activity-start + - payload-input + + - name: temporal activity terminate + summary: Forcefully end a Standalone Activity (Experimental) + description: | + Terminate a Standalone Activity. + + ``` + temporal activity terminate \ + --activity-id YourActivityId \ + --reason YourReason + ``` + + Activity code cannot see or respond to terminations. + option-sets: + - activity-reference + options: + - name: reason + type: string + description: | + Reason for termination. + Defaults to a message with the current user's name. + - name: temporal batch summary: Manage running batch jobs description: | @@ -1146,17 +1327,17 @@ commands: ``` temporal worker deployment manager-identity [command] [options] ``` - - When present, `ManagerIdentity` is the identity of the user that has the + + When present, `ManagerIdentity` is the identity of the user that has the exclusive right to make changes to this Worker Deployment. Empty by default. When set, users whose identity does not match the `ManagerIdentity` will not be able to change the Worker Deployment. - + This is especially useful in environments where multiple users (such as CLI users and automated controllers) may interact with the same Worker Deployment. `ManagerIdentity` allows different users to communicate with one another about who is expected to make changes to the Worker Deployment. - + The current Manager Identity is returned with `describe`: ``` temporal worker deployment describe \ @@ -1175,12 +1356,12 @@ commands: summary: Set the Manager Identity of a Worker Deployment description: | Set the `ManagerIdentity` of a Worker Deployment given its Deployment Name. - - When present, `ManagerIdentity` is the identity of the user that has the + + When present, `ManagerIdentity` is the identity of the user that has the exclusive right to make changes to this Worker Deployment. Empty by default. When set, users whose identity does not match the `ManagerIdentity` will not be able to change the Worker Deployment. - + This is especially useful in environments where multiple users (such as CLI users and automated controllers) may interact with the same Worker Deployment. `ManagerIdentity` allows different users to communicate with one another about @@ -1189,7 +1370,7 @@ commands: ``` temporal worker deployment manager-identity set [options] ``` - + For example: ``` @@ -1199,17 +1380,17 @@ commands: --identity YourUserIdentity # optional, populated by CLI if not provided ``` - Sets the Manager Identity of the Deployment to the identity of the user making - this request. If you don't specifically pass an identity field, the CLI will + Sets the Manager Identity of the Deployment to the identity of the user making + this request. If you don't specifically pass an identity field, the CLI will generate your identity for you. - + For example: ``` temporal worker deployment manager-identity set \ --deployment-name DeploymentName \ --manager-identity NewManagerIdentity ``` - + Sets the Manager Identity of the Deployment to any string. options: @@ -1231,12 +1412,12 @@ commands: summary: Unset the Manager Identity of a Worker Deployment description: | Unset the `ManagerIdentity` of a Worker Deployment given its Deployment Name. - - When present, `ManagerIdentity` is the identity of the user that has the + + When present, `ManagerIdentity` is the identity of the user that has the exclusive right to make changes to this Worker Deployment. Empty by default. When set, users whose identity does not match the `ManagerIdentity` will not be able to change the Worker Deployment. - + This is especially useful in environments where multiple users (such as CLI users and automated controllers) may interact with the same Worker Deployment. `ManagerIdentity` allows different users to communicate with one another about @@ -1245,7 +1426,7 @@ commands: ``` temporal worker deployment manager-identity unset [options] ``` - + For example: ``` @@ -1268,7 +1449,7 @@ commands: summary: List worker status information in a specific namespace (EXPERIMENTAL) description: | Get a list of workers to the specified namespace. - + ``` temporal worker list --namespace YourNamespace --query 'TaskQueue="YourTaskQueue"' ``` @@ -1285,7 +1466,7 @@ commands: summary: Returns information about a specific worker (EXPERIMENTAL) description: | Look up information of a specific worker. - + ``` temporal worker describe --namespace YourNamespace --worker-instance-key YourKey ``` @@ -2580,8 +2761,8 @@ commands: provided that they were assigned a Build ID. Note that task reachability status is deprecated in favor of Drainage Status - (ie. of a Drained or Draining Worker Deployment Version) and will be removed - in a future release. Also, determining task reachability incurs a non-trivial + (ie. of a Drained or Draining Worker Deployment Version) and will be removed + in a future release. Also, determining task reachability incurs a non-trivial computing cost. Task reachability states are reported per build ID. The state may be one of the @@ -4833,7 +5014,7 @@ option-sets: Temporal workflow headers in 'KEY=VALUE' format. Keys must be identifiers, and values must be JSON values. May be passed multiple times to set multiple Temporal headers. - Note: These are workflow headers, not gRPC headers. + Note: These are workflow headers, not gRPC headers. - name: workflow-update-options options: @@ -4855,3 +5036,139 @@ option-sets: description: When overriding to a `pinned` behavior, specifies the Build ID of the version to target. + + - name: activity-reference + options: + - name: activity-id + type: string + short: a + description: Activity ID. + required: true + - name: run-id + type: string + short: r + description: | + Activity Run ID. + If not set, targets the latest run. + + - name: activity-start + options: + - name: activity-id + type: string + short: a + description: Activity ID. + required: true + - name: type + type: string + description: Activity Type name. + required: true + - name: task-queue + type: string + description: Activity task queue. + required: true + short: t + - name: schedule-to-close-timeout + type: duration + description: | + Maximum time for the Activity Execution, including + all retries. Either this or "start-to-close-timeout" + is required. + - name: schedule-to-start-timeout + type: duration + description: | + Maximum time an Activity task can stay in a task + queue before a Worker picks it up. On expiry it + results in a non-retryable failure and no further + attempts are scheduled. + - name: start-to-close-timeout + type: duration + description: | + Maximum time for a single Activity attempt. + On expiry a new attempt may be scheduled if permitted + by the retry policy and schedule-to-close timeout. + Either this or "schedule-to-close-timeout" + is required. + - name: heartbeat-timeout + type: duration + description: | + Maximum time between successful Worker heartbeats. + On expiry the current activity attempt fails. + - name: retry-initial-interval + type: duration + description: | + Interval of the first retry. + If "retry-backoff-coefficient" is 1.0, it is used + for all retries. + - name: retry-maximum-interval + type: duration + description: | + Maximum interval between retries. + - name: retry-backoff-coefficient + type: float + description: | + Coefficient for calculating the next retry interval. + Must be 1 or larger. + - name: retry-maximum-attempts + type: int + description: | + Maximum number of attempts. + Setting to 1 disables retries. + Setting to 0 means unlimited attempts. + - name: id-reuse-policy + type: string-enum + description: | + Policy for handling activity start when an Activity + with the same ID exists and has completed. + enum-values: + - AllowDuplicate + - AllowDuplicateFailedOnly + - RejectDuplicate + - name: id-conflict-policy + type: string-enum + description: | + Policy for handling activity start when an + Activity with the same ID is currently running. + enum-values: + - Fail + - UseExisting + - name: search-attribute + type: string[] + description: | + Search Attribute in `KEY=VALUE` format. + Keys must be identifiers, and values must be + JSON values. + Can be passed multiple times. + See https://docs.temporal.io/visibility. + - name: headers + type: string[] + description: | + Temporal activity headers in 'KEY=VALUE' format. + Keys must be identifiers, and values must be + JSON values. + May be passed multiple times. + - name: static-summary + type: string + experimental: true + description: | + Static Activity summary for human consumption in UIs. + Uses standard Markdown formatting excluding images, HTML, and script tags. + - name: static-details + type: string + experimental: true + description: | + Static Activity details for human consumption in UIs. + Uses standard Markdown formatting excluding images, HTML, and script tags. + - name: priority-key + type: int + description: | + Priority key (1-5, lower = higher priority). + Default is 3 when not specified. + - name: fairness-key + type: string + description: | + Fairness key (max 64 bytes) for proportional task + dispatch. + - name: fairness-weight + type: float + description: | + Weight [0.001-1000] for this fairness key. diff --git a/internal/temporalcli/commands_test.go b/internal/temporalcli/commands_test.go index d37c890b9..e1394e35c 100644 --- a/internal/temporalcli/commands_test.go +++ b/internal/temporalcli/commands_test.go @@ -233,6 +233,9 @@ func (s *SharedServerSuite) SetupSuite() { "frontend.namespaceRPS.visibility": 10000, // Disable DescribeTaskQueue cache. "frontend.activityAPIsEnabled": true, + "history.enableChasm": true, + "activity.enableStandalone": true, + "activity.longPollTimeout": 2 * time.Second, // this is overridden since we don't want caching to be enabled // while testing DescribeTaskQueue behaviour related to versioning "matching.TaskQueueInfoByBuildIdTTL": 0 * time.Second,