main
main.go
Eric Bower
·
2026-05-11
1package main
2
3import (
4 "archive/tar"
5 "bufio"
6 "context"
7 "crypto/sha256"
8 "embed"
9 "encoding/json"
10 "flag"
11 "fmt"
12 "html/template"
13 "io"
14 "io/fs"
15 "log/slog"
16 "os"
17 "os/exec"
18 "os/signal"
19 "path/filepath"
20 "runtime"
21 "strconv"
22 "strings"
23 "syscall"
24 "time"
25)
26
27//go:embed tmpl/*
28var tmplFS embed.FS
29
30type WorkspaceFactory func(cfg *Cfg, logger *slog.Logger, source string) Workspace
31
32func defaultWorkspaceFactory(cfg *Cfg, logger *slog.Logger, source string) Workspace {
33 if strings.HasSuffix(source, ".tar") {
34 return &WorkspaceTar{
35 Cfg: cfg,
36 Logger: logger,
37 Source: source,
38 }
39 }
40 return &WorkspaceRsync{
41 Cfg: cfg,
42 Logger: logger,
43 Source: source,
44 }
45}
46
47type Cfg struct {
48 Logger *slog.Logger
49 Ctx context.Context
50 Cancel context.CancelFunc
51 KeyLocation string
52 CertificateLocation string
53 ArtifactDir string
54 Event string // event JSON passed via --event flag
55 EventSource io.ReadCloser // when set, used directly as the event source (for testing)
56 MonitorInterval time.Duration
57 GCInterval time.Duration
58 NewWorkspace WorkspaceFactory
59 StatusOutput io.Writer // where status JSONL is written (default: os.Stdout)
60 IncludeRunning bool // emit running status updates in addition to terminal
61 HumanOutput bool // human-readable output instead of JSONL / slog
62 Wait bool // block until job completes, print history and summary
63}
64
// Event is the JSON payload describing one CI job request. RunRunner decodes
// it, and eventHandler stores it as event.json in the job's artifact directory.
type Event struct {
	Type         string `json:"type"`          // event kind, e.g. "push"
	Name         string `json:"name"`          // repository name; used in session names
	Workspace    string `json:"workspace"`     // source copied into the job workspace
	Branch       string `json:"branch"`        // recorded in attestation.json
	Commit       string `json:"commit"`        // recorded in attestation.json
	ArtifactDest string `json:"artifact_dest"` // SSH destination for artifact sync
	ForcePush    bool   `json:"force_push,omitempty"`
}
74
75func NewCfg() (*Cfg, string, bool) {
76 var keyLoc, certLoc, artifactDir, event string
77 var monitorInterval time.Duration
78 var gcInterval time.Duration
79 var logLevel string
80 flag.StringVar(&keyLoc, "pk", "", "ssh private key used to authenticate with pico services")
81 flag.StringVar(&certLoc, "ck", "", "ssh certificate public key used to authenticate with pico services (only required if using ssh certificates)")
82 flag.StringVar(&artifactDir, "artifact-dir", "/tmp/pici-artifacts", "local directory to stage artifacts")
83 flag.StringVar(&event, "event", "", "event JSON to run (alternative to reading from stdin)")
84 flag.DurationVar(&monitorInterval, "monitor-interval", 5*time.Second, "interval for monitoring zmx sessions")
85 flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "interval for garbage collection in monitor (0 to disable)")
86 flag.StringVar(&logLevel, "log-level", "info", "log level: debug, info, warn, error")
87 var includeRunning bool
88 var human bool
89 var wait bool
90 flag.BoolVar(&includeRunning, "include-running", false, "emit running status updates in addition to terminal (default: terminal only)")
91 flag.BoolVar(&human, "human", false, "human-readable output (default: JSONL / slog)")
92 flag.BoolVar(&wait, "wait", false, "block until job completes, printing session history and summary")
93
94 // Split args so the subcommand (first non-flag arg) doesn't block
95 // flags that appear after it: "pici runner --wait" works.
96 flags, cmd, wantHelp := splitCommand(os.Args[1:])
97 if err := flag.CommandLine.Parse(flags); err != nil {
98 fmt.Fprintf(os.Stderr, "failed to parse flags: %v\n", err)
99 os.Exit(1)
100 }
101
102 logger := newLogger("ci", logLevel)
103 ctx, cancel := context.WithCancel(context.Background())
104 return &Cfg{
105 NewWorkspace: defaultWorkspaceFactory,
106 Logger: logger.With("key_loc", keyLoc, "cert_loc", certLoc),
107 Ctx: ctx,
108 Cancel: cancel,
109 KeyLocation: keyLoc,
110 CertificateLocation: certLoc,
111 ArtifactDir: artifactDir,
112 Event: event,
113 MonitorInterval: monitorInterval,
114 GCInterval: gcInterval,
115 IncludeRunning: includeRunning,
116 HumanOutput: human,
117 Wait: wait,
118 }, cmd, wantHelp
119}
120
// splitCommand separates the first positional argument (the subcommand) from
// the flags, so flags may appear on either side of it. For example,
// "runner --wait" becomes flags=["--wait"], cmd="runner". Without this the
// flag package would stop parsing at the subcommand.
//
// A flag registered on flag.CommandLine that takes its value in the next
// argument ("-pk /path/key", "-log-level debug") keeps that value paired with
// it, so the value is never mistaken for the subcommand:
// "-log-level debug monitor" yields cmd="monitor". Boolean flags, flags written
// as "-name=value", and unregistered flags consume nothing extra. Flags must
// therefore be registered before splitCommand is called.
//
// "--help" and "help" are removed and reported through wantHelp so callers can
// print subcommand-specific help instead of the flag package's default usage.
func splitCommand(args []string) (flags []string, cmd string, wantHelp bool) {
	// takesValue reports whether arg names a registered, non-boolean flag
	// given without an inline "=value", i.e. one whose value is the next arg.
	takesValue := func(arg string) bool {
		if !strings.HasPrefix(arg, "-") {
			return false
		}
		name := strings.TrimPrefix(strings.TrimPrefix(arg, "-"), "-")
		if name == "" || strings.Contains(name, "=") {
			return false // "-", "--" terminator, or inline value
		}
		f := flag.CommandLine.Lookup(name)
		if f == nil {
			return false
		}
		// Same test the flag package uses to decide a flag needs no value.
		if bf, ok := f.Value.(interface{ IsBoolFlag() bool }); ok && bf.IsBoolFlag() {
			return false
		}
		return true
	}

	flags = make([]string, 0, len(args))
	for i := 0; i < len(args); i++ {
		arg := args[i]
		if arg == "--help" || arg == "help" {
			wantHelp = true
			continue
		}
		if cmd == "" && arg != "" && !strings.HasPrefix(arg, "-") {
			cmd = arg
			continue
		}
		flags = append(flags, arg)
		if takesValue(arg) && i+1 < len(args) {
			// Pass the value through verbatim; it is neither the
			// subcommand nor a help request.
			i++
			flags = append(flags, args[i])
		}
	}
	return flags, cmd, wantHelp
}
139
// parseLogLevel maps a case-insensitive level name ("debug", "warn",
// "error") to its slog.Level. Anything else, including "info" and the
// empty string, yields slog.LevelInfo.
func parseLogLevel(s string) slog.Level {
	named := map[string]slog.Level{
		"debug": slog.LevelDebug,
		"warn":  slog.LevelWarn,
		"error": slog.LevelError,
	}
	if lvl, ok := named[strings.ToLower(s)]; ok {
		return lvl
	}
	return slog.LevelInfo
}

