Commit 55df8b9
Eric Bower
·
2026-05-06 23:44:47 -0400 EDT
parent 642d214
feat(runner): --wait command for debugging
2 files changed,
+219,
-19
M
main.go
+1,
-0
1@@ -1 +1,2 @@
2 ci
3+pici
M
main.go
+218,
-19
1@@ -12,8 +12,10 @@ import (
2 "log/slog"
3 "os"
4 "os/exec"
5+ "os/signal"
6 "path/filepath"
7 "strings"
8+ "syscall"
9 "time"
10
11 "github.com/golang-cz/devslog"
12@@ -43,6 +45,7 @@ type Cfg struct {
13 StatusOutput io.Writer // where status JSONL is written (default: os.Stdout)
14 StatusFilter string // "terminal" (default) or "all"
15 HumanOutput bool // human-readable output instead of JSONL / slog
16+ Wait bool // block until job completes, print history and summary
17 }
18
19 type Event struct {
20@@ -53,7 +56,7 @@ type Event struct {
21 ArtifactURL string `json:"artifact_url"` // public URL for artifacts (e.g. https://{user}-artifacts.pgs.sh/{repo})
22 }
23
24-func NewCfg() *Cfg {
25+func NewCfg() (*Cfg, string, bool) {
26 var keyLoc, certLoc, artifactDir, event string
27 var monitorInterval time.Duration
28 var logLevel string
29@@ -67,9 +70,15 @@ func NewCfg() *Cfg {
30 flag.BoolVar(&structured, "structured", false, "use structured key=value log output")
31 var statusFilter string
32 var human bool
33+ var wait bool
34 flag.StringVar(&statusFilter, "status-filter", "terminal", "status output filter: terminal (default) or all")
35 flag.BoolVar(&human, "human", false, "human-readable output (default: JSONL / slog)")
36- flag.Parse()
37+ flag.BoolVar(&wait, "wait", false, "block until job completes, printing session history and summary")
38+
39+ // Split args so the subcommand (first non-flag arg) doesn't block
40+ // flags that appear after it: "pici runner --wait" works.
41+ flags, cmd, wantHelp := splitCommand(os.Args[1:])
42+ flag.CommandLine.Parse(flags)
43
44 logger := newLogger("ci", logLevel, structured)
45 ctx, cancel := context.WithCancel(context.Background())
46@@ -85,7 +94,27 @@ func NewCfg() *Cfg {
47 MonitorInterval: monitorInterval,
48 StatusFilter: statusFilter,
49 HumanOutput: human,
50+ Wait: wait,
51+ }, cmd, wantHelp
52+}
53+
54+// splitCommand separates the first non-flag argument (the subcommand) from
55+// the rest of the flags, so "runner --wait" becomes flags=["--wait"], cmd="runner".
56+// It strips --help/help so we can print custom help per subcommand.
57+func splitCommand(args []string) (flags []string, cmd string, wantHelp bool) {
58+ flags = make([]string, 0, len(args))
59+ for _, arg := range args {
60+ if arg == "--help" || arg == "help" {
61+ wantHelp = true
62+ continue
63+ }
64+ if cmd == "" && arg != "" && !strings.HasPrefix(arg, "-") {
65+ cmd = arg
66+ } else {
67+ flags = append(flags, arg)
68+ }
69 }
70+ return flags, cmd, wantHelp
71 }
72
73 func parseLogLevel(s string) slog.Level {
74@@ -116,18 +145,22 @@ func newLogger(space string, levelStr string, structured bool) *slog.Logger {
75 }
76
77 func main() {
78- cfg := NewCfg()
79- cmd := flag.Arg(0)
80+ cfg, cmd, wantHelp := NewCfg()
81
82 cfg.Logger.Debug("setting up ci", "cfg", cfg)
83 cfg.Logger.Debug("running cmd", "cmd", cmd)
84
85- switch cmd {
86- case "runner":
87- if flag.Arg(1) == "--help" || flag.Arg(1) == "help" {
88+ if wantHelp && (cmd == "runner" || cmd == "monitor") {
89+ if cmd == "runner" {
90 printRunnerHelp()
91- return
92+ } else {
93+ printMonitorHelp()
94 }
95+ return
96+ }
97+
98+ switch cmd {
99+ case "runner":
100 cfg.Logger.Debug("starting runner")
101 if err := RunRunner(cfg); err != nil {
102 cfg.Logger.Error("runner failed", "err", err)
103@@ -146,10 +179,6 @@ func main() {
104 os.Exit(1)
105 }
106 case "monitor":
107- if flag.Arg(1) == "--help" || flag.Arg(1) == "help" {
108- printMonitorHelp()
109- return
110- }
111 cfg.Logger.Debug("starting monitor")
112 if err := runMonitor(cfg); err != nil {
113 cfg.Logger.Error("monitor failed", "err", err)
114@@ -198,6 +227,7 @@ func printRunnerHelp() {
115 USAGE
116 echo '<event-json>' | pici runner [flags]
117 pici runner --event '<event-json>' [flags]
118+ echo '<event-json>' | pici runner --wait [flags] # block until done
119
120 EVENT JSON FIELDS
121 type (required) Event type, e.g. "push", "merge_request"
122@@ -215,7 +245,8 @@ FLAGS
123 -event <json> Event JSON string (alternative to reading from stdin)
124 -artifact-dir <path> Local directory to stage artifacts (default: /tmp/pici-artifacts)
125 -log-level <level> Log level: debug, info, warn, error (default: info)
126- -human Human-readable output (default: enabled for runner)`)
127+ -human Human-readable output (default: enabled for runner)
128+ -wait Block until job completes, printing session history and summary`)
129
130 }
131
132@@ -354,21 +385,22 @@ func (eng *JobEngine) Run() error {
133 log.Debug("starting runner session", "session", prefix+"runner")
134
135 evStr := fmt.Sprintf("PICO_CI_EVENT=%s", eng.Ev.Type)
136- jobStr := fmt.Sprintf("ZMX_SESSION_PREFIX=%s", childPrefix)
137
138 runnerName := prefix + "runner"
139- // Name the runner explicitly and don't set ZMX_SESSION_PREFIX for this
140- // outer zmx run call. The prefix is only for child sessions spawned
141- // inside pico.sh (via the exported env var).
142+ // Name the runner explicitly. Do NOT set ZMX_SESSION_PREFIX for this
143+ // outer zmx run call ā it would be prepended to the runner's own name.
144+ // The prefix is only for child sessions spawned inside pico.sh (via the
145+ // exported env var).
146 cmdEnv := make([]string, 0, len(os.Environ())+2)
147 for _, e := range os.Environ() {
148 if !strings.HasPrefix(e, "ZMX_SESSION_PREFIX=") {
149 cmdEnv = append(cmdEnv, e)
150 }
151 }
152- cmdEnv = append(cmdEnv, evStr, jobStr)
153+ cmdEnv = append(cmdEnv, evStr)
154
155- cmd := exec.Command("zmx", "run", runnerName, "-d", "bash", manifest)
156+ zmxPrefixStr := fmt.Sprintf("ZMX_SESSION_PREFIX=%s", childPrefix)
157+ cmd := exec.Command("zmx", "run", runnerName, "-d", zmxPrefixStr, "bash", manifest)
158 cmd.Env = cmdEnv
159 cmd.Dir = eng.Wk.GetDir()
160
161@@ -441,10 +473,177 @@ func eventHandler(cfg *Cfg, eventData *Event) error {
162 }
163
164 fmt.Fprintln(os.Stdout, "ā
job launched")
165+
166+ if cfg.Wait {
167+ if err := waitAndReport(cfg, log, eventData.Name, jobID); err != nil {
168+ return fmt.Errorf("wait: %w", err)
169+ }
170+ return nil
171+ }
172+
173 fmt.Fprintf(os.Stdout, " follow: zmx tail ci.%s.%s.runner\n", eventData.Name, jobID)
174 return nil
175 }
176
177+// waitAndReport polls the job's sessions until all complete, prints live
178+// progress to stdout, then dumps session history and a final summary.
179+func waitAndReport(cfg *Cfg, log *slog.Logger, name, jobID string) error {
180+ prefix := "ci." + name + "." + jobID + "."
181+ ticker := time.NewTicker(cfg.MonitorInterval)
182+ defer ticker.Stop()
183+
184+ // Handle ^C gracefully
185+ sigCh := make(chan os.Signal, 1)
186+ signal.Notify(sigCh, syscall.SIGINT)
187+ defer signal.Stop(sigCh)
188+
189+ fmt.Fprintln(os.Stdout)
190+ fmt.Fprint(os.Stdout, "ā³ waiting for completion...\n")
191+
192+ // Track what we've already printed to avoid duplicates
193+ type sessionState struct {
194+ status string // "running", "success", "failed"
195+ exitCode string
196+ duration string
197+ printed bool
198+ }
199+ known := make(map[string]*sessionState)
200+ var jobSessions []SessionInfo
201+ var sessionOrder []string // track insertion order for deterministic output
202+
203+ for {
204+ select {
205+ case <-cfg.Ctx.Done():
206+ return cfg.Ctx.Err()
207+ case <-sigCh:
208+ fmt.Fprintln(os.Stdout, "\nā¹ cancelled")
209+ return nil
210+ case <-ticker.C:
211+ }
212+
213+ // Fetch current session list
214+ listOutput, err := exec.Command("zmx", "list").CombinedOutput()
215+ if err != nil {
216+ log.Error("zmx list", "err", err)
217+ continue
218+ }
219+
220+ sessions := parseZMXList(string(listOutput))
221+ jobSessions = nil
222+ for _, s := range sessions {
223+ if strings.HasPrefix(s.Name, prefix) {
224+ s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
225+ jobSessions = append(jobSessions, s)
226+ }
227+ }
228+
229+ if len(jobSessions) == 0 {
230+ continue // runner session may not have appeared yet
231+ }
232+
233+ // Update state for each session
234+ for _, s := range jobSessions {
235+ state, ok := known[s.Short]
236+ if !ok {
237+ state = &sessionState{}
238+ known[s.Short] = state
239+ sessionOrder = append(sessionOrder, s.Short)
240+ }
241+
242+ if s.Ended == "" {
243+ state.status = "running"
244+ } else {
245+ if s.ExitCode == "0" {
246+ state.status = "success"
247+ } else {
248+ state.status = "failed"
249+ state.exitCode = s.ExitCode
250+ }
251+ state.duration = fmtDuration(s.Created, s.Ended)
252+ }
253+
254+ // Print when state changes
255+ if !state.printed {
256+ icon := map[string]string{"running": "š", "success": "ā
", "failed": "ā"}[state.status]
257+ detail := ""
258+ if state.status == "failed" && state.exitCode != "" {
259+ detail = fmt.Sprintf(", exit %s", state.exitCode)
260+ }
261+ if state.duration != "" && state.duration != "ā" {
262+ detail += fmt.Sprintf(" (%s)", state.duration)
263+ }
264+ fmt.Fprintf(os.Stdout, " %-12s %s %s%s\n", s.Short, icon, state.status, detail)
265+ state.printed = true
266+ }
267+ }
268+
269+ // Check if all sessions are done
270+ if allCompleted(jobSessions) {
271+ break
272+ }
273+ }
274+
275+ // Print last 25 lines of history for failed sessions only
276+ fmt.Fprintln(os.Stdout)
277+ for _, short := range sessionOrder {
278+ state := known[short]
279+ if state.status != "failed" {
280+ continue
281+ }
282+
283+ full := prefix + short
284+
285+ separator := strings.Repeat("\u2500", 50)
286+ fmt.Fprintln(os.Stdout, separator)
287+ fmt.Fprintf(os.Stdout, "Session: %s (exit %s)\n", short, state.exitCode)
288+ fmt.Fprintln(os.Stdout, separator)
289+ fmt.Fprintln(os.Stdout)
290+
291+ history, err := fetchHistoryPlain(full)
292+ if err != nil {
293+ fmt.Fprintf(os.Stdout, " (history unavailable: %v)\n", err)
294+ } else {
295+ lines := strings.Split(history, "\n")
296+ // Show last 25 lines
297+ if len(lines) > 25 {
298+ fmt.Fprintf(os.Stdout, " ... (%d lines omitted)\n", len(lines)-25)
299+ lines = lines[len(lines)-25:]
300+ }
301+ for _, line := range lines {
302+ if line != "" {
303+ fmt.Fprintf(os.Stdout, " %s\n", line)
304+ }
305+ }
306+ }
307+ fmt.Fprintln(os.Stdout)
308+ }
309+
310+ // Final summary
311+ exitCode, status := resolveJobExitCode(jobSessions)
312+ _, _, duration := computeJobTiming(jobSessions)
313+ icon := map[string]string{"success": "ā
", "failed": "ā"}[status]
314+ if exitCode == 0 {
315+ fmt.Fprintf(os.Stdout, "%s job finished: %s (%s)\n", icon, status, duration)
316+ } else {
317+ fmt.Fprintf(os.Stdout, "%s job failed: exit %d (%s)\n", icon, exitCode, duration)
318+ }
319+
320+ return nil
321+}
322+
323+// cleanSessionShort produces a readable short name from a full session name.
324+// ci.name.jobID.step.lint ā lint
325+// ci.name.jobID.step.ci.name.jobID.runner ā runner
326+func cleanSessionShort(name, prefix, repoName, jobID string) string {
327+ short := strings.TrimPrefix(name, prefix)
328+ // Strip "step." prefix added by child sessions
329+ short = strings.TrimPrefix(short, "step.")
330+ // Strip nested full prefix (e.g. runner named ci.name.jobID.runner)
331+ nested := "ci." + repoName + "." + jobID + "."
332+ short = strings.TrimPrefix(short, nested)
333+ return short
334+}
335+
336 // runMonitor is a long-lived daemon that polls all ci.* zmx sessions,
337 // writes status as JSONL to stdout, stages artifacts, and syncs to destination.
338 // Logs go to stderr. Compose with shell tools to route status: