Commit b95ae48
Eric Bower
·
2026-05-06 20:18:57 -0400 EDT
parent b95ae48
feat: init
A
go.mod
+5,
-0
1@@ -0,0 +1,5 @@
2+module github.com/picosh/ci
3+
4+go 1.26.2
5+
6+require github.com/golang-cz/devslog v0.0.15
A
go.sum
+2,
-0
1@@ -0,0 +1,2 @@
2+github.com/golang-cz/devslog v0.0.15 h1:ejoBLTCwJHWGbAmDf2fyTJJQO3AkzcPjw8SC9LaOQMI=
3+github.com/golang-cz/devslog v0.0.15/go.mod h1:bSe5bm0A7Nyfqtijf1OMNgVJHlWEuVSXnkuASiE1vV8=
A
main.go
+1171,
-0
1@@ -0,0 +1,1171 @@
2+package main
3+
4+import (
5+ "bufio"
6+ "context"
7+ "crypto/sha256"
8+ "encoding/json"
9+ "flag"
10+ "fmt"
11+ "html/template"
12+ "io"
13+ "log/slog"
14+ "os"
15+ "os/exec"
16+ "path/filepath"
17+ "strings"
18+ "time"
19+
20+ "github.com/golang-cz/devslog"
21+)
22+
23+type WorkspaceFactory func(cfg *Cfg, logger *slog.Logger, source string) Workspace
24+
25+func defaultWorkspaceFactory(cfg *Cfg, logger *slog.Logger, source string) Workspace {
26+ return &WorkspaceRsync{
27+ Cfg: cfg,
28+ Logger: logger,
29+ Source: source,
30+ }
31+}
32+
33+type Cfg struct {
34+ Logger *slog.Logger
35+ Ctx context.Context
36+ Cancel context.CancelFunc
37+ KeyLocation string
38+ CertificateLocation string
39+ ArtifactDir string
40+ Event string // event JSON passed via --event flag
41+ EventSource io.ReadCloser // when set, used directly as the event source (for testing)
42+ MonitorInterval time.Duration
43+ NewWorkspace WorkspaceFactory
44+ StatusOutput io.Writer // where status JSONL is written (default: os.Stdout)
45+ StatusFilter string // "terminal" (default) or "all"
46+}
47+
48+type Event struct {
49+ Type string `json:"type"`
50+ Name string `json:"name"`
51+ Workspace string `json:"workspace"`
52+ ArtifactDest string `json:"artifact_dest"`
53+ ArtifactURL string `json:"artifact_url"` // public URL for artifacts (e.g. https://{user}-artifacts.pgs.sh/{repo})
54+}
55+
56+func NewCfg() *Cfg {
57+ var keyLoc, certLoc, artifactDir, event string
58+ var monitorInterval time.Duration
59+ var logLevel string
60+ var structured bool
61+ flag.StringVar(&keyLoc, "pk", "", "ssh private key used to authenticate with pico services")
62+ flag.StringVar(&certLoc, "ck", "", "ssh certificate public key used to authenticate with pico services (only required if using ssh certificates)")
63+ flag.StringVar(&artifactDir, "artifact-dir", "/tmp/pico-ci-artifacts", "local directory to stage artifacts")
64+ flag.StringVar(&event, "event", "", "event JSON to run (alternative to reading from stdin)")
65+ flag.DurationVar(&monitorInterval, "monitor-interval", 5*time.Second, "interval for monitoring zmx sessions")
66+ flag.StringVar(&logLevel, "log-level", "info", "log level: debug, info, warn, error")
67+ flag.BoolVar(&structured, "structured", false, "use structured key=value log output")
68+ var statusFilter string
69+ flag.StringVar(&statusFilter, "status-filter", "terminal", "status output filter: terminal (default) or all")
70+ flag.Parse()
71+
72+ logger := newLogger("ci", logLevel, structured)
73+ ctx, cancel := context.WithCancel(context.Background())
74+ return &Cfg{
75+ NewWorkspace: defaultWorkspaceFactory,
76+ Logger: logger.With("key_loc", keyLoc, "cert_loc", certLoc),
77+ Ctx: ctx,
78+ Cancel: cancel,
79+ KeyLocation: keyLoc,
80+ CertificateLocation: certLoc,
81+ ArtifactDir: artifactDir,
82+ Event: event,
83+ MonitorInterval: monitorInterval,
84+ StatusFilter: statusFilter,
85+ }
86+}
87+
88+func parseLogLevel(s string) slog.Level {
89+ switch strings.ToLower(s) {
90+ case "debug":
91+ return slog.LevelDebug
92+ case "warn":
93+ return slog.LevelWarn
94+ case "error":
95+ return slog.LevelError
96+ default:
97+ return slog.LevelInfo
98+ }
99+}
100+
101+func newLogger(space string, levelStr string, structured bool) *slog.Logger {
102+ lvl := parseLogLevel(levelStr)
103+ if structured {
104+ return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
105+ Level: lvl,
106+ })).With("service", space)
107+ }
108+ return slog.New(devslog.NewHandler(os.Stderr, &devslog.Options{
109+ HandlerOptions: &slog.HandlerOptions{
110+ Level: lvl,
111+ },
112+ })).With("service", space)
113+}
114+
115+func main() {
116+ cfg := NewCfg()
117+ cmd := flag.Arg(0)
118+
119+ cfg.Logger.Debug("setting up ci", "cfg", cfg)
120+ cfg.Logger.Debug("running cmd", "cmd", cmd)
121+
122+ switch cmd {
123+ case "runner":
124+ cfg.Logger.Debug("starting runner")
125+ if err := RunRunner(cfg); err != nil {
126+ cfg.Logger.Error("runner failed", "err", err)
127+ os.Exit(1)
128+ }
129+ case "cancel":
130+ cfg.Logger.Debug("starting cancel handler")
131+ if err := runCancel(cfg); err != nil {
132+ cfg.Logger.Error("cancel failed", "err", err)
133+ os.Exit(1)
134+ }
135+ case "gc":
136+ cfg.Logger.Debug("starting garbage collection")
137+ if err := runGC(cfg); err != nil {
138+ cfg.Logger.Error("gc failed", "err", err)
139+ os.Exit(1)
140+ }
141+ case "monitor":
142+ cfg.Logger.Debug("starting monitor")
143+ if err := runMonitor(cfg); err != nil {
144+ cfg.Logger.Error("monitor failed", "err", err)
145+ os.Exit(1)
146+ }
147+ case "status":
148+ cfg.Logger.Debug("starting status updater")
149+ case "orca":
150+ cfg.Logger.Debug("starting orchestrator")
151+ default:
152+ cfg.Logger.Error("must provide command: runner, cancel, gc, monitor, status, or orca")
153+ os.Exit(1)
154+ }
155+}
156+
157+func RunRunner(cfg *Cfg) error {
158+ var payload string
159+ if cfg.EventSource != nil {
160+ data, err := io.ReadAll(cfg.EventSource)
161+ if err != nil {
162+ return fmt.Errorf("read event source: %w", err)
163+ }
164+ payload = strings.TrimSpace(string(data))
165+ } else if cfg.Event != "" {
166+ payload = cfg.Event
167+ } else {
168+ data, err := io.ReadAll(os.Stdin)
169+ if err != nil {
170+ return fmt.Errorf("read stdin: %w", err)
171+ }
172+ payload = strings.TrimSpace(string(data))
173+ }
174+
175+ var eventData Event
176+ if err := json.Unmarshal([]byte(payload), &eventData); err != nil {
177+ return fmt.Errorf("unmarshal event: %w", err)
178+ }
179+
180+ cfg.Logger.Info("received event", "type", eventData.Type, "repo", eventData.Name, "workspace", eventData.Workspace)
181+
182+ return eventHandler(cfg, &eventData)
183+}
184+
185+type Workspace interface {
186+ Setup() error
187+ Cleanup() error
188+ GetDir() string
189+}
190+
191+type WorkspaceRsync struct {
192+ Cfg *Cfg
193+ Logger *slog.Logger
194+ Source string
195+ Dest string
196+}
197+
198+func (w *WorkspaceRsync) Setup() error {
199+ tempDir, err := os.MkdirTemp("", "pico-ci-*")
200+ if err != nil {
201+ return err
202+ }
203+ w.Dest = tempDir
204+
205+ log := w.Logger.With("source", w.Source, "dest", w.Dest)
206+ log.Debug("syncing workspace via rsync")
207+
208+ var cmd *exec.Cmd
209+ if w.Cfg.KeyLocation != "" {
210+ sshcmd := fmt.Sprintf(
211+ "-i %s -o IdentitiesOnly=yes -o CertificateFile %s",
212+ w.Cfg.KeyLocation,
213+ w.Cfg.CertificateLocation,
214+ )
215+ cmd = exec.Command("rsync", "-e", sshcmd, "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
216+ } else {
217+ cmd = exec.Command("rsync", "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
218+ }
219+ return runCmd(cmd, log)
220+}
221+
222+func (w *WorkspaceRsync) Cleanup() error {
223+ // return os.RemoveAll(w.Dest)
224+ return nil
225+}
226+
227+func (w *WorkspaceRsync) GetDir() string {
228+ return w.Dest
229+}
230+
231+type JobEngine struct {
232+ Wk Workspace
233+ Logger *slog.Logger
234+ Cfg *Cfg
235+ Ev *Event
236+ JobID string
237+}
238+
239+type SessionInfo struct {
240+ Name string `json:"name"`
241+ Short string `json:"short"`
242+ PID string `json:"pid"`
243+ Clients string `json:"clients"`
244+ Created string `json:"created"`
245+ StartDir string `json:"start_dir"`
246+ Ended string `json:"ended"`
247+ ExitCode string `json:"exit_code"`
248+}
249+
250+type StatusPayload struct {
251+ Name string `json:"name"`
252+ JobID string `json:"job_id"`
253+ Status string `json:"status"`
254+ ExitCode *int `json:"exit_code"`
255+ Duration string `json:"duration,omitempty"`
256+ StartedAt string `json:"started_at,omitempty"`
257+ EndedAt string `json:"ended_at,omitempty"`
258+ ArtifactURL string `json:"artifact_url,omitempty"`
259+ SessionCount int `json:"session_count"`
260+ Sessions []SessionInfo `json:"sessions"`
261+}
262+
263+func (eng *JobEngine) Setup() error {
264+ return eng.Wk.Setup()
265+}
266+
267+func (eng *JobEngine) Run() error {
268+ manifest, err := eng.getManifest()
269+ if err != nil {
270+ return err
271+ }
272+
273+ prefix := fmt.Sprintf("ci.%s.%s.", eng.Ev.Name, eng.JobID)
274+
275+ log := eng.Logger.With("manifest", manifest, "prefix", prefix)
276+ log.Debug("starting runner session", "session", prefix+"runner")
277+
278+ evStr := fmt.Sprintf("PICO_CI_EVENT=%s", eng.Ev.Type)
279+ jobStr := fmt.Sprintf("ZMX_SESSION_PREFIX=%s", prefix)
280+
281+ cmd := exec.Command("zmx", "run", "runner", "-d", "bash", manifest)
282+ cmd.Env = append(os.Environ(), evStr, jobStr)
283+ cmd.Dir = eng.Wk.GetDir()
284+
285+ if err := cmd.Run(); err != nil {
286+ return fmt.Errorf("start runner session: %w", err)
287+ }
288+
289+ return nil
290+}
291+
292+func (eng *JobEngine) Cleanup() error {
293+ return eng.Wk.Cleanup()
294+}
295+
296+func (eng *JobEngine) getManifest() (string, error) {
297+ fnames := []string{"pico.sh"}
298+ for _, manifest := range fnames {
299+ path := filepath.Join(eng.Wk.GetDir(), manifest)
300+ if _, err := os.Stat(path); err == nil {
301+ return path, nil
302+ }
303+ }
304+ return "", fmt.Errorf("manifest not found in %s", eng.Wk.GetDir())
305+}
306+
307+func eventHandler(cfg *Cfg, eventData *Event) error {
308+ log := cfg.Logger.With("repo", eventData.Name, "type", eventData.Type)
309+ log.Info("processing event", "workspace", eventData.Workspace)
310+
311+ // Cancel any existing job for this repo before starting a new one
312+ cancelRunningJobs(cfg, log, eventData.Name)
313+
314+ jobID := generateJobID(eventData.Name, eventData.Workspace)
315+ log = log.With("job_id", jobID)
316+ log.Info("starting job", "session_prefix", fmt.Sprintf("ci.%s.%s", eventData.Name, jobID))
317+
318+ wk := cfg.NewWorkspace(cfg, log, eventData.Workspace)
319+ eng := &JobEngine{
320+ Logger: log,
321+ Cfg: cfg,
322+ Wk: wk,
323+ Ev: eventData,
324+ JobID: jobID,
325+ }
326+ defer func() {
327+ if err := eng.Cleanup(); err != nil {
328+ cfg.Logger.Error("engine cleanup", "err", err)
329+ }
330+ }()
331+
332+ log.Info("cloning workspace", "source", eventData.Workspace)
333+ if err := eng.Setup(); err != nil {
334+ return fmt.Errorf("setup: %w", err)
335+ }
336+ log.Info("workspace ready", "dir", eng.Wk.GetDir())
337+
338+ // Store the event in the artifact directory so the monitor can access it
339+ eventBytes, _ := json.Marshal(eventData)
340+ eventDir := filepath.Join(cfg.ArtifactDir, eventData.Name, jobID)
341+ if err := os.MkdirAll(eventDir, 0755); err != nil {
342+ log.Error("create event dir", "err", err)
343+ } else {
344+ if err := os.WriteFile(filepath.Join(eventDir, "event.json"), eventBytes, 0644); err != nil {
345+ log.Error("write event", "err", err)
346+ }
347+ }
348+
349+ log.Info("launching job sessions")
350+ if err := eng.Run(); err != nil {
351+ return fmt.Errorf("run: %w", err)
352+ }
353+
354+ log.Info("job launched successfully — monitor will track progress", "artifact_dir", cfg.ArtifactDir, "artifact_dest", eventData.ArtifactDest)
355+ log.Info("follow the runner live", "command", fmt.Sprintf("zmx tail ci.%s.%s.runner", eventData.Name, jobID))
356+ return nil
357+}
358+
359+// runMonitor is a long-lived daemon that polls all ci.* zmx sessions,
360+// writes status as JSONL to stdout, stages artifacts, and syncs to destination.
361+// Logs go to stderr. Compose with shell tools to route status:
362+//
363+// pico-ci monitor | ssh pipe.pico.sh "pub build.status -b=false"
364+// pico-ci monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done
365+// pico-ci monitor > status.jsonl
366+func runMonitor(cfg *Cfg) error {
367+ log := cfg.Logger.With("cmd", "monitor")
368+
369+ output := cfg.StatusOutput
370+ if output == nil {
371+ output = os.Stdout
372+ }
373+
374+ ticker := time.NewTicker(cfg.MonitorInterval)
375+ defer ticker.Stop()
376+
377+ log.Debug("monitor started", "interval", cfg.MonitorInterval, "artifact_dir", cfg.ArtifactDir)
378+ log.Debug("monitoring ci.* sessions for job status, writing status to stdout")
379+
380+ for {
381+ select {
382+ case <-cfg.Ctx.Done():
383+ log.Debug("context cancelled, stopping monitor")
384+ return cfg.Ctx.Err()
385+ case <-ticker.C:
386+ }
387+
388+ if err := monitorTick(cfg, log, output); err != nil {
389+ log.Error("monitor tick", "err", err)
390+ }
391+ }
392+}
393+
394+// monitorTick performs a single monitoring pass over all ci.* sessions.
395+func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer) error {
396+ // a. zmx list → filter ci.* sessions
397+ listOutput, err := exec.Command("zmx", "list").CombinedOutput()
398+ if err != nil {
399+ return fmt.Errorf("zmx list: %w", err)
400+ }
401+ sessions := parseZMXList(string(listOutput))
402+ ciSessions := filterCISessions(sessions)
403+
404+ if len(ciSessions) == 0 {
405+ log.Debug("no ci.* sessions found")
406+ return nil
407+ }
408+
409+ log.Debug("found ci sessions", "count", len(ciSessions))
410+
411+ // b. Group by job prefix: ci.<name>.<jobID>.
412+ groups := groupSessionsByJob(ciSessions)
413+
414+ // c. Process each job group
415+ for prefix, group := range groups {
416+ name, jobID := parseJobPrefix(prefix)
417+ if name == "" {
418+ continue
419+ }
420+
421+ log := log.With("repo", name, "job_id", jobID)
422+
423+ // Fetch and stage history for every session at each tick,
424+ // not just when the job completes. This gives live progress
425+ // snapshots while the job is running.
426+ for _, s := range group {
427+ html, err := fetchHistoryHTML(s.Name)
428+ if err != nil {
429+ log.Error("fetch history html", "session", s.Name, "err", err)
430+ continue
431+ }
432+ if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, html, ".html"); err != nil {
433+ log.Error("stage html artifact", "session", s.Short, "err", err)
434+ }
435+
436+ plain, err := fetchHistoryPlain(s.Name)
437+ if err != nil {
438+ log.Error("fetch history plain", "session", s.Name, "err", err)
439+ continue
440+ }
441+ if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, plain, ".txt"); err != nil {
442+ log.Error("stage txt artifact", "session", s.Short, "err", err)
443+ }
444+ }
445+
446+ // Generate and stage job index landing pages
447+ indexHTML, indexTXT := generateJobIndex(name, jobID, group)
448+ if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexHTML, ".html"); err != nil {
449+ log.Error("stage index.html", "err", err)
450+ }
451+ if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexTXT, ".txt"); err != nil {
452+ log.Error("stage index.txt", "err", err)
453+ }
454+
455+ // Load event to get artifact URL
456+ eventData, _ := loadEvent(cfg.ArtifactDir, name, jobID)
457+
458+ // Build artifact URL: artifact_url/jobID/index.html
459+ var artifactURL string
460+ if eventData.ArtifactURL != "" {
461+ artifactURL = fmt.Sprintf("%s/%s/index.html", strings.TrimRight(eventData.ArtifactURL, "/"), jobID)
462+ }
463+
464+ // Compute job-level timing from session timestamps
465+ startedAt, endedAt, duration := computeJobTiming(group)
466+
467+ if allCompleted(group) {
468+ // Check sentinel — publish terminal status exactly once
469+ sentinel := filepath.Join(cfg.ArtifactDir, name, jobID, "published.json")
470+ if _, err := os.Stat(sentinel); err == nil {
471+ log.Debug("terminal status already published, skipping", "sentinel", sentinel)
472+ continue
473+ }
474+
475+ log.Debug("job completed, publishing final status", "sessions", len(group))
476+ exitCode, status := resolveJobExitCode(group)
477+ log.Info("job finished", "status", status, "exit_code", exitCode)
478+ payload := StatusPayload{
479+ Name: name,
480+ JobID: jobID,
481+ Status: status,
482+ ExitCode: &exitCode,
483+ Duration: duration,
484+ StartedAt: startedAt,
485+ EndedAt: endedAt,
486+ ArtifactURL: artifactURL,
487+ SessionCount: len(group),
488+ Sessions: group,
489+ }
490+ if err := publishStatus(output, payload); err != nil {
491+ log.Error("publish final status", "err", err)
492+ }
493+ // Write sentinel so we don't re-publish on subsequent ticks
494+ if err := os.WriteFile(sentinel, []byte(status), 0644); err != nil {
495+ log.Error("write published sentinel", "err", err)
496+ }
497+ } else {
498+ log.Debug("job still running", "sessions", len(group))
499+ // Publish running status only when filter is "all"
500+ if cfg.StatusFilter != "terminal" {
501+ payload := StatusPayload{
502+ Name: name,
503+ JobID: jobID,
504+ Status: "running",
505+ ExitCode: nil,
506+ Duration: duration,
507+ StartedAt: startedAt,
508+ ArtifactURL: artifactURL,
509+ SessionCount: len(group),
510+ Sessions: group,
511+ }
512+ if err := publishStatus(output, payload); err != nil {
513+ log.Error("publish status", "err", err)
514+ }
515+ }
516+ }
517+ }
518+
519+ // d. Sync artifacts once per tick
520+ if err := syncArtifacts(cfg, log); err != nil {
521+ log.Error("sync artifacts", "err", err)
522+ }
523+
524+ return nil
525+}
526+
527+// loadEvent reads the event.json for a job from the artifact directory.
528+func loadEvent(dir, name, jobID string) (Event, error) {
529+ var event Event
530+ data, err := os.ReadFile(filepath.Join(dir, name, jobID, "event.json"))
531+ if err != nil {
532+ return event, err
533+ }
534+ return event, json.Unmarshal(data, &event)
535+}
536+
537+// computeJobTiming derives started_at, ended_at, and duration from session timestamps.
538+// started_at is the earliest session creation time, ended_at is the latest session end time.
539+// For running jobs, ended_at is empty and duration is computed against now.
540+func computeJobTiming(sessions []SessionInfo) (startedAt, endedAt, duration string) {
541+ var earliestCreated, latestEnded int64
542+ hasCreated, hasEnded := false, false
543+
544+ for _, s := range sessions {
545+ if s.Created != "" {
546+ var c int64
547+ if _, err := fmt.Sscanf(s.Created, "%d", &c); err == nil {
548+ if !hasCreated || c < earliestCreated {
549+ earliestCreated = c
550+ }
551+ hasCreated = true
552+ }
553+ }
554+ if s.Ended != "" {
555+ var e int64
556+ if _, err := fmt.Sscanf(s.Ended, "%d", &e); err == nil {
557+ if !hasEnded || e > latestEnded {
558+ latestEnded = e
559+ }
560+ hasEnded = true
561+ }
562+ }
563+ }
564+
565+ if hasCreated {
566+ startedAt = time.Unix(earliestCreated, 0).UTC().Format(time.RFC3339)
567+ }
568+ if hasEnded {
569+ endedAt = time.Unix(latestEnded, 0).UTC().Format(time.RFC3339)
570+ }
571+
572+ if hasCreated && hasEnded {
573+ duration = fmtDurationTs(earliestCreated, latestEnded)
574+ } else if hasCreated {
575+ duration = fmtDurationTs(earliestCreated, time.Now().Unix())
576+ }
577+
578+ return startedAt, endedAt, duration
579+}
580+
581+// fmtDurationTs formats the duration between two unix timestamps.
582+func fmtDurationTs(started, ended int64) string {
583+ d := time.Duration(ended-started) * time.Second
584+ if d < 0 {
585+ return "—"
586+ }
587+ if d >= time.Minute {
588+ return fmt.Sprintf("%dm%ds", d/time.Minute, d%time.Minute/time.Second)
589+ }
590+ secs := float64(d) / float64(time.Second)
591+ if secs >= 10 {
592+ return fmt.Sprintf("%ds", int(secs))
593+ }
594+ return fmt.Sprintf("%.1fs", secs)
595+}
596+
597+// filterCISessions returns only sessions with ci. prefix.
598+func filterCISessions(sessions []SessionInfo) []SessionInfo {
599+ var filtered []SessionInfo
600+ for _, s := range sessions {
601+ if strings.HasPrefix(s.Name, "ci.") {
602+ filtered = append(filtered, s)
603+ }
604+ }
605+ return filtered
606+}
607+
608+// groupSessionsByJob groups sessions by their job prefix (ci.<name>.<jobID>.).
609+func groupSessionsByJob(sessions []SessionInfo) map[string][]SessionInfo {
610+ groups := make(map[string][]SessionInfo)
611+ for _, s := range sessions {
612+ prefix := extractJobPrefix(s.Name)
613+ if prefix == "" {
614+ continue
615+ }
616+ // Set the Short name
617+ s.Short = strings.TrimPrefix(s.Name, prefix)
618+ groups[prefix] = append(groups[prefix], s)
619+ }
620+ return groups
621+}
622+
623+// parseJobPrefix extracts name and jobID from a prefix like "ci.<name>.<jobID>.".
624+func parseJobPrefix(prefix string) (name, jobID string) {
625+ // ci.name.jobID. -> ["ci", "name", "jobID", ""]
626+ parts := strings.Split(prefix, ".")
627+ if len(parts) < 4 {
628+ return "", ""
629+ }
630+ return parts[1], parts[2]
631+}
632+
633+// resolveJobExitCode determines the job's exit code from its sessions.
634+// Defensive: if any child session failed, the job failed regardless of the
635+// runner's exit code. This protects against bad pico.sh scripts that exit 0
636+// without waiting for children.
637+func resolveJobExitCode(sessions []SessionInfo) (int, string) {
638+ var runnerExit *int
639+ var worstChild *int // highest non-zero child exit code
640+
641+ for _, s := range sessions {
642+ if s.ExitCode == "" {
643+ continue
644+ }
645+ var code int
646+ if _, err := fmt.Sscanf(s.ExitCode, "%d", &code); err != nil {
647+ continue
648+ }
649+
650+ if strings.HasSuffix(s.Name, ".runner") {
651+ runnerExit = &code
652+ } else if code != 0 {
653+ if worstChild == nil || code > *worstChild {
654+ worstChild = &code
655+ }
656+ }
657+ }
658+
659+ // Any child failure overrides the runner — defensive against bad scripts
660+ if worstChild != nil {
661+ return *worstChild, "failed"
662+ }
663+ if runnerExit != nil && *runnerExit != 0 {
664+ return *runnerExit, "failed"
665+ }
666+ return 0, "success"
667+}
668+
669+func generateJobID(name, workspace string) string {
670+ return jobIDFor(name, workspace, time.Now().UnixNano())
671+}
672+
673+func jobIDFor(name, workspace string, ts int64) string {
674+ h := sha256.Sum256([]byte(name + workspace + fmt.Sprintf("%d", ts)))
675+ return fmt.Sprintf("%x", h[:4])
676+}
677+
678+func shortSessionName(session, prefix string) string {
679+ return strings.TrimPrefix(session, prefix)
680+}
681+
682+func parseZMXList(output string) []SessionInfo {
683+ var sessions []SessionInfo
684+ lines := strings.Split(strings.TrimSpace(output), "\n")
685+ for _, line := range lines {
686+ line = strings.TrimSpace(line)
687+ if line == "" {
688+ continue
689+ }
690+ // Strip leading arrow/space prefix
691+ line = strings.TrimPrefix(line, "→ ")
692+ line = strings.TrimSpace(line)
693+
694+ fields := strings.FieldsFunc(line, func(r rune) bool {
695+ return r == '\t'
696+ })
697+
698+ var si SessionInfo
699+ for _, field := range fields {
700+ parts := strings.SplitN(field, "=", 2)
701+ if len(parts) != 2 {
702+ continue
703+ }
704+ switch parts[0] {
705+ case "name":
706+ si.Name = parts[1]
707+ case "pid":
708+ si.PID = parts[1]
709+ case "clients":
710+ si.Clients = parts[1]
711+ case "created":
712+ si.Created = parts[1]
713+ case "start_dir":
714+ si.StartDir = parts[1]
715+ case "ended":
716+ si.Ended = parts[1]
717+ case "exit_code":
718+ si.ExitCode = parts[1]
719+ }
720+ }
721+ if si.Name != "" {
722+ sessions = append(sessions, si)
723+ }
724+ }
725+ return sessions
726+}
727+
728+func fetchHistoryHTML(sessionName string) (string, error) {
729+ cmd := exec.Command("zmx", "history", sessionName, "--html")
730+ output, err := cmd.Output()
731+ if err != nil {
732+ return "", err
733+ }
734+ return string(output), nil
735+}
736+
737+func fetchHistoryPlain(sessionName string) (string, error) {
738+ cmd := exec.Command("zmx", "history", sessionName, "--plain")
739+ output, err := cmd.Output()
740+ if err != nil {
741+ return "", err
742+ }
743+ return string(output), nil
744+}
745+
746+func publishStatus(w io.Writer, payload StatusPayload) error {
747+ data, err := json.Marshal(payload)
748+ if err != nil {
749+ return err
750+ }
751+ _, err = w.Write(append(data, '\n'))
752+ return err
753+}
754+
755+func stageArtifact(dir, name, jobID, short, content, ext string) error {
756+ path := filepath.Join(dir, name, jobID, short+ext)
757+ if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
758+ return err
759+ }
760+ return os.WriteFile(path, []byte(content), 0644)
761+}
762+
763+func syncArtifacts(cfg *Cfg, log *slog.Logger) error {
764+ entries, err := os.ReadDir(cfg.ArtifactDir)
765+ if err != nil {
766+ return fmt.Errorf("read artifact dir: %w", err)
767+ }
768+
769+ for _, entry := range entries {
770+ if !entry.IsDir() {
771+ continue
772+ }
773+ repoEntries, err := os.ReadDir(filepath.Join(cfg.ArtifactDir, entry.Name()))
774+ if err != nil {
775+ log.Error("read repo dir", "repo", entry.Name(), "err", err)
776+ continue
777+ }
778+ for _, repoEntry := range repoEntries {
779+ if !repoEntry.IsDir() {
780+ continue
781+ }
782+ eventPath := filepath.Join(cfg.ArtifactDir, entry.Name(), repoEntry.Name(), "event.json")
783+ data, err := os.ReadFile(eventPath)
784+ if err != nil {
785+ continue // no event yet, skip
786+ }
787+ var event Event
788+ if err := json.Unmarshal(data, &event); err != nil || event.ArtifactDest == "" {
789+ continue
790+ }
791+ log.Debug("syncing artifacts", "repo", entry.Name(), "job_id", repoEntry.Name(), "dest", event.ArtifactDest)
792+ sshArgs := fmt.Sprintf(
793+ "-i %s -o IdentitiesOnly=yes -o CertificateFile %s",
794+ cfg.KeyLocation,
795+ cfg.CertificateLocation,
796+ )
797+ srcDir := filepath.Join(cfg.ArtifactDir, entry.Name(), repoEntry.Name())
798+ cmd := exec.Command("rsync", "-e", sshArgs, "-rv", srcDir+"/", event.ArtifactDest)
799+ if err := runCmd(cmd, log); err != nil {
800+ log.Error("sync artifacts", "repo", entry.Name(), "job_id", repoEntry.Name(), "err", err)
801+ }
802+ }
803+ }
804+ return nil
805+}
806+
807+func filterSessions(sessions []SessionInfo, prefix string) []SessionInfo {
808+ var filtered []SessionInfo
809+ for _, s := range sessions {
810+ if strings.HasPrefix(s.Name, prefix) {
811+ cs := s
812+ cs.Short = shortSessionName(s.Name, prefix)
813+ filtered = append(filtered, cs)
814+ }
815+ }
816+ return filtered
817+}
818+
819+func allCompleted(sessions []SessionInfo) bool {
820+ for _, s := range sessions {
821+ if s.Ended == "" {
822+ return false
823+ }
824+ }
825+ return true
826+}
827+
828+func runCmd(cmd *exec.Cmd, log *slog.Logger) error {
829+ stdout, err := cmd.StdoutPipe()
830+ if err != nil {
831+ return err
832+ }
833+
834+ stderr, err := cmd.StderrPipe()
835+ if err != nil {
836+ return err
837+ }
838+
839+ if err := cmd.Start(); err != nil {
840+ return err
841+ }
842+
843+ go func() {
844+ scanner := bufio.NewScanner(stdout)
845+ for scanner.Scan() {
846+ log.Debug("cmd stdout", "text", scanner.Text())
847+ }
848+ }()
849+
850+ go func() {
851+ scanner := bufio.NewScanner(stderr)
852+ for scanner.Scan() {
853+ log.Error("cmd stderr", "text", scanner.Text())
854+ }
855+ }()
856+
857+ return cmd.Wait()
858+}
859+
860+// runCancel reads an event from stdin and cancels any running job with matching name+type.
861+func runCancel(cfg *Cfg) error {
862+ log := cfg.Logger.With("cmd", "cancel")
863+
864+ // Read event from stdin
865+ scanner := bufio.NewScanner(os.Stdin)
866+ if !scanner.Scan() {
867+ return fmt.Errorf("no input on stdin")
868+ }
869+
870+ var event Event
871+ if err := json.Unmarshal([]byte(scanner.Text()), &event); err != nil {
872+ return fmt.Errorf("unmarshal event: %w", err)
873+ }
874+
875+ log = log.With("repo", event.Name, "type", event.Type)
876+ log.Info("cancelling running jobs for repo")
877+
878+ cancelRunningJobs(cfg, log, event.Name)
879+ return nil
880+}
881+
882+// cancelRunningJobs finds and cancels all running jobs for a given repo name.
883+// It kills the runner sessions (which cascades to children). The monitor will
884+// detect the ended sessions and publish cancelled status on its next tick.
885+func cancelRunningJobs(cfg *Cfg, log *slog.Logger, name string) {
886+ runnerSessions, _ := findRunningJobs(name)
887+ if len(runnerSessions) == 0 {
888+ log.Debug("no running jobs to cancel")
889+ return
890+ }
891+
892+ log.Debug("found running jobs to cancel", "count", len(runnerSessions))
893+
894+ for _, runnerName := range runnerSessions {
895+ jobID := extractJobID(runnerName)
896+ log := log.With("job_id", jobID)
897+
898+ log.Debug("cancelling job", "runner", runnerName)
899+ if err := killSessions([]string{runnerName}); err != nil {
900+ log.Error("kill runner session", "err", err)
901+ continue
902+ }
903+ log.Debug("cancelled runner session")
904+ }
905+}
906+
907+// findRunningJobs finds all active runner sessions for a given name.
908+// Returns runner session names and all sessions for reference.
909+func findRunningJobs(name string) ([]string, []SessionInfo) {
910+ listOutput, err := exec.Command("zmx", "list").CombinedOutput()
911+ if err != nil {
912+ return nil, nil
913+ }
914+
915+ sessions := parseZMXList(string(listOutput))
916+ var runners []string
917+ for _, s := range sessions {
918+ // Match ci.<name>.*.runner sessions that are still active (no ended)
919+ if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") && s.Ended == "" {
920+ runners = append(runners, s.Name)
921+ }
922+ }
923+ return runners, sessions
924+}
925+
926+// extractJobID extracts the jobID from a runner session name like ci.<name>.<jobID>.runner.
927+func extractJobID(runnerName string) string {
928+ // ci.<name>.<jobID>.runner
929+ // Remove "ci." prefix and ".runner" suffix, then take the part after the first "."
930+ name := strings.TrimSuffix(runnerName, ".runner")
931+ name = strings.TrimPrefix(name, "ci.")
932+ // name is now <name>.<jobID>, take the jobID part
933+ parts := strings.SplitN(name, ".", 2)
934+ if len(parts) == 2 {
935+ return parts[1]
936+ }
937+ return ""
938+}
939+
940+// killSessions kills zmx sessions by name.
941+func killSessions(names []string) error {
942+ if len(names) == 0 {
943+ return nil
944+ }
945+ args := append([]string{"kill"}, names...)
946+ cmd := exec.Command("zmx", args...)
947+ output, err := cmd.CombinedOutput()
948+ if err != nil {
949+ return fmt.Errorf("zmx kill: %s: %w", string(output), err)
950+ }
951+ return nil
952+}
953+
954+// runGC deletes completed CI zmx sessions that are not part of a running job.
955+func runGC(cfg *Cfg) error {
956+ log := cfg.Logger.With("cmd", "gc")
957+ log.Debug("running garbage collection")
958+
959+ listOutput, err := exec.Command("zmx", "list").CombinedOutput()
960+ if err != nil {
961+ return fmt.Errorf("zmx list: %w", err)
962+ }
963+
964+ sessions := parseZMXList(string(listOutput))
965+
966+ // Group ci. sessions by job prefix: ci.<name>.<jobID>.
967+ groups := make(map[string][]SessionInfo)
968+ for _, s := range sessions {
969+ if !strings.HasPrefix(s.Name, "ci.") {
970+ continue
971+ }
972+ // Extract job prefix: ci.<name>.<jobID>.
973+ prefix := extractJobPrefix(s.Name)
974+ if prefix == "" {
975+ continue
976+ }
977+ groups[prefix] = append(groups[prefix], s)
978+ }
979+
980+ // For each group, if all sessions are completed, kill them.
981+ var toKill []string
982+ for prefix, group := range groups {
983+ allDone := true
984+ for _, s := range group {
985+ if s.Ended == "" {
986+ allDone = false
987+ break
988+ }
989+ }
990+ if allDone {
991+ log.Debug("completed job, scheduling for gc", "prefix", prefix, "sessions", len(group))
992+ toKill = append(toKill, group[0].Name)
993+ }
994+ }
995+
996+ if len(toKill) == 0 {
997+ log.Debug("no sessions to garbage collect")
998+ return nil
999+ }
1000+
1001+ if err := killSessions(toKill); err != nil {
1002+ return fmt.Errorf("kill sessions: %w", err)
1003+ }
1004+
1005+ log.Debug("garbage collection complete", "killed", len(toKill))
1006+ return nil
1007+}
1008+
1009+// extractJobPrefix extracts the job prefix from a session name.
1010+// ci.<name>.<jobID>.<step> -> ci.<name>.<jobID>.
1011+func extractJobPrefix(sessionName string) string {
1012+ // Find the third dot (after ci.<name>.<jobID>)
1013+ // ci.name.jobID.step -> ci.name.jobID.
1014+ parts := strings.Split(sessionName, ".")
1015+ if len(parts) < 4 {
1016+ return ""
1017+ }
1018+ return parts[0] + "." + parts[1] + "." + parts[2] + "."
1019+}
1020+
1021+// sessionRow holds display info for a single session in the job index.
1022+type sessionRow struct {
1023+ Name string
1024+ Short string
1025+ Status string
1026+ ExitCode string
1027+ Ended string
1028+ Duration string
1029+}
1030+
1031+// generateJobIndex produces HTML and plain-text index pages listing all
1032+// sessions for a job with links and metadata (status, exit code, ended at).
1033+func generateJobIndex(name, jobID string, sessions []SessionInfo) (htmlContent, txtContent string) {
1034+ rows := make([]sessionRow, 0, len(sessions))
1035+ for _, s := range sessions {
1036+ row := sessionRow{
1037+ Name: s.Name,
1038+ Short: s.Short,
1039+ }
1040+ if s.Ended == "" {
1041+ row.Status = "running"
1042+ row.Duration = fmtDuration(s.Created, fmt.Sprintf("%d", time.Now().Unix()))
1043+ } else if s.ExitCode == "0" {
1044+ row.Status = "success"
1045+ row.ExitCode = "0"
1046+ } else {
1047+ row.Status = "failed"
1048+ row.ExitCode = s.ExitCode
1049+ }
1050+ row.Ended = s.Ended
1051+ row.Duration = fmtDuration(s.Created, s.Ended)
1052+ rows = append(rows, row)
1053+ }
1054+
1055+ // Resolve overall job status
1056+ jobStatus := "success"
1057+ hasRunning := false
1058+ for _, r := range rows {
1059+ if r.Status == "running" {
1060+ hasRunning = true
1061+ break
1062+ }
1063+ if r.Status == "failed" {
1064+ jobStatus = "failed"
1065+ }
1066+ }
1067+ if hasRunning {
1068+ jobStatus = "running"
1069+ }
1070+
1071+ // HTML index
1072+ tmpl, err := template.New("index").Parse(indexHTMLTemplate)
1073+ if err == nil {
1074+ var buf strings.Builder
1075+ if err := tmpl.Execute(&buf, struct {
1076+ Name string
1077+ JobID string
1078+ JobStatus string
1079+ Rows []sessionRow
1080+ }{Name: name, JobID: jobID, JobStatus: jobStatus, Rows: rows}); err == nil {
1081+ htmlContent = buf.String()
1082+ }
1083+ }
1084+
1085+ // Plain-text index
1086+ var buf strings.Builder
1087+ statusIcon := map[string]string{"success": "\u2705", "failed": "\u274c", "running": "\u23f3"}
1088+ icon := statusIcon[jobStatus]
1089+ fmt.Fprintf(&buf, "Job: %s (%s) %s\n", name, jobID, icon)
1090+ fmt.Fprintf(&buf, "Sessions: %d\n", len(rows))
1091+ fmt.Fprintln(&buf, strings.Repeat("-", 70))
1092+ for _, r := range rows {
1093+ rIcon := statusIcon[r.Status]
1094+ fmt.Fprintf(&buf, " %s %-20s exit: %-5s duration: %-8s ended: %s\n",
1095+ rIcon, r.Short, r.ExitCode, r.Duration, r.Ended)
1096+ }
1097+ fmt.Fprintln(&buf, strings.Repeat("-", 60))
1098+ txtContent = buf.String()
1099+
1100+ return htmlContent, txtContent
1101+}
1102+
1103+const indexHTMLTemplate = `<!DOCTYPE html>
1104+<html lang="en">
1105+<head>
1106+<meta charset="UTF-8">
1107+<meta name="viewport" content="width=device-width, initial-scale=1.0">
1108+<title>CI Job: {{.Name}} ({{.JobID}})</title>
1109+<style>
1110+ body { font-family: system-ui, -apple-system, sans-serif; margin: 2rem; background: #fafafa; color: #222; }
1111+ h1 { font-size: 1.4rem; margin-bottom: 0.25rem; display: flex; align-items: center; gap: 0.75rem; }
1112+ .meta { color: #666; margin-bottom: 1.5rem; font-size: 0.9rem; }
1113+ table { border-collapse: collapse; width: 100%; max-width: 800px; }
1114+ th { text-align: left; padding: 0.5rem 0.75rem; border-bottom: 2px solid #ccc; font-size: 0.8rem; text-transform: uppercase; color: #555; }
1115+ td { padding: 0.5rem 0.75rem; border-bottom: 1px solid #eee; }
1116+ a { color: #0066cc; text-decoration: none; }
1117+ a:hover { text-decoration: underline; }
1118+ .pill { display: inline-block; padding: 0.2rem 0.65rem; border-radius: 999px; font-size: 0.75rem; font-weight: 700; text-transform: uppercase; letter-spacing: 0.03em; }
1119+ .pill-success { background: #d4edda; color: #155724; }
1120+ .pill-failed { background: #f8d7da; color: #721c24; }
1121+ .pill-running { background: #cce5ff; color: #004085; }
1122+</style>
1123+</head>
1124+<body>
1125+<h1>{{.Name}} <span class="pill pill-{{.JobStatus}}">{{.JobStatus}}</span></h1>
1126+<p class="meta">Job ID: {{.JobID}} · {{len .Rows}} session(s)</p>
1127+<table>
1128+<thead>
1129+<tr><th>Session</th><th>Status</th><th>Exit Code</th><th>Duration</th><th>Ended</th><th>Artifacts</th></tr>
1130+</thead>
1131+<tbody>
1132+{{range .Rows}}
1133+<tr>
1134+ <td><strong>{{.Short}}</strong></td>
1135+ <td><span class="pill pill-{{.Status}}">{{.Status}}</span></td>
1136+ <td>{{.ExitCode}}</td>
1137+ <td>{{.Duration}}</td>
1138+ <td>{{.Ended}}</td>
1139+ <td><a href="{{.Short}}.html">html</a> / <a href="{{.Short}}.txt">txt</a></td>
1140+</tr>
1141+{{end}}
1142+</tbody>
1143+</table>
1144+</body>
1145+</html>`
1146+
1147+// fmtDuration formats the duration between two unix timestamp strings.
1148+// Returns human-readable strings like "2m34s", "1.5s", or "—" if invalid.
1149+func fmtDuration(created, ended string) string {
1150+ if created == "" || ended == "" {
1151+ return "—"
1152+ }
1153+ var c, e int64
1154+ if _, err := fmt.Sscanf(created, "%d", &c); err != nil {
1155+ return "—"
1156+ }
1157+ if _, err := fmt.Sscanf(ended, "%d", &e); err != nil {
1158+ return "—"
1159+ }
1160+ duration := time.Duration(e-c) * time.Second
1161+ if duration < 0 {
1162+ return "—"
1163+ }
1164+ if duration >= time.Minute {
1165+ return fmt.Sprintf("%dm%ds", duration/time.Minute, duration%time.Minute/time.Second)
1166+ }
1167+ secs := float64(duration) / float64(time.Second)
1168+ if secs >= 10 {
1169+ return fmt.Sprintf("%ds", int(secs))
1170+ }
1171+ return fmt.Sprintf("%.1fs", secs)
1172+}