// newLogger returns a text logger writing to stderr at the level named by
// levelStr, tagged with service=space.
func newLogger(space string, levelStr string) *slog.Logger {
	opts := &slog.HandlerOptions{Level: parseLogLevel(levelStr)}
	handler := slog.NewTextHandler(os.Stderr, opts)
	return slog.New(handler).With("service", space)
}
159
160func main() {
161 cfg, cmd, wantHelp := NewCfg()
162
163 cfg.Logger.Debug("setting up ci", "cfg", cfg)
164 cfg.Logger.Debug("running cmd", "cmd", cmd)
165
166 if wantHelp && (cmd == "runner" || cmd == "monitor") {
167 if cmd == "runner" {
168 printRunnerHelp()
169 } else {
170 printMonitorHelp()
171 }
172 return
173 }
174
175 switch cmd {
176 case "runner":
177 cfg.Logger.Debug("starting runner")
178 if err := RunRunner(cfg); err != nil {
179 cfg.Logger.Error("runner failed", "err", err)
180 os.Exit(1)
181 }
182 case "cancel":
183 cfg.Logger.Debug("starting cancel handler")
184 if err := runCancel(cfg); err != nil {
185 cfg.Logger.Error("cancel failed", "err", err)
186 os.Exit(1)
187 }
188 case "gc":
189 cfg.Logger.Debug("starting garbage collection")
190 if err := runGC(cfg); err != nil {
191 cfg.Logger.Error("gc failed", "err", err)
192 os.Exit(1)
193 }
194 case "monitor":
195 cfg.Logger.Debug("starting monitor")
196 if err := runMonitor(cfg); err != nil {
197 cfg.Logger.Error("monitor failed", "err", err)
198 os.Exit(1)
199 }
200 case "status":
201 cfg.Logger.Debug("starting status updater")
202 case "orca":
203 cfg.Logger.Debug("starting orchestrator")
204 default:
205 cfg.Logger.Error("must provide command: runner, cancel, gc, monitor, status, or orca")
206 os.Exit(1)
207 }
208}
209
// printMonitorHelp prints usage for the "monitor" subcommand to stdout. The
// --human sample output mirrors the line formats used by renderJobRunning
// and renderJobFinal.
func printMonitorHelp() {
	fmt.Println(`pici monitor — poll ci.* zmx sessions, stage artifacts, publish status.

USAGE
  pici monitor [flags]      # JSONL to stdout (pipe to webhooks)
  pici monitor --human      # human-readable terminal output

OUTPUT MODES
  Default (JSONL): One JSON object per line, suitable for piping to notifications.
    pici monitor > status.jsonl
    pici monitor | ssh pipe.pico.sh "pub build.status -b=false"
    pici monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done

  --human: Progress output for terminal viewing (do not pipe). Running jobs
  are redrawn in place using ANSI cursor control.
    pici monitor --human
    pici monitor --human --include-running

    myrepo       🚀 running
    lint         ✅ success (12s)
    test         🚀 running (1m23s)

    myrepo       ✅ success (2m34s)
    lint         ✅ done (12s)
    test         ✅ done (2m30s)

FLAGS
  -pk <path>               SSH private key for authenticating with pico services
  -ck <path>               SSH certificate public key
  -artifact-dir <path>     Local directory to stage artifacts (default: /tmp/pici-artifacts)
  -monitor-interval <dur>  Poll interval (default: 5s)
  -gc-interval <dur>       Garbage collection interval (default: 10m, 0 to disable)
  -include-running         Emit running status updates in addition to terminal (default: terminal only)
  -human                   Human-readable output instead of JSONL (for terminal, not piping)
  -log-level <level>       Log level: debug, info, warn, error (default: info)`)
}
239
// printRunnerHelp prints usage for the "runner" subcommand to stdout,
// including every event JSON field the runner reads.
func printRunnerHelp() {
	fmt.Println(`pici runner — execute a CI job from an event JSON payload.

USAGE
  echo '<event-json>' | pici runner [flags]
  pici runner --event '<event-json>' [flags]
  echo '<event-json>' | pici runner --wait [flags]   # block until done

EVENT JSON FIELDS
  type           (required) Event type, e.g. "push", "merge_request"
  name           (required) Repository name, used for session naming
  workspace      (required) rsync source copied into the job workspace,
                 e.g. "user@host:/srv/myrepo" or a local path. Sources
                 ending in ".tar" (e.g. "host:/ci/repo.tar") are downloaded
                 and extracted instead.
  branch         (optional) Branch name, recorded in attestation.json
  commit         (optional) Commit identifier, recorded in attestation.json
  artifact_dest  (optional) SSH destination for artifact sync, e.g. "user@host:/path"

EXAMPLE
  echo '{"type":"push","name":"myrepo","workspace":"user@host:/srv/myrepo"}' | pici runner

ENV VARS IN pico.sh
  The runner exports these environment variables for your pico.sh script:
    PICO_CI_JOB_ID       Unique job identifier (e.g. "a3f2b8c1")
    PICO_CI_REPO         Repository name (from event "name" field)
    PICO_CI_EVENT_TYPE   Event type (from event "type" field)

  Note: pico.sh runs with the workspace as its working directory, so
  $(pwd) gives you the workspace path directly.

  Use them with defaults so pico.sh works standalone:
    JOB_ID="${PICO_CI_JOB_ID:-local}"
    REPO="${PICO_CI_REPO:-unknown}"

FLAGS
  -pk <path>            SSH private key for authenticating with pico services
  -ck <path>            SSH certificate public key (required when using SSH certificates)
  -event <json>         Event JSON string (alternative to reading from stdin)
  -artifact-dir <path>  Local directory to stage artifacts (default: /tmp/pici-artifacts)
  -log-level <level>    Log level: debug, info, warn, error (default: info)
  -human                Human-readable output (default: enabled for runner)
  -wait                 Block until job completes, printing session history and summary`)
}
281
282func RunRunner(cfg *Cfg) error {
283 var payload string
284 if cfg.EventSource != nil {
285 data, err := io.ReadAll(cfg.EventSource)
286 if err != nil {
287 return fmt.Errorf("read event source: %w", err)
288 }
289 payload = strings.TrimSpace(string(data))
290 } else if cfg.Event != "" {
291 payload = cfg.Event
292 } else {
293 data, err := io.ReadAll(os.Stdin)
294 if err != nil {
295 return fmt.Errorf("read stdin: %w", err)
296 }
297 payload = strings.TrimSpace(string(data))
298 }
299
300 var eventData Event
301 if err := json.Unmarshal([]byte(payload), &eventData); err != nil {
302 return fmt.Errorf("unmarshal event: %w", err)
303 }
304
305 // Validate required fields
306 if eventData.Type == "" {
307 return fmt.Errorf("event missing required field: type")
308 }
309 if eventData.Name == "" {
310 return fmt.Errorf("event missing required field: name")
311 }
312 if eventData.Workspace == "" {
313 return fmt.Errorf("event missing required field: workspace")
314 }
315
316 return eventHandler(cfg, &eventData)
317}
318
// Workspace is a job's local working copy of its source. Setup materializes
// it, GetDir reports where it lives, Checksum identifies its content when
// possible, and Cleanup is called once the runner is done with it.
type Workspace interface {
	// Setup creates and populates the local workspace directory.
	Setup() error
	// GetDir returns the local workspace directory (meaningful after Setup).
	GetDir() string
	// Checksum returns a content hash of the workspace (e.g. sha256 of the
	// tarball), or the empty string if not available.
	Checksum() string
	// Cleanup releases the workspace.
	Cleanup() error
}
327
328type WorkspaceRsync struct {
329 Cfg *Cfg
330 Logger *slog.Logger
331 Source string
332 Dest string
333}
334
335func (w *WorkspaceRsync) Setup() error {
336 tempDir, err := os.MkdirTemp("", "pici-*")
337 if err != nil {
338 return err
339 }
340 w.Dest = tempDir
341
342 log := w.Logger.With("source", w.Source, "dest", w.Dest)
343 log.Debug("syncing workspace via rsync")
344
345 var cmd *exec.Cmd
346 if w.Cfg.KeyLocation != "" {
347 sshcmd := fmt.Sprintf(
348 "-F ~/.ssh/config -i %s -o IdentitiesOnly=yes -o CertificateFile %s",
349 w.Cfg.KeyLocation,
350 w.Cfg.CertificateLocation,
351 )
352 cmd = exec.Command("rsync", "-e", sshcmd, "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
353 } else {
354 cmd = exec.Command("rsync", "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
355 }
356 return runCmd(cmd, log)
357}
358
359func (w *WorkspaceRsync) Cleanup() error {
360 // return os.RemoveAll(w.Dest)
361 return nil
362}
363
364func (w *WorkspaceRsync) GetDir() string {
365 return w.Dest
366}
367
368func (w *WorkspaceRsync) Checksum() string {
369 return ""
370}
371
372type WorkspaceTar struct {
373 Cfg *Cfg
374 Logger *slog.Logger
375 Source string // e.g. "pgs.sh:/private-ci/workspaces/repo_abc123.tar"
376 Dest string
377 checksum string
378}
379
380func (w *WorkspaceTar) Setup() error {
381 tempDir, err := os.MkdirTemp("", "pici-*")
382 if err != nil {
383 return err
384 }
385 w.Dest = tempDir
386
387 log := w.Logger.With("source", w.Source, "dest", w.Dest)
388 log.Debug("downloading and extracting workspace tar")
389
390 // Parse host:path from source
391 host, path := splitSSHSource(w.Source)
392
393 // Use rsync to download the tar file
394 tarPath := filepath.Join(tempDir, "workspace.tar")
395 rsyncCmd := exec.Command("rsync", "-e", "ssh", host+":"+path, tarPath)
396 rsyncCmd.Stderr = os.Stderr
397 if err := rsyncCmd.Run(); err != nil {
398 return fmt.Errorf("rsync download: %w", err)
399 }
400
401 // Compute sha256 checksum of the tarball
402 tarData, err := os.ReadFile(tarPath)
403 if err != nil {
404 return fmt.Errorf("read tar for checksum: %w", err)
405 }
406 w.checksum = fmt.Sprintf("sha256:%x", sha256.Sum256(tarData))
407 log.Debug("workspace checksum", "checksum", w.checksum)
408
409 // Open the tar file for extraction
410 f, err := os.Open(tarPath)
411 if err != nil {
412 return fmt.Errorf("open tar: %w", err)
413 }
414 defer func() {
415 if err := f.Close(); err != nil {
416 log.Error("close tar", "err", err)
417 }
418 }()
419
420 // Extract tar to temp dir
421 tr := tar.NewReader(f)
422 for {
423 hdr, err := tr.Next()
424 if err == io.EOF {
425 break
426 }
427 if err != nil {
428 return fmt.Errorf("tar read: %w", err)
429 }
430
431 target := filepath.Join(tempDir, hdr.Name)
432 switch hdr.Typeflag {
433 case tar.TypeDir:
434 if err := os.MkdirAll(target, 0755); err != nil {
435 return fmt.Errorf("mkdir %s: %w", target, err)
436 }
437 case tar.TypeReg:
438 // Ensure parent dir exists
439 if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
440 return fmt.Errorf("mkdir parent %s: %w", filepath.Dir(target), err)
441 }
442 tf, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, os.FileMode(hdr.Mode))
443 if err != nil {
444 return fmt.Errorf("open %s: %w", target, err)
445 }
446 if _, err := io.Copy(tf, tr); err != nil {
447 if closeErr := tf.Close(); closeErr != nil {
448 return fmt.Errorf("close %s: %w", target, closeErr)
449 }
450 return fmt.Errorf("write %s: %w", target, err)
451 }
452 if err := tf.Close(); err != nil {
453 return fmt.Errorf("close %s: %w", target, err)
454 }
455 }
456 }
457
458 log.Debug("workspace extracted")
459 return nil
460}
461
462func (w *WorkspaceTar) Checksum() string {
463 return w.checksum
464}
465
466func (w *WorkspaceTar) Cleanup() error {
467 return nil
468}
469
470func (w *WorkspaceTar) GetDir() string {
471 return w.Dest
472}
473
// splitSSHSource splits "host:path" at the first colon into host and path.
// A source without a colon is returned whole as the host, with an empty path.
func splitSSHSource(source string) (string, string) {
	host, path, found := strings.Cut(source, ":")
	if !found {
		return source, ""
	}
	return host, path
}
482
483type JobEngine struct {
484 Wk Workspace
485 Logger *slog.Logger
486 Cfg *Cfg
487 Ev *Event
488 JobID string
489}
490
// SessionInfo describes one zmx session. Every field is kept as the string
// zmx reports. Created and Ended are parsed as Unix seconds where durations
// are computed. Ended is empty while the session is running, and ExitCode
// "0" means success. Short is a display name derived by cleanSessionShort.
type SessionInfo struct {
	Name     string `json:"name"`
	Short    string `json:"short"`
	PID      string `json:"pid"`
	Clients  string `json:"clients"`
	Created  string `json:"created"`
	StartDir string `json:"start_dir"`
	Ended    string `json:"ended"`
	ExitCode string `json:"exit_code"`
}

