Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3773,6 +3773,7 @@ type TemporalWorkflowShowCommand struct {
WorkflowReferenceOptions
Follow bool
Detailed bool
Reverse bool
}

func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowShowCommand {
Expand All @@ -3789,6 +3790,7 @@ func NewTemporalWorkflowShowCommand(cctx *CommandContext, parent *TemporalWorkfl
s.Command.Args = cobra.NoArgs
s.Command.Flags().BoolVarP(&s.Follow, "follow", "f", false, "Follow the Workflow Execution progress in real time. Does not apply to JSON output.")
s.Command.Flags().BoolVar(&s.Detailed, "detailed", false, "Display events as detailed sections instead of table. Does not apply to JSON output.")
s.Command.Flags().BoolVar(&s.Reverse, "reverse", false, "Fetch Event History newest-event-first. Cannot be combined with --follow.")
s.WorkflowReferenceOptions.BuildFlags(s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
Expand Down
56 changes: 51 additions & 5 deletions internal/temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,16 +707,24 @@ func coloredEventType(e enums.EventType) string {
type structuredHistoryIter struct {
ctx context.Context
client client.Client
namespace string
workflowID string
runID string
includeDetails bool
// If set true, long poll the history for updates
follow bool
// If set true, fetch history newest-event-first via GetWorkflowExecutionHistoryReverse
reverse bool
// If and when the iterator encounters a workflow-terminating event, it will store it here
wfResult *history.HistoryEvent

// Internal
// Internal (forward)
iter client.HistoryEventIterator

// Internal (reverse)
reverseBuf []*history.HistoryEvent
reverseNextToken []byte
reverseStarted bool
}

func (s *structuredHistoryIter) print(cctx *CommandContext) error {
Expand Down Expand Up @@ -784,15 +792,20 @@ func (s *structuredHistoryIter) Next() (any, error) {
Type: coloredEventType(event.EventType),
}

// Follow continue as new
if attr := event.GetWorkflowExecutionContinuedAsNewEventAttributes(); attr != nil {
s.runID = attr.NewExecutionRunId
s.iter = nil
// Follow continue as new (forward only; reverse traversal stays within the requested run)
if !s.reverse {
if attr := event.GetWorkflowExecutionContinuedAsNewEventAttributes(); attr != nil {
s.runID = attr.NewExecutionRunId
s.iter = nil
}
}
return data, nil
}

func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
if s.reverse {
return s.nextRawEventReverse()
}
// Load iter
if s.iter == nil {
s.iter = s.client.GetWorkflowHistory(
Expand All @@ -811,6 +824,39 @@ func (s *structuredHistoryIter) NextRawEvent() (*history.HistoryEvent, error) {
return event, nil
}

func (s *structuredHistoryIter) nextRawEventReverse() (*history.HistoryEvent, error) {
for len(s.reverseBuf) == 0 {
Comment thread
davidporter-id-au marked this conversation as resolved.
if s.reverseStarted && len(s.reverseNextToken) == 0 {
return nil, nil
}
s.reverseStarted = true
resp, err := s.client.WorkflowService().GetWorkflowExecutionHistoryReverse(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the default page size. For large history would be nice to allow customers to provide a custom size.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the max page size configured on the server side is 256 (batches) I think.

s.ctx,
&workflowservice.GetWorkflowExecutionHistoryReverseRequest{
Namespace: s.namespace,
Execution: &common.WorkflowExecution{
WorkflowId: s.workflowID,
RunId: s.runID,
},
NextPageToken: s.reverseNextToken,
},
)
if err != nil {
return nil, err
}
s.reverseNextToken = resp.GetNextPageToken()
if h := resp.GetHistory(); h != nil {
s.reverseBuf = h.GetEvents()
}
}
event := s.reverseBuf[0]
s.reverseBuf = s.reverseBuf[1:]
if isWorkflowTerminatingEvent(event.EventType) {
s.wfResult = event
}
return event, nil
}

type eventFieldValue struct {
field string
value string
Expand Down
6 changes: 6 additions & 0 deletions internal/temporalcli/commands.workflow_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,10 @@ func (c *TemporalWorkflowResultCommand) run(cctx *CommandContext, _ []string) er
}

func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) error {
if c.Reverse && c.Follow {
return fmt.Errorf("--reverse cannot be combined with --follow")
}

// Call describe
cl, err := dialClient(cctx, &c.Parent.ClientOptions)
if err != nil {
Expand All @@ -583,10 +587,12 @@ func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, _ []string) erro
iter := &structuredHistoryIter{
ctx: cctx,
client: cl,
namespace: c.Parent.Namespace,
workflowID: c.WorkflowId,
runID: c.RunId,
includeDetails: c.Detailed,
follow: c.Follow,
reverse: c.Reverse,
}
if !cctx.JSONOutput {
cctx.Printer.Println(color.MagentaString("Progress:"))
Expand Down
87 changes: 87 additions & 0 deletions internal/temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,93 @@ func (s *SharedServerSuite) TestWorkflow_Show_JSON() {
s.NotContains(out, "Results:")
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
return "hi!", nil
})

run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue},
DevWorkflow,
"ignored",
)
s.NoError(err)
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"--reverse",
)
s.NoError(res.Err)
out := res.Stdout.String()
completedIdx := strings.Index(out, "WorkflowExecutionCompleted")
startedIdx := strings.Index(out, "WorkflowExecutionStarted")
s.Greater(completedIdx, -1, "output should include the completed event")
s.Greater(startedIdx, -1, "output should include the started event")
s.Less(completedIdx, startedIdx, "completed event should appear before started event in reverse order")
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse_JSON() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.GetSignalChannel(ctx, "my-signal").Receive(ctx, nil)
return "hi!", nil
})

run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue},
DevWorkflow,
"workflow-param",
)
s.NoError(err)
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), "", "my-signal", nil))
s.NoError(run.Get(s.Context, nil))

res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", run.GetID(),
"--reverse",
"-o", "json",
)
s.NoError(res.Err)
out := res.Stdout.String()

var parsed struct {
Events []struct {
EventId string `json:"eventId"`
} `json:"events"`
}
s.NoError(json.Unmarshal([]byte(out), &parsed))
s.GreaterOrEqual(len(parsed.Events), 2)
prev := int64(-1)
for i, e := range parsed.Events {
id, err := strconv.ParseInt(e.EventId, 10, 64)
s.NoError(err)
if i > 0 {
s.Less(id, prev, "event %d (id=%d) should have smaller eventId than previous (%d) in reverse order", i, id, prev)
}
prev = id
}
}

func (s *SharedServerSuite) TestWorkflow_Show_Reverse_RejectsFollow() {
res := s.Execute(
"workflow", "show",
"--address", s.Address(),
"-w", "does-not-matter",
"--reverse",
"--follow",
)
s.Error(res.Err)
s.ErrorContains(res.Err, "--reverse cannot be combined with --follow")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something for this PR, but it would be nice to have a way to generate MarkFlagsMutuallyExclusive in the yaml file below.

Seems like this is not currently supported for auto-gen but something the under lying framework supports.

}

func (s *SharedServerSuite) TestWorkflow_List() {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
return a, nil
Expand Down
4 changes: 4 additions & 0 deletions internal/temporalcli/commands.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3759,6 +3759,10 @@ commands:
description: |
Display events as detailed sections instead of table.
Does not apply to JSON output.
- name: reverse
type: bool
description: |
Fetch Event History newest-event-first. Cannot be combined with --follow.
option-sets:
- workflow-reference

Expand Down
Loading