// StatusPayload is a job status record; presumably one JSONL line emitted by
// the monitor (TODO confirm against monitorTick). A nil ExitCode encodes as
// JSON null, and the timing fields are omitted when empty.
type StatusPayload struct {
	Timestamp    string        `json:"timestamp"`
	Name         string        `json:"name"`
	JobID        string        `json:"job_id"`
	Status       string        `json:"status"`
	ExitCode     *int          `json:"exit_code"`
	Duration     string        `json:"duration,omitempty"`
	StartedAt    string        `json:"started_at,omitempty"`
	EndedAt      string        `json:"ended_at,omitempty"`
	SessionCount int           `json:"session_count"`
	Sessions     []SessionInfo `json:"sessions"`
}
514
515func (eng *JobEngine) Setup() error {
516 return eng.Wk.Setup()
517}
518
519func (eng *JobEngine) Run(manifest string) error {
520 prefix := fmt.Sprintf("ci.%s.%s.", eng.Ev.Name, eng.JobID)
521 // Child sessions use ".step." sub-prefix so zmx wait "*" inside pico.sh
522 // matches ci.<name>.<jobID>.step.* but NOT ci.<name>.<jobID>.runner.
523 // This avoids a deadlock where the runner waits for itself.
524 childPrefix := prefix + "step."
525
526 log := eng.Logger.With("manifest", manifest, "prefix", prefix)
527 log.Debug("starting runner session", "session", prefix+"runner")
528
529 runnerName := prefix + "runner"
530 // Name the runner explicitly. Do NOT set ZMX_SESSION_PREFIX for this
531 // outer zmx run call — it would be prepended to the runner's own name.
532 // The prefix is only for child sessions spawned inside pico.sh (via the
533 // exported env var).
534 cmdEnv := make([]string, 0, len(os.Environ())+5)
535 for _, e := range os.Environ() {
536 if !strings.HasPrefix(e, "ZMX_SESSION_PREFIX=") {
537 cmdEnv = append(cmdEnv, e)
538 }
539 }
540 cmdEnv = append(cmdEnv,
541 fmt.Sprintf("PICO_CI_JOB_ID=%s", eng.JobID),
542 fmt.Sprintf("PICO_CI_REPO=%s", eng.Ev.Name),
543 fmt.Sprintf("PICO_CI_EVENT_TYPE=%s", eng.Ev.Type),
544 )
545
546 zmxPrefixStr := fmt.Sprintf("ZMX_SESSION_PREFIX=%s", childPrefix)
547 cmd := exec.Command("zmx", "run", runnerName, "-d", zmxPrefixStr, "bash", manifest)
548 cmd.Env = cmdEnv
549 cmd.Dir = eng.Wk.GetDir()
550
551 if err := cmd.Run(); err != nil {
552 return fmt.Errorf("start runner session: %w", err)
553 }
554
555 return nil
556}
557
558func (eng *JobEngine) Cleanup() error {
559 return eng.Wk.Cleanup()
560}
561
562func (eng *JobEngine) FindManifest() (string, error) {
563 fnames := []string{"pico.sh"}
564 for _, manifest := range fnames {
565 path := filepath.Join(eng.Wk.GetDir(), manifest)
566 if _, err := os.Stat(path); err == nil {
567 return path, nil
568 }
569 }
570 return "", fmt.Errorf("no pico.sh found in %s", eng.Wk.GetDir())
571}
572
573func eventHandler(cfg *Cfg, eventData *Event) error {
574 log := cfg.Logger.With("repo", eventData.Name, "type", eventData.Type)
575
576 // Cancel any existing job for this repo before starting a new one
577 cancelRunningJobs(cfg, log, eventData.Name)
578
579 jobID := generateJobID(eventData.Name, eventData.Workspace)
580 log = log.With("job_id", jobID)
581 eventBytes, _ := json.Marshal(eventData)
582 fmt.Fprintf(os.Stdout, "🚀 starting job ci.%s.%s\n", eventData.Name, jobID) //nolint:errcheck
583 fmt.Fprintf(os.Stdout, " event: type=%s name=%s workspace=%s\n", eventData.Type, eventData.Name, eventData.Workspace) //nolint:errcheck
584 fmt.Fprintf(os.Stdout, " %s\n", string(eventBytes)) //nolint:errcheck
585
586 wk := cfg.NewWorkspace(cfg, log, eventData.Workspace)
587 eng := &JobEngine{
588 Logger: log,
589 Cfg: cfg,
590 Wk: wk,
591 Ev: eventData,
592 JobID: jobID,
593 }
594 defer func() {
595 if err := eng.Cleanup(); err != nil {
596 cfg.Logger.Error("engine cleanup", "err", err)
597 }
598 }()
599
600 fmt.Fprintf(os.Stdout, "📦 syncing workspace %s\n", eventData.Workspace) //nolint:errcheck
601 if err := eng.Setup(); err != nil {
602 return fmt.Errorf("setup: %w", err)
603 }
604 fmt.Fprintf(os.Stdout, "✅ workspace ready %s\n", eng.Wk.GetDir()) //nolint:errcheck
605
606 // Store the event in the artifact directory so the monitor can access it
607 eventDir := filepath.Join(cfg.ArtifactDir, eventData.Name, jobID)
608 artifactsDir := filepath.Join(eventDir, "artifacts")
609 if err := os.MkdirAll(artifactsDir, 0755); err != nil {
610 log.Error("create artifacts dir", "err", err)
611 } else {
612 if err := os.WriteFile(filepath.Join(artifactsDir, "event.json"), eventBytes, 0644); err != nil {
613 log.Error("write event", "err", err)
614 }
615 }
616
617 // Write attestation.json with runner/workspace provenance
618 hostname, _ := os.Hostname()
619 attestation := map[string]interface{}{
620 "runner": map[string]string{
621 "hostname": hostname,
622 "os": runtimeOS(),
623 "arch": runtimeArch(),
624 },
625 "provenance": map[string]string{
626 "repo": eventData.Name,
627 "branch": eventData.Branch,
628 "commit": eventData.Commit,
629 },
630 "workspace_checksum": eng.Wk.Checksum(),
631 }
632 attestationBytes, _ := json.Marshal(attestation)
633 if err := os.WriteFile(filepath.Join(artifactsDir, "attestation.json"), attestationBytes, 0644); err != nil {
634 log.Error("write attestation", "err", err)
635 }
636
637 manifest, err := eng.FindManifest()
638 if err != nil {
639 fmt.Fprintf(os.Stdout, "❌ %s\n\n", err) //nolint:errcheck
640 //nolint:errcheck
641 fmt.Fprint(os.Stdout, `Create a pico.sh script in your workspace root:
642
643 #!/usr/bin/env bash
644 set -euo pipefail
645
646 export ZMX_SESSION_PREFIX="${ZMX_SESSION_PREFIX:-ci.}"
647
648 # Run your CI steps as zmx sessions
649 zmx run lint -d <your lint command>
650 zmx run test -d <your test command>
651
652 # Wait for all steps to complete
653 zmx wait "*"
654
655 printf "\x1b[32msuccess!\x1b[0m\n"
656
657Each 'zmx run' spawns a parallel session. 'zmx wait "*"' blocks
658until all child sessions finish.
659
660You can also run this script in isolation without the runner.
661
662See: https://github.com/picosh/pici
663
664`)
665 return err
666 }
667 fmt.Fprintf(os.Stdout, "🔍 found %s\n", manifest) //nolint:errcheck
668
669 fmt.Fprint(os.Stdout, "🏃 launching sessions...\n") //nolint:errcheck
670 if err := eng.Run(manifest); err != nil {
671 return fmt.Errorf("run: %w", err)
672 }
673
674 fmt.Fprintln(os.Stdout, "✅ job launched") //nolint:errcheck
675
676 if cfg.Wait {
677 if err := waitAndReport(cfg, log, eventData.Name, jobID); err != nil {
678 return fmt.Errorf("wait: %w", err)
679 }
680 return nil
681 }
682
683 session := fmt.Sprintf("ci.%s.%s.runner", eventData.Name, jobID)
684 fmt.Fprintf(os.Stdout, " zmx tail %s\n", session) //nolint:errcheck
685 fmt.Fprintf(os.Stdout, " zmx history %s\n", session) //nolint:errcheck
686 fmt.Fprintf(os.Stdout, " zmx attach %s\n", session) //nolint:errcheck
687 return nil
688}
689
690// waitAndReport polls the job's sessions until all complete, prints live
691// progress to stdout, then dumps session history and a final summary.
692func waitAndReport(cfg *Cfg, log *slog.Logger, name, jobID string) error {
693 prefix := "ci." + name + "." + jobID + "."
694 ticker := time.NewTicker(cfg.MonitorInterval)
695 defer ticker.Stop()
696
697 // Handle ^C gracefully
698 sigCh := make(chan os.Signal, 1)
699 signal.Notify(sigCh, syscall.SIGINT)
700 defer signal.Stop(sigCh)
701
702 fmt.Fprintln(os.Stdout) //nolint:errcheck
703 fmt.Fprint(os.Stdout, "⏳ waiting for completion...\n") //nolint:errcheck
704
705 // Track state for each session
706 known := make(map[string]*sessionState)
707 var jobSessions []SessionInfo
708 var sessionOrder []string // track insertion order for deterministic output
709 var liveLines []string // last set of status lines printed (for overwrite)
710
711 for {
712 select {
713 case <-cfg.Ctx.Done():
714 return cfg.Ctx.Err()
715 case <-sigCh:
716 fmt.Fprintln(os.Stdout, "\n⏹ cancelled") //nolint:errcheck
717 return nil
718 case <-ticker.C:
719 }
720
721 // Fetch current session list
722 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
723 if err != nil {
724 log.Error("zmx list", "err", err)
725 continue
726 }
727
728 sessions := parseZMXList(string(listOutput))
729 jobSessions = nil
730 for _, s := range sessions {
731 if strings.HasPrefix(s.Name, prefix) {
732 s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
733 jobSessions = append(jobSessions, s)
734 }
735 }
736
737 if len(jobSessions) == 0 {
738 continue // runner session may not have appeared yet
739 }
740
741 // Update state for each session
742 for _, s := range jobSessions {
743 state, ok := known[s.Short]
744 if !ok {
745 state = &sessionState{}
746 known[s.Short] = state
747 sessionOrder = append(sessionOrder, s.Short)
748 }
749
750 if s.Ended == "" {
751 state.status = "running"
752 created, _ := strconv.ParseInt(s.Created, 10, 64)
753 state.duration = fmtDurationTs(created, time.Now().Unix())
754 } else {
755 if s.ExitCode == "0" {
756 state.status = "success"
757 } else {
758 state.status = "failed"
759 state.exitCode = s.ExitCode
760 }
761 state.duration = fmtDuration(s.Created, s.Ended)
762 state.printed = true // lock final state
763 }
764 }
765
766 // Build current status lines
767 lines := make([]string, 0, len(sessionOrder))
768 for _, short := range sessionOrder {
769 state := known[short]
770 icon := map[string]string{"running": "🚀", "success": "✅", "failed": "❌"}[state.status]
771 detail := ""
772 if state.status == "failed" && state.exitCode != "" {
773 detail = fmt.Sprintf(", exit %s", state.exitCode)
774 }
775 if state.duration != "" && state.duration != "—" {
776 detail += fmt.Sprintf(" (%s)", state.duration)
777 }
778 lines = append(lines, fmt.Sprintf(" %-12s %s %s%s", short, icon, state.status, detail))
779 }
780
781 // Overwrite previous lines with cursor-up, or print fresh
782 if len(liveLines) > 0 {
783 // Move cursor up to overwrite previous lines //nolint:errcheck
784 for range len(liveLines) {
785 fmt.Fprint(os.Stdout, "\033[A") //nolint:errcheck
786 }
787 // Clear each line
788 for i, line := range lines { //nolint:errcheck
789 if i > 0 {
790 fmt.Fprint(os.Stdout, "\n") //nolint:errcheck
791 }
792 fmt.Fprint(os.Stdout, line+"\033[K") //nolint:errcheck
793 }
794 fmt.Fprint(os.Stdout, "\n") //nolint:errcheck
795 } else { //nolint:errcheck
796 for _, line := range lines {
797 fmt.Fprintln(os.Stdout, line) //nolint:errcheck
798 }
799 }
800 liveLines = lines
801
802 // Check if all sessions are done
803 if allCompleted(jobSessions) {
804 break
805 }
806 }
807
808 // Print last 25 lines of history for failed sessions only
809 fmt.Fprintln(os.Stdout) //nolint:errcheck
810 for _, s := range jobSessions {
811 state := known[s.Short]
812 if state == nil || state.status != "failed" {
813 continue
814 }
815
816 separator := strings.Repeat("\u2500", 50)
817 fmt.Fprintln(os.Stdout, separator) //nolint:errcheck
818 fmt.Fprintf(os.Stdout, "Session: %s (exit %s)\n", s.Short, state.exitCode) //nolint:errcheck
819 fmt.Fprintln(os.Stdout, separator) //nolint:errcheck
820 fmt.Fprintln(os.Stdout) //nolint:errcheck
821
822 history, err := fetchHistoryPlain(s.Name)
823 if err != nil {
824 fmt.Fprintf(os.Stdout, " (history unavailable: %v)\n", err) //nolint:errcheck
825 } else {
826 lines := strings.Split(history, "\n")
827 // Show last 25 lines
828 if len(lines) > 25 {
829 fmt.Fprintf(os.Stdout, " ... (%d lines omitted)\n", len(lines)-25) //nolint:errcheck
830 lines = lines[len(lines)-25:]
831 }
832 for _, line := range lines {
833 if line != "" {
834 fmt.Fprintf(os.Stdout, " %s\n", line) //nolint:errcheck
835 }
836 }
837 }
838 fmt.Fprintln(os.Stdout) //nolint:errcheck
839 }
840
841 // Final summary
842 exitCode, status := resolveJobExitCode(jobSessions)
843 _, _, duration := computeJobTiming(jobSessions)
844 icon := map[string]string{"success": "✅", "failed": "❌"}[status]
845 if exitCode == 0 {
846 fmt.Fprintf(os.Stdout, "%s job finished: %s (%s)\n", icon, status, duration) //nolint:errcheck
847 } else {
848 fmt.Fprintf(os.Stdout, "%s job failed: exit %d (%s)\n", icon, exitCode, duration) //nolint:errcheck
849 }
850
851 return nil
852}
853
// cleanSessionShort derives a readable short name from a full session name by
// stripping, in order and at most once each: the job prefix, the "step."
// marker added to child sessions, and a nested "ci.<repo>.<jobID>." prefix.
//
//	ci.name.jobID.step.lint                  → lint
//	ci.name.jobID.step.ci.name.jobID.runner  → runner
func cleanSessionShort(name, prefix, repoName, jobID string) string {
	nested := strings.Join([]string{"ci", repoName, jobID, ""}, ".")
	rest := name
	for _, p := range []string{prefix, "step.", nested} {
		rest = strings.TrimPrefix(rest, p)
	}
	return rest
}
866
867// runMonitor is a long-lived daemon that polls all ci.* zmx sessions,
868// writes status as JSONL to stdout, stages artifacts, and syncs to destination.
869// Logs go to stderr. Compose with shell tools to route status:
870//
871// pici monitor | ssh pipe.pico.sh "pub build.status -b=false"
872// pici monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done
873// pici monitor > status.jsonl
874//
875// sessionState tracks the display state of a single session.
876type sessionState struct {
877 status string // "running", "success", "failed"
878 exitCode string
879 duration string
880 printed bool // true once final state (success/failed) is printed
881}
882
883// monitorJobState tracks display state for a single job across ticks.
884type monitorJobState struct {
885 sessionOrder []string // insertion order for deterministic output
886 sessions map[string]*sessionState
887 liveLines []string // last set of status lines printed (for overwrite)
888}
889
890func runMonitor(cfg *Cfg) error {
891 log := cfg.Logger.With("cmd", "monitor")
892
893 output := cfg.StatusOutput
894 if output == nil {
895 output = os.Stdout
896 }
897
898 ticker := time.NewTicker(cfg.MonitorInterval)
899 defer ticker.Stop()
900
901 // Optional GC ticker — runs garbage collection on a separate interval.
902 var gcTicker *time.Ticker
903 if cfg.GCInterval > 0 {
904 gcTicker = time.NewTicker(cfg.GCInterval)
905 defer gcTicker.Stop()
906 }
907
908 // Track per-job display state across ticks (for human output)
909 jobStates := make(map[string]*monitorJobState) // key: "name/jobID"
910
911 log.Debug("monitor started", "interval", cfg.MonitorInterval, "gc_interval", cfg.GCInterval, "artifact_dir", cfg.ArtifactDir)
912 log.Debug("monitoring ci.* sessions for job status, writing status to stdout")
913
914 for {
915 select {
916 case <-cfg.Ctx.Done():
917 log.Debug("context cancelled, stopping monitor")
918 return cfg.Ctx.Err()
919 case <-ticker.C:
920 if err := monitorTick(cfg, log, output, jobStates); err != nil {
921 log.Error("monitor tick", "err", err)
922 }
923 case <-gcTicker.C:
924 log.Debug("running periodic garbage collection")
925 if err := runGC(cfg); err != nil {
926 log.Error("periodic gc", "err", err)
927 }
928 }
929 }
930}
931
932// monitorTick performs a single monitoring pass over all ci.* sessions.
933// renderJobRunning prints per-session status for a running job, using ANSI
934// cursor control to overwrite previous lines.
935func renderJobRunning(output io.Writer, name, jobID string, group []SessionInfo, duration string, jobStates map[string]*monitorJobState) {
936 key := name + "/" + jobID
937 state, ok := jobStates[key]
938 if !ok {
939 state = &monitorJobState{sessions: make(map[string]*sessionState)}
940 jobStates[key] = state
941 }
942
943 // Track new sessions
944 for _, s := range group {
945 if _, ok := state.sessions[s.Short]; !ok {
946 state.sessions[s.Short] = &sessionState{}
947 state.sessionOrder = append(state.sessionOrder, s.Short)
948 }
949 }
950
951 // Update session states
952 for _, s := range group {
953 ss := state.sessions[s.Short]
954 if s.Ended == "" {
955 ss.status = "running"
956 created, _ := strconv.ParseInt(s.Created, 10, 64)
957 ss.duration = fmtDurationTs(created, time.Now().Unix())
958 } else {
959 if s.ExitCode == "0" {
960 ss.status = "success"
961 } else {
962 ss.status = "failed"
963 ss.exitCode = s.ExitCode
964 }
965 ss.duration = fmtDuration(s.Created, s.Ended)
966 }
967 }
968
969 // Build status lines
970 lines := make([]string, 0, len(state.sessionOrder))
971 for _, short := range state.sessionOrder {
972 ss := state.sessions[short]
973 icon := map[string]string{"running": "🚀", "success": "✅", "failed": "❌"}[ss.status]
974 detail := ""
975 if ss.status == "failed" && ss.exitCode != "" {
976 detail = fmt.Sprintf(", exit %s", ss.exitCode)
977 }
978 if ss.duration != "" && ss.duration != "—" {
979 detail += fmt.Sprintf(" (%s)", ss.duration)
980 }
981 lines = append(lines, fmt.Sprintf(" %-12s %s %s%s", short, icon, ss.status, detail))
982 }
983
984 // Overwrite previous lines or print fresh
985 if len(state.liveLines) > 0 {
986 for range len(state.liveLines) {
987 fmt.Fprint(output, "\033[A") //nolint:errcheck
988 }
989 for i, line := range lines {
990 if i > 0 {
991 fmt.Fprint(output, "\n") //nolint:errcheck
992 }
993 fmt.Fprint(output, line+"\033[K") //nolint:errcheck
994 }
995 fmt.Fprint(output, "\n") //nolint:errcheck
996 } else {
997 fmt.Fprintf(output, " %-12s %s %s\n", name, "🚀", "running") //nolint:errcheck
998 for _, line := range lines {
999 fmt.Fprintln(output, line) //nolint:errcheck
1000 }
1001 }
1002 state.liveLines = lines
1003}
1004
1005// renderJobFinal prints the final status for a completed job.
1006func renderJobFinal(output io.Writer, name, jobID string, group []SessionInfo, duration, status string, success bool, workspace, artifactDir string) {
1007 icon := map[string]string{"success": "✅", "failed": "❌"}[status]
1008 fmt.Fprintf(output, " %-12s %s %s (%s)\n", name, icon, status, duration) //nolint:errcheck
1009
1010 // Per-session summary
1011 for _, s := range group {
1012 icon := "✅"
1013 if s.ExitCode != "0" {
1014 icon = "❌"
1015 }
1016 dur := fmtDuration(s.Created, s.Ended)
1017 fmt.Fprintf(output, " %-12s %s done (%s)\n", s.Short, icon, dur) //nolint:errcheck
1018 }
1019
1020 // Context info
1021 fmt.Fprint(output, "\n") //nolint:errcheck
1022 if workspace != "" {
1023 fmt.Fprintf(output, " workspace: %s\n", workspace) //nolint:errcheck
1024 }
1025 artifactPath := filepath.Join(artifactDir, name, jobID)
1026 fmt.Fprintf(output, " artifacts: %s\n", artifactPath) //nolint:errcheck
1027 fmt.Fprint(output, "\n") //nolint:errcheck
1028}
1029
1030func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer, jobStates map[string]*monitorJobState) error {
1031 // a. zmx list → filter ci.* sessions
1032 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1033 if err != nil {
1034 return fmt.Errorf("zmx list: %w", err)
1035 }
1036 sessions := parseZMXList(string(listOutput))
1037 ciSessions := filterCISessions(sessions)
1038
1039 if len(ciSessions) == 0 {
1040 log.Debug("no ci.* sessions found")
1041 return nil
1042 }
1043
1044 log.Debug("found ci sessions", "count", len(ciSessions))
1045
1046 // b. Group by job prefix: ci.<name>.<jobID>.
1047 groups := groupSessionsByJob(ciSessions)
1048
1049 // c. Process each job group
1050 for prefix, group := range groups {
1051 name, jobID := parseJobPrefix(prefix)
1052 if name == "" {
1053 continue
1054 }
1055
1056 log := log.With("repo", name, "job_id", jobID)
1057
1058 // Fetch and stage history for every session at each tick,
1059 // not just when the job completes. This gives live progress
1060 // snapshots while the job is running.
1061 for _, s := range group {
1062 // Determine session status and timing
1063 sessionStatus := "running"
1064 sessionDuration := fmtDuration(s.Created, fmt.Sprintf("%d", time.Now().Unix()))
1065 sessionExitCode := ""
1066 if s.Ended != "" {
1067 sessionDuration = fmtDuration(s.Created, s.Ended)
1068 if s.ExitCode == "0" {
1069 sessionStatus = "success"
1070 sessionExitCode = "0"
1071 } else {
1072 sessionStatus = "failed"
1073 sessionExitCode = s.ExitCode
1074 }
1075 }
1076
1077 html, err := fetchHistoryHTML(s.Name, name, jobID, sessionStatus, sessionDuration, sessionExitCode)
1078 if err != nil {
1079 log.Error("fetch history html", "session", s.Name, "err", err)
1080 continue
1081 }
1082 if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, html, ".html"); err != nil {
1083 log.Error("stage html artifact", "session", s.Short, "err", err)
1084 }
1085
1086 plain, err := fetchHistoryPlain(s.Name)
1087 if err != nil {
1088 log.Error("fetch history plain", "session", s.Name, "err", err)
1089 continue
1090 }
1091 if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, plain, ".txt"); err != nil {
1092 log.Error("stage txt artifact", "session", s.Short, "err", err)
1093 }
1094 }
1095
1096 // Generate and stage job index landing pages
1097 indexHTML, indexTXT := generateJobIndex(cfg.ArtifactDir, name, jobID, group)
1098 if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexHTML, ".html"); err != nil {
1099 log.Error("stage index.html", "err", err)
1100 }
1101 if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexTXT, ".txt"); err != nil {
1102 log.Error("stage index.txt", "err", err)
1103 }
1104 // Stage shared CSS
1105 if styles, err := loadStyles(); err == nil {
1106 if err := stageArtifact(cfg.ArtifactDir, name, jobID, "styles", styles, ".css"); err != nil {
1107 log.Error("stage styles.css", "err", err)
1108 }
1109 }
1110
1111 // Load event to get artifact destination
1112 eventData, _ := loadEvent(cfg.ArtifactDir, name, jobID)
1113
1114 // Compute job-level timing from session timestamps
1115 startedAt, endedAt, duration := computeJobTiming(group)
1116
1117 if allCompleted(group) {
1118 // Check sentinel — publish terminal status exactly once
1119 sentinel := filepath.Join(cfg.ArtifactDir, name, jobID, "artifacts", "published.json")
1120 log.Debug("checking completion", "all_completed", true, "sentinel", sentinel)
1121 if _, err := os.Stat(sentinel); err == nil {
1122 log.Debug("terminal status already published, skipping", "sentinel", sentinel)
1123 continue
1124 }
1125
1126 log.Debug("job completed, publishing final status", "sessions", len(group))
1127 exitCode, status := resolveJobExitCode(group)
1128 log.Info("job finished", "status", status, "exit_code", exitCode)
1129
1130 if cfg.HumanOutput {
1131 renderJobFinal(output, name, jobID, group, duration, status, exitCode == 0, eventData.Workspace, cfg.ArtifactDir)
1132 } else {
1133 payload := StatusPayload{
1134 Timestamp: time.Now().UTC().Format(time.RFC3339),
1135 Name: name,
1136 JobID: jobID,
1137 Status: status,
1138 ExitCode: &exitCode,
1139 Duration: duration,
1140 StartedAt: startedAt,
1141 EndedAt: endedAt,
1142 SessionCount: len(group),
1143 Sessions: group,
1144 }
1145 if err := publishStatus(output, payload); err != nil {
1146 log.Error("publish final status", "err", err)
1147 }
1148 }
1149 // Write published.json, then sync to include it in the rsync.
1150 // This is the only sync for completed jobs. In-progress jobs are
1151 // synced every tick in the else branch below.
1152 published := map[string]interface{}{
1153 "status": status,
1154 "exit_code": exitCode,
1155 "job_id": jobID,
1156 "finished_at": time.Now().UTC().Format(time.RFC3339),
1157 }
1158 publishedJSON, _ := json.Marshal(published)
1159 log.Info("writing sentinel", "path", sentinel)
1160 if err := os.WriteFile(sentinel, publishedJSON, 0644); err != nil {
1161 log.Error("write published sentinel", "err", err)
1162 }
1163 // Regenerate index.html now that published.json exists, so the artifact list includes it.
1164 indexHTML, indexTXT := generateJobIndex(cfg.ArtifactDir, name, jobID, group)
1165 if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexHTML, ".html"); err != nil {
1166 log.Error("stage index.html", "err", err)
1167 }
1168 if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexTXT, ".txt"); err != nil {
1169 log.Error("stage index.txt", "err", err)
1170 }
1171 if err := syncJobArtifacts(cfg, name, jobID, log); err != nil {
1172 log.Error("sync artifacts", "err", err)
1173 }
1174 } else {
1175 log.Debug("job still running", "sessions", len(group))
1176 // Sync in-progress jobs every tick using the sessions we already know about.
1177 if err := syncJobArtifacts(cfg, name, jobID, log); err != nil {
1178 log.Error("sync artifacts", "err", err)
1179 }
1180 // Publish running status only when --include-running is set
1181 if cfg.IncludeRunning {
1182 if cfg.HumanOutput {
1183 renderJobRunning(output, name, jobID, group, duration, jobStates)
1184 } else {
1185 payload := StatusPayload{
1186 Timestamp: time.Now().UTC().Format(time.RFC3339),
1187 Name: name,
1188 JobID: jobID,
1189 Status: "running",
1190 ExitCode: nil,
1191 Duration: duration,
1192 StartedAt: startedAt,
1193 SessionCount: len(group),
1194 Sessions: group,
1195 }
1196 if err := publishStatus(output, payload); err != nil {
1197 log.Error("publish status", "err", err)
1198 }
1199 }
1200 }
1201 }
1202 }
1203
1204 return nil
1205}
1206
1207// loadEvent reads the event.json for a job from the artifact directory.
1208func loadEvent(dir, name, jobID string) (Event, error) {
1209 var event Event
1210 data, err := os.ReadFile(filepath.Join(dir, name, jobID, "artifacts", "event.json"))
1211 if err != nil {
1212 return event, err
1213 }
1214 return event, json.Unmarshal(data, &event)
1215}
1216
1217// computeJobTiming derives started_at, ended_at, and duration from session timestamps.
1218// started_at is the earliest session creation time, ended_at is the latest session end time.
1219// For running jobs, ended_at is empty and duration is computed against now.
1220func computeJobTiming(sessions []SessionInfo) (startedAt, endedAt, duration string) {
1221 var earliestCreated, latestEnded int64
1222 hasCreated, hasEnded := false, false
1223
1224 for _, s := range sessions {
1225 if s.Created != "" {
1226 var c int64
1227 if _, err := fmt.Sscanf(s.Created, "%d", &c); err == nil {
1228 if !hasCreated || c < earliestCreated {
1229 earliestCreated = c
1230 }
1231 hasCreated = true
1232 }
1233 }
1234 if s.Ended != "" {
1235 var e int64
1236 if _, err := fmt.Sscanf(s.Ended, "%d", &e); err == nil {
1237 if !hasEnded || e > latestEnded {
1238 latestEnded = e
1239 }
1240 hasEnded = true
1241 }
1242 }
1243 }
1244
1245 if hasCreated {
1246 startedAt = time.Unix(earliestCreated, 0).UTC().Format(time.RFC3339)
1247 }
1248 if hasEnded {
1249 endedAt = time.Unix(latestEnded, 0).UTC().Format(time.RFC3339)
1250 }
1251
1252 if hasCreated && hasEnded {
1253 duration = fmtDurationTs(earliestCreated, latestEnded)
1254 } else if hasCreated {
1255 duration = fmtDurationTs(earliestCreated, time.Now().Unix())
1256 }
1257
1258 return startedAt, endedAt, duration
1259}
1260
// fmtDurationTs renders the span between two unix timestamps (seconds).
// A span of a minute or more is shown as "<m>m<s>s". From 10s up it is whole
// seconds, and below that it has one decimal place ("2m34s", "42s", "5.0s").
// A negative span yields "—".
func fmtDurationTs(started, ended int64) string {
	elapsed := time.Duration(ended-started) * time.Second
	switch {
	case elapsed < 0:
		return "—"
	case elapsed >= time.Minute:
		mins := elapsed / time.Minute
		rem := (elapsed % time.Minute) / time.Second
		return fmt.Sprintf("%dm%ds", mins, rem)
	case elapsed >= 10*time.Second:
		return fmt.Sprintf("%ds", int(elapsed.Seconds()))
	default:
		return fmt.Sprintf("%.1fs", elapsed.Seconds())
	}
}
1276
1277// filterCISessions returns only sessions with ci. prefix.
1278func filterCISessions(sessions []SessionInfo) []SessionInfo {
1279 var filtered []SessionInfo
1280 for _, s := range sessions {
1281 if strings.HasPrefix(s.Name, "ci.") {
1282 filtered = append(filtered, s)
1283 }
1284 }
1285 return filtered
1286}
1287
1288// groupSessionsByJob groups sessions by their job prefix (ci.<name>.<jobID>.).
1289func groupSessionsByJob(sessions []SessionInfo) map[string][]SessionInfo {
1290 groups := make(map[string][]SessionInfo)
1291 for _, s := range sessions {
1292 prefix := extractJobPrefix(s.Name)
1293 if prefix == "" {
1294 continue
1295 }
1296 // Set the Short name
1297 name, jobID := parseJobPrefix(prefix)
1298 s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
1299 groups[prefix] = append(groups[prefix], s)
1300 }
1301 return groups
1302}
1303
// parseJobPrefix splits a job prefix such as "ci.<name>.<jobID>." into its
// name and jobID. It needs at least four dot-separated fields (the trailing
// dot provides the last, empty one); otherwise both results are empty. The
// first field is not checked.
func parseJobPrefix(prefix string) (name, jobID string) {
	fields := strings.Split(prefix, ".")
	if len(fields) >= 4 {
		name, jobID = fields[1], fields[2]
	}
	return name, jobID
}
1313
1314// resolveJobExitCode determines the job's exit code from its sessions.
1315// Defensive: if any child session failed, the job failed regardless of the
1316// runner's exit code. This protects against bad pico.sh scripts that exit 0
1317// without waiting for children.
1318func resolveJobExitCode(sessions []SessionInfo) (int, string) {
1319 var runnerExit *int
1320 var worstChild *int // highest non-zero child exit code
1321
1322 for _, s := range sessions {
1323 if s.ExitCode == "" {
1324 continue
1325 }
1326 var code int
1327 if _, err := fmt.Sscanf(s.ExitCode, "%d", &code); err != nil {
1328 continue
1329 }
1330
1331 if strings.HasSuffix(s.Name, ".runner") {
1332 runnerExit = &code
1333 } else if code != 0 {
1334 if worstChild == nil || code > *worstChild {
1335 worstChild = &code
1336 }
1337 }
1338 }
1339
1340 // Any child failure overrides the runner — defensive against bad scripts
1341 if worstChild != nil {
1342 return *worstChild, "failed"
1343 }
1344 if runnerExit != nil && *runnerExit != 0 {
1345 return *runnerExit, "failed"
1346 }
1347 return 0, "success"
1348}
1349
1350func generateJobID(name, workspace string) string {
1351 return jobIDFor(name, workspace, time.Now().UnixNano())
1352}
1353
// runtimeOS reports the operating system the binary was built for (GOOS).
func runtimeOS() string {
	goos := runtime.GOOS
	return goos
}
1357
// runtimeArch reports the CPU architecture the binary was built for (GOARCH).
func runtimeArch() string {
	goarch := runtime.GOARCH
	return goarch
}
1361
// jobIDFor derives an 8-character lowercase hex job ID: the first 4 bytes of
// SHA-256(name + workspace + decimal ts). The parts are concatenated with no
// separator between them.
func jobIDFor(name, workspace string, ts int64) string {
	seed := name + workspace + strconv.FormatInt(ts, 10)
	digest := sha256.Sum256([]byte(seed))
	return fmt.Sprintf("%x", digest[:4])
}
1366
1367func parseZMXList(output string) []SessionInfo {
1368 var sessions []SessionInfo
1369 lines := strings.Split(strings.TrimSpace(output), "\n")
1370 for _, line := range lines {
1371 line = strings.TrimSpace(line)
1372 if line == "" {
1373 continue
1374 }
1375 // Strip leading arrow/space prefix
1376 line = strings.TrimPrefix(line, "→ ")
1377 line = strings.TrimSpace(line)
1378
1379 fields := strings.FieldsFunc(line, func(r rune) bool {
1380 return r == '\t'
1381 })
1382
1383 var si SessionInfo
1384 for _, field := range fields {
1385 parts := strings.SplitN(field, "=", 2)
1386 if len(parts) != 2 {
1387 continue
1388 }
1389 switch parts[0] {
1390 case "name":
1391 si.Name = parts[1]
1392 case "pid":
1393 si.PID = parts[1]
1394 case "clients":
1395 si.Clients = parts[1]
1396 case "created":
1397 si.Created = parts[1]
1398 case "start_dir":
1399 si.StartDir = parts[1]
1400 case "ended":
1401 si.Ended = parts[1]
1402 case "exit_code":
1403 si.ExitCode = parts[1]
1404 }
1405 }
1406 if si.Name != "" {
1407 sessions = append(sessions, si)
1408 }
1409 }
1410 return sessions
1411}
1412
1413// loadStyles reads the shared CSS from the embedded template filesystem.
1414func loadStyles() (string, error) {
1415 data, err := fs.ReadFile(tmplFS, "tmpl/styles.css")
1416 if err != nil {
1417 return "", fmt.Errorf("read styles: %w", err)
1418 }
1419 return string(data), nil
1420}
1421
// SessionArtifactData is the view model passed to tmpl/session.html when
// rendering one session's history page.
type SessionArtifactData struct {
	// Full zmx session name, its short display form, and its status.
	SessionName, SessionShort, SessionStatus string
	// Job the session belongs to.
	JobName, JobID string
	// Pre-formatted duration and exit code (exit code may be empty).
	Duration, ExitCode string
	// Session history markup, inserted into the page without escaping.
	Content template.HTML
}
1433
1434// fetchHistoryHTML fetches session history from zmx and wraps it in a full HTML document.
1435func fetchHistoryHTML(sessionName, jobName, jobID, status, duration, exitCode string) (string, error) {
1436 cmd := exec.Command("zmx", "history", sessionName, "--html")
1437 output, err := cmd.Output()
1438 if err != nil {
1439 return "", err
1440 }
1441
1442 prefix := "ci." + jobName + "." + jobID + "."
1443 shortName := strings.TrimPrefix(sessionName, prefix)
1444 shortName = strings.TrimPrefix(shortName, "step.")
1445
1446 data := SessionArtifactData{
1447 SessionName: sessionName,
1448 SessionShort: shortName,
1449 SessionStatus: status,
1450 JobName: jobName,
1451 JobID: jobID,
1452 Duration: duration,
1453 ExitCode: exitCode,
1454 Content: template.HTML(string(output)),
1455 }
1456
1457 tmpl, err := template.ParseFS(tmplFS, "tmpl/session.html")
1458 if err != nil {
1459 return "", fmt.Errorf("parse template: %w", err)
1460 }
1461
1462 var buf strings.Builder
1463 if err := tmpl.ExecuteTemplate(&buf, "session.html", data); err != nil {
1464 return "", fmt.Errorf("execute template: %w", err)
1465 }
1466
1467 return buf.String(), nil
1468}
1469
// fetchHistoryPlain returns the plain-text history of a zmx session
// (`zmx history <session> --plain`). On failure it returns "" and the exec
// error unchanged.
func fetchHistoryPlain(sessionName string) (string, error) {
	out, err := exec.Command("zmx", "history", sessionName, "--plain").Output()
	if err != nil {
		return "", err
	}
	return string(out), nil
}
1478
1479func publishStatus(w io.Writer, payload StatusPayload) error {
1480 data, err := json.Marshal(payload)
1481 if err != nil {
1482 return err
1483 }
1484 _, err = w.Write(append(data, '\n'))
1485 return err
1486}
1487
// stageArtifact writes content to <dir>/<name>/<jobID>/<short><ext>. It
// creates any missing parent directories (0755) and overwrites an existing
// file (0644 when newly created).
func stageArtifact(dir, name, jobID, short, content, ext string) error {
	dest := filepath.Join(dir, name, jobID, short+ext)
	parent := filepath.Dir(dest)
	if mkErr := os.MkdirAll(parent, 0755); mkErr != nil {
		return mkErr
	}
	payload := []byte(content)
	return os.WriteFile(dest, payload, 0644)
}
1495
1496// syncJobArtifacts syncs artifacts for a single job.
1497func syncJobArtifacts(cfg *Cfg, repoName, jobID string, log *slog.Logger) error {
1498 jobDir := filepath.Join(cfg.ArtifactDir, repoName, jobID)
1499 eventPath := filepath.Join(jobDir, "artifacts", "event.json")
1500 data, err := os.ReadFile(eventPath)
1501 if err != nil {
1502 if os.IsNotExist(err) {
1503 log.Debug("event.json not found, skipping sync", "repo", repoName, "job_id", jobID)
1504 return nil
1505 }
1506 return fmt.Errorf("read event: %w", err)
1507 }
1508 var event Event
1509 if err := json.Unmarshal(data, &event); err != nil || event.ArtifactDest == "" {
1510 return fmt.Errorf("invalid event")
1511 }
1512 log.Debug("syncing artifacts", "repo", repoName, "job_id", jobID, "dest", event.ArtifactDest)
1513 var sshArgs string
1514 if cfg.KeyLocation != "" {
1515 certFile := ""
1516 if cfg.CertificateLocation != "" {
1517 certFile = fmt.Sprintf(" -o CertificateFile %s", cfg.CertificateLocation)
1518 }
1519 sshArgs = fmt.Sprintf("-F ~/.ssh/config -i %s%s", cfg.KeyLocation, certFile)
1520 }
1521 // Append "/" so rsync copies into the destination directory,
1522 // not as a subdirectory named after the source.
1523 dest := event.ArtifactDest
1524 if !strings.HasSuffix(dest, "/") {
1525 dest += "/"
1526 }
1527 var cmd *exec.Cmd
1528 if sshArgs != "" {
1529 cmd = exec.Command("rsync", "-e", sshArgs, "-rv", jobDir+"/", dest)
1530 } else {
1531 cmd = exec.Command("rsync", "-rv", jobDir+"/", dest)
1532 }
1533 rsyncCmd := fmt.Sprintf("rsync %s %s %s",
1534 strings.TrimLeft(cmd.Args[1], "-"),
1535 jobDir+"/", dest)
1536 log.Info("rsync", "cmd", rsyncCmd)
1537 return runCmd(cmd, log)
1538}
1539
1540func allCompleted(sessions []SessionInfo) bool {
1541 for _, s := range sessions {
1542 if s.Ended == "" {
1543 return false
1544 }
1545 }
1546 return true
1547}
1548
// runCmd starts cmd and streams its output line by line: stdout at debug
// level ("cmd stdout") and stderr at error level ("cmd stderr"). It returns
// the error from starting or waiting on the command.
//
// Both pipes are read to EOF before cmd.Wait is called. os/exec closes the
// pipes once Wait sees the process exit, so calling Wait while readers are
// still running can lose output. A reader that stops early (for example on a
// line longer than bufio.Scanner's limit) keeps draining its pipe, so the
// child never blocks on a full pipe and Wait cannot hang.
func runCmd(cmd *exec.Cmd, log *slog.Logger) error {
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		return err
	}

	if err := cmd.Start(); err != nil {
		return err
	}

	done := make(chan struct{}, 2)
	stream := func(name string, r io.Reader, emit func(line string)) {
		defer func() { done <- struct{}{} }()
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			emit(scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			log.Debug("cmd output scan", "stream", name, "err", err)
		}
		// Drain whatever is left so the child never blocks writing.
		_, _ = io.Copy(io.Discard, r)
	}

	go stream("stdout", stdout, func(line string) { log.Debug("cmd stdout", "text", line) })
	go stream("stderr", stderr, func(line string) { log.Error("cmd stderr", "text", line) })

	// Wait for both readers to hit EOF before Wait closes the pipes.
	<-done
	<-done
	return cmd.Wait()
}
1580
1581// runCancel reads an event from stdin and cancels any running job with matching name+type.
1582func runCancel(cfg *Cfg) error {
1583 log := cfg.Logger.With("cmd", "cancel")
1584
1585 // Read event from stdin
1586 scanner := bufio.NewScanner(os.Stdin)
1587 if !scanner.Scan() {
1588 return fmt.Errorf("no input on stdin")
1589 }
1590
1591 var event Event
1592 if err := json.Unmarshal([]byte(scanner.Text()), &event); err != nil {
1593 return fmt.Errorf("unmarshal event: %w", err)
1594 }
1595
1596 log = log.With("repo", event.Name, "type", event.Type)
1597 log.Info("cancelling running jobs for repo")
1598
1599 cancelRunningJobs(cfg, log, event.Name)
1600 return nil
1601}
1602
1603// cancelRunningJobs finds and cancels all running jobs for a given repo name.
1604// It kills the runner sessions (which cascades to children). The monitor will
1605// detect the ended sessions and publish cancelled status on its next tick.
1606func cancelRunningJobs(cfg *Cfg, log *slog.Logger, name string) {
1607 runnerSessions, _ := findRunningJobs(name)
1608 if len(runnerSessions) == 0 {
1609 log.Debug("no running jobs to cancel")
1610 return
1611 }
1612
1613 log.Debug("found running jobs to cancel", "count", len(runnerSessions))
1614
1615 for _, runnerName := range runnerSessions {
1616 jobID := extractJobID(runnerName)
1617 log := log.With("job_id", jobID)
1618
1619 log.Debug("cancelling job", "runner", runnerName)
1620 if err := killSessions([]string{runnerName}); err != nil {
1621 log.Error("kill runner session", "err", err)
1622 continue
1623 }
1624 log.Debug("cancelled runner session")
1625 }
1626}
1627
1628// findRunningJobs finds all active runner sessions for a given name.
1629// Returns runner session names and all sessions for reference.
1630func findRunningJobs(name string) ([]string, []SessionInfo) {
1631 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1632 if err != nil {
1633 return nil, nil
1634 }
1635
1636 sessions := parseZMXList(string(listOutput))
1637 var runners []string
1638 for _, s := range sessions {
1639 // Match ci.<name>.*.runner sessions that are still active (no ended)
1640 if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") && s.Ended == "" {
1641 runners = append(runners, s.Name)
1642 }
1643 }
1644 return runners, sessions
1645}
1646
// extractJobID returns the jobID from a runner session name of the form
// ci.<name>.<jobID>.runner. It strips an optional "ci." prefix and ".runner"
// suffix and returns everything after the first remaining dot, or "" when
// there is no dot.
func extractJobID(runnerName string) string {
	rest := strings.TrimPrefix(strings.TrimSuffix(runnerName, ".runner"), "ci.")
	_, jobID, found := strings.Cut(rest, ".")
	if !found {
		return ""
	}
	return jobID
}
1660
// killSessions kills the named zmx sessions with a single `zmx kill` call.
// An empty list is a no-op. On failure the error includes zmx's combined output.
func killSessions(names []string) error {
	if len(names) == 0 {
		return nil
	}
	argv := make([]string, 0, len(names)+1)
	argv = append(argv, "kill")
	argv = append(argv, names...)
	if output, err := exec.Command("zmx", argv...).CombinedOutput(); err != nil {
		return fmt.Errorf("zmx kill: %s: %w", string(output), err)
	}
	return nil
}
1674
1675// runGC kills all ci. zmx sessions older than 3 hours, regardless of status.
1676func runGC(cfg *Cfg) error {
1677 log := cfg.Logger.With("cmd", "gc")
1678 log.Debug("running garbage collection")
1679
1680 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1681 if err != nil {
1682 return fmt.Errorf("zmx list: %w", err)
1683 }
1684
1685 sessions := parseZMXList(string(listOutput))
1686
1687 cutoff := time.Now().Add(-3 * time.Hour).Unix()
1688
1689 var toKill []string
1690 for _, s := range sessions {
1691 if !strings.HasPrefix(s.Name, "ci.") {
1692 continue
1693 }
1694
1695 if s.Created == "" {
1696 continue // skip sessions with no creation time
1697 }
1698
1699 var created int64
1700 if _, err := fmt.Sscanf(s.Created, "%d", &created); err != nil {
1701 continue
1702 }
1703
1704 if created < cutoff {
1705 log.Debug("session expired, scheduling for gc", "session", s.Name, "created", s.Created)
1706 toKill = append(toKill, s.Name)
1707 }
1708 }
1709
1710 if len(toKill) == 0 {
1711 log.Debug("no sessions to garbage collect")
1712 return nil
1713 }
1714
1715 if err := killSessions(toKill); err != nil {
1716 return fmt.Errorf("kill sessions: %w", err)
1717 }
1718
1719 log.Debug("garbage collection complete", "killed", len(toKill))
1720 return nil
1721}
1722
// extractJobPrefix returns the job prefix of a session name: its first three
// dot-separated fields plus a trailing dot. For example, both
// ci.name.jobID.runner and ci.name.jobID.step.lint give ci.name.jobID.
// Names with fewer than four fields yield "".
func extractJobPrefix(sessionName string) string {
	segments := strings.Split(sessionName, ".")
	if len(segments) >= 4 {
		return strings.Join(segments[:3], ".") + "."
	}
	return ""
}
1735
// sessionRow is one session's row in the job index pages.
type sessionRow struct {
	// Full session name, short display name, status and exit code.
	Name, Short, Status, ExitCode string
	// Raw created/ended unix-second strings and a pre-formatted duration.
	Started, Ended, Duration string
}
1746
// artifactRow is one file listed from a job's artifacts/ directory in the
// index pages, e.g. event.json or workspace.tar.
type artifactRow struct {
	// File name, human-readable size, and formatted modification time.
	Name, Size, ModTime string
}
1753
// formatFileSize returns a human-readable size using 1024-based units with
// one decimal place, e.g. "512 B", "1.5 KB", "3.2 GB". Values below 1024,
// including negative ones, are printed as whole bytes. The unit table reaches
// "EB", which covers the whole int64 range; the old table stopped at "TB" and
// panicked with index out of range for sizes of 1 PiB or more.
func formatFileSize(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	// int64 tops out just under 8 EiB, so exp never exceeds 5.
	units := [...]string{"KB", "MB", "GB", "TB", "PB", "EB"}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), units[exp])
}
1768
// formatTimestamp renders a unix-seconds string (parsed with %d) as
// "YYYY-MM-DD HH:MM:SS" in UTC. It returns "—" for an empty or unparseable value.
func formatTimestamp(ts string) string {
	const missing = "—"
	if ts == "" {
		return missing
	}
	var secs int64
	if _, err := fmt.Sscanf(ts, "%d", &secs); err != nil {
		return missing
	}
	when := time.Unix(secs, 0).UTC()
	return when.Format("2006-01-02 15:04:05")
}
1780
1781// generateJobIndex produces HTML and plain-text index pages listing all
1782// sessions for a job with links and metadata (status, exit code, ended at).
1783// It also includes any other artifacts in the job directory (e.g., event.json, attestation.json, workspace.tar).
1784func generateJobIndex(artifactDir, name, jobID string, sessions []SessionInfo) (htmlContent, txtContent string) {
1785 rows := make([]sessionRow, 0, len(sessions))
1786 for _, s := range sessions {
1787 row := sessionRow{
1788 Name: s.Name,
1789 Short: s.Short,
1790 Started: s.Created,
1791 }
1792 if s.Ended == "" {
1793 row.Status = "running"
1794 row.Duration = fmtDuration(s.Created, fmt.Sprintf("%d", time.Now().Unix()))
1795 } else if s.ExitCode == "0" {
1796 row.Status = "success"
1797 row.ExitCode = "0"
1798 } else {
1799 row.Status = "failed"
1800 row.ExitCode = s.ExitCode
1801 }
1802 row.Ended = s.Ended
1803 row.Duration = fmtDuration(s.Created, s.Ended)
1804 rows = append(rows, row)
1805 }
1806
1807 // Gather artifacts by scanning the artifacts/ subfolder
1808 var artifacts []artifactRow
1809 artifactsDir := filepath.Join(artifactDir, name, jobID, "artifacts")
1810
1811 if entries, err := os.ReadDir(artifactsDir); err == nil {
1812 for _, entry := range entries {
1813 if entry.IsDir() {
1814 continue
1815 }
1816 info, err := entry.Info()
1817 if err != nil {
1818 continue
1819 }
1820 artifacts = append(artifacts, artifactRow{
1821 Name: entry.Name(),
1822 Size: formatFileSize(info.Size()),
1823 ModTime: formatTimestamp(fmt.Sprintf("%d", info.ModTime().Unix())),
1824 })
1825 }
1826 }
1827
1828 // Resolve overall job status
1829 jobStatus := "success"
1830 hasRunning := false
1831 for _, r := range rows {
1832 if r.Status == "running" {
1833 hasRunning = true
1834 break
1835 }
1836 if r.Status == "failed" {
1837 jobStatus = "failed"
1838 }
1839 }
1840 if hasRunning {
1841 jobStatus = "running"
1842 }
1843
1844 // HTML index
1845 tmpl, err := template.New("index").Funcs(template.FuncMap{
1846 "formatTimestamp": formatTimestamp,
1847 }).ParseFS(tmplFS, "tmpl/index.html")
1848 if err == nil {
1849 var buf strings.Builder
1850 if err := tmpl.ExecuteTemplate(&buf, "index.html", struct {
1851 Name string
1852 JobID string
1853 JobStatus string
1854 Rows []sessionRow
1855 Artifacts []artifactRow
1856 }{Name: name, JobID: jobID, JobStatus: jobStatus, Rows: rows, Artifacts: artifacts}); err == nil {
1857 htmlContent = buf.String()
1858 }
1859 }
1860
1861 // Plain-text index
1862 var buf strings.Builder
1863 statusIcon := map[string]string{"success": "\u2705", "failed": "\u274c", "running": "\u23f3"}
1864 icon := statusIcon[jobStatus]
1865 fmt.Fprintf(&buf, "Job: %s (%s) %s\n", name, jobID, icon)
1866 fmt.Fprintf(&buf, "Sessions: %d\n", len(rows))
1867 fmt.Fprintln(&buf, strings.Repeat("-", 70))
1868 for _, r := range rows {
1869 rIcon := statusIcon[r.Status]
1870 fmt.Fprintf(&buf, " %s %-20s exit: %-5s duration: %-8s ended: %s\n",
1871 rIcon, r.Short, r.ExitCode, r.Duration, r.Ended)
1872 }
1873 fmt.Fprintln(&buf, strings.Repeat("-", 60))
1874 txtContent = buf.String()
1875
1876 return htmlContent, txtContent
1877}
1878
1879// fmtDuration formats the duration between two unix timestamp strings.
1880// Returns human-readable strings like "2m34s", "1.5s", or "—" if invalid.
1881func fmtDuration(created, ended string) string {
1882 if created == "" || ended == "" {
1883 return "—"
1884 }
1885 var c, e int64
1886 if _, err := fmt.Sscanf(created, "%d", &c); err != nil {
1887 return "—"
1888 }
1889 if _, err := fmt.Sscanf(ended, "%d", &e); err != nil {
1890 return "—"
1891 }
1892 duration := time.Duration(e-c) * time.Second
1893 if duration < 0 {
1894 return "—"
1895 }
1896 if duration >= time.Minute {
1897 return fmt.Sprintf("%dm%ds", duration/time.Minute, duration%time.Minute/time.Second)
1898 }
1899 secs := float64(duration) / float64(time.Second)
1900 if secs >= 10 {
1901 return fmt.Sprintf("%ds", int(secs))
1902 }
1903 return fmt.Sprintf("%.1fs", secs)
1904}