main main.go
Eric Bower  ·  2026-05-11
   1package main
   2
   3import (
   4	"archive/tar"
   5	"bufio"
   6	"context"
   7	"crypto/sha256"
   8	"embed"
   9	"encoding/json"
  10	"flag"
  11	"fmt"
  12	"html/template"
  13	"io"
  14	"io/fs"
  15	"log/slog"
  16	"os"
  17	"os/exec"
  18	"os/signal"
  19	"path/filepath"
  20	"runtime"
  21	"strconv"
  22	"strings"
  23	"syscall"
  24	"time"
  25)
  26
// tmplFS holds the contents of the tmpl/ directory, embedded into the binary
// at build time by the go:embed directive below.
//
//go:embed tmpl/*
var tmplFS embed.FS
  29
// WorkspaceFactory builds the Workspace used to materialize a job's source
// tree. It receives the runner configuration, a logger, and the event's
// workspace source string. The factory in use is stored in Cfg.NewWorkspace.
type WorkspaceFactory func(cfg *Cfg, logger *slog.Logger, source string) Workspace
  31
  32func defaultWorkspaceFactory(cfg *Cfg, logger *slog.Logger, source string) Workspace {
  33	if strings.HasSuffix(source, ".tar") {
  34		return &WorkspaceTar{
  35			Cfg:    cfg,
  36			Logger: logger,
  37			Source: source,
  38		}
  39	}
  40	return &WorkspaceRsync{
  41		Cfg:    cfg,
  42		Logger: logger,
  43		Source: source,
  44	}
  45}
  46
// Cfg carries the runtime configuration that main hands to every subcommand.
// NewCfg populates it from command-line flags. EventSource and StatusOutput
// are not set by NewCfg; when nil, RunRunner falls back to the --event flag or
// stdin, and runMonitor falls back to os.Stdout.
type Cfg struct {
	Logger              *slog.Logger
	Ctx                 context.Context
	Cancel              context.CancelFunc
	KeyLocation         string
	CertificateLocation string
	ArtifactDir         string
	Event               string        // event JSON passed via --event flag
	EventSource         io.ReadCloser // when set, used directly as the event source (for testing)
	MonitorInterval     time.Duration
	GCInterval          time.Duration
	NewWorkspace        WorkspaceFactory
	StatusOutput        io.Writer // where status JSONL is written (default: os.Stdout)
	IncludeRunning      bool      // emit running status updates in addition to terminal
	HumanOutput         bool      // human-readable output instead of JSONL / slog
	Wait                bool      // block until job completes, print history and summary
}
  64
// Event is the JSON payload that describes the CI job to run. RunRunner
// rejects events whose Type, Name or Workspace is empty. Workspace is the
// source handed to the workspace factory; Branch and Commit are recorded in
// the job's attestation.json by eventHandler.
type Event struct {
	Type         string `json:"type"`
	Name         string `json:"name"`
	Workspace    string `json:"workspace"`
	Branch       string `json:"branch"`
	Commit       string `json:"commit"`
	ArtifactDest string `json:"artifact_dest"`
	ForcePush    bool   `json:"force_push,omitempty"`
}
  74
  75func NewCfg() (*Cfg, string, bool) {
  76	var keyLoc, certLoc, artifactDir, event string
  77	var monitorInterval time.Duration
  78	var gcInterval time.Duration
  79	var logLevel string
  80	flag.StringVar(&keyLoc, "pk", "", "ssh private key used to authenticate with pico services")
  81	flag.StringVar(&certLoc, "ck", "", "ssh certificate public key used to authenticate with pico services (only required if using ssh certificates)")
  82	flag.StringVar(&artifactDir, "artifact-dir", "/tmp/pici-artifacts", "local directory to stage artifacts")
  83	flag.StringVar(&event, "event", "", "event JSON to run (alternative to reading from stdin)")
  84	flag.DurationVar(&monitorInterval, "monitor-interval", 5*time.Second, "interval for monitoring zmx sessions")
  85	flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "interval for garbage collection in monitor (0 to disable)")
  86	flag.StringVar(&logLevel, "log-level", "info", "log level: debug, info, warn, error")
  87	var includeRunning bool
  88	var human bool
  89	var wait bool
  90	flag.BoolVar(&includeRunning, "include-running", false, "emit running status updates in addition to terminal (default: terminal only)")
  91	flag.BoolVar(&human, "human", false, "human-readable output (default: JSONL / slog)")
  92	flag.BoolVar(&wait, "wait", false, "block until job completes, printing session history and summary")
  93
  94	// Split args so the subcommand (first non-flag arg) doesn't block
  95	// flags that appear after it: "pici runner --wait" works.
  96	flags, cmd, wantHelp := splitCommand(os.Args[1:])
  97	if err := flag.CommandLine.Parse(flags); err != nil {
  98		fmt.Fprintf(os.Stderr, "failed to parse flags: %v\n", err)
  99		os.Exit(1)
 100	}
 101
 102	logger := newLogger("ci", logLevel)
 103	ctx, cancel := context.WithCancel(context.Background())
 104	return &Cfg{
 105		NewWorkspace:        defaultWorkspaceFactory,
 106		Logger:              logger.With("key_loc", keyLoc, "cert_loc", certLoc),
 107		Ctx:                 ctx,
 108		Cancel:              cancel,
 109		KeyLocation:         keyLoc,
 110		CertificateLocation: certLoc,
 111		ArtifactDir:         artifactDir,
 112		Event:               event,
 113		MonitorInterval:     monitorInterval,
 114		GCInterval:          gcInterval,
 115		IncludeRunning:      includeRunning,
 116		HumanOutput:         human,
 117		Wait:                wait,
 118	}, cmd, wantHelp
 119}
 120
 121// splitCommand separates the first non-flag argument (the subcommand) from
 122// the rest of the flags, so "runner --wait" becomes flags=["--wait"], cmd="runner".
 123// It strips --help/help so we can print custom help per subcommand.
 124func splitCommand(args []string) (flags []string, cmd string, wantHelp bool) {
 125	flags = make([]string, 0, len(args))
 126	for _, arg := range args {
 127		if arg == "--help" || arg == "help" {
 128			wantHelp = true
 129			continue
 130		}
 131		if cmd == "" && arg != "" && !strings.HasPrefix(arg, "-") {
 132			cmd = arg
 133		} else {
 134			flags = append(flags, arg)
 135		}
 136	}
 137	return flags, cmd, wantHelp
 138}
 139
 140func parseLogLevel(s string) slog.Level {
 141	switch strings.ToLower(s) {
 142	case "debug":
 143		return slog.LevelDebug
 144	case "warn":
 145		return slog.LevelWarn
 146	case "error":
 147		return slog.LevelError
 148	default:
 149		return slog.LevelInfo
 150	}
 151}
 152
 153func newLogger(space string, levelStr string) *slog.Logger {
 154	lvl := parseLogLevel(levelStr)
 155	return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
 156		Level: lvl,
 157	})).With("service", space)
 158}
 159
 160func main() {
 161	cfg, cmd, wantHelp := NewCfg()
 162
 163	cfg.Logger.Debug("setting up ci", "cfg", cfg)
 164	cfg.Logger.Debug("running cmd", "cmd", cmd)
 165
 166	if wantHelp && (cmd == "runner" || cmd == "monitor") {
 167		if cmd == "runner" {
 168			printRunnerHelp()
 169		} else {
 170			printMonitorHelp()
 171		}
 172		return
 173	}
 174
 175	switch cmd {
 176	case "runner":
 177		cfg.Logger.Debug("starting runner")
 178		if err := RunRunner(cfg); err != nil {
 179			cfg.Logger.Error("runner failed", "err", err)
 180			os.Exit(1)
 181		}
 182	case "cancel":
 183		cfg.Logger.Debug("starting cancel handler")
 184		if err := runCancel(cfg); err != nil {
 185			cfg.Logger.Error("cancel failed", "err", err)
 186			os.Exit(1)
 187		}
 188	case "gc":
 189		cfg.Logger.Debug("starting garbage collection")
 190		if err := runGC(cfg); err != nil {
 191			cfg.Logger.Error("gc failed", "err", err)
 192			os.Exit(1)
 193		}
 194	case "monitor":
 195		cfg.Logger.Debug("starting monitor")
 196		if err := runMonitor(cfg); err != nil {
 197			cfg.Logger.Error("monitor failed", "err", err)
 198			os.Exit(1)
 199		}
 200	case "status":
 201		cfg.Logger.Debug("starting status updater")
 202	case "orca":
 203		cfg.Logger.Debug("starting orchestrator")
 204	default:
 205		cfg.Logger.Error("must provide command: runner, cancel, gc, monitor, status, or orca")
 206		os.Exit(1)
 207	}
 208}
 209
 210func printMonitorHelp() {
 211	fmt.Println(`pici monitor — poll ci.* zmx sessions, stage artifacts, publish status.
 212
 213USAGE
 214  pici monitor [flags]                        # JSONL to stdout (pipe to webhooks)
 215  pici monitor --human                        # human-readable terminal output
 216
 217OUTPUT MODES
 218  Default (JSONL):  One JSON object per line, suitable for piping to notifications.
 219    pici monitor > status.jsonl
 220    pici monitor | ssh pipe.pico.sh "pub build.status -b=false"
 221    pici monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done
 222
 223  --human:          Selfci-style progress output for terminal viewing (do not pipe).
 224    pici monitor --human
 225    pici monitor --human --include-running
 226    [2/3] 🚀 running: myrepo (1m23s)
 227    [3/3] ✅ success: myrepo (2m34s)
 228
 229FLAGS
 230  -pk <path>               SSH private key for authenticating with pico services
 231  -ck <path>               SSH certificate public key
 232  -artifact-dir <path>     Local directory to stage artifacts (default: /tmp/pici-artifacts)
 233  -monitor-interval <dur>  Poll interval (default: 5s)
 234  -gc-interval <dur>       Garbage collection interval (default: 10m, 0 to disable)
 235  -include-running         Emit running status updates in addition to terminal (default: terminal only)
 236  -human                   Human-readable output instead of JSONL (for terminal, not piping)
 237  -log-level <level>       Log level: debug, info, warn, error (default: info)`)
 238}
 239
 240func printRunnerHelp() {
 241	fmt.Println(`pici runner — execute a CI job from an event JSON payload.
 242
 243USAGE
 244  echo '<event-json>' | pici runner [flags]
 245  pici runner --event '<event-json>' [flags]
 246  echo '<event-json>' | pici runner --wait [flags]  # block until done
 247
 248EVENT JSON FIELDS
 249  type          (required) Event type, e.g. "push", "merge_request"
 250  name          (required) Repository name, used for session naming
 251  workspace     (required) SSH source path to rsync, e.g. "git@github.com:user/repo.git"
 252  artifact_dest  (optional) SSH destination for artifact sync, e.g. "user@host:/path"
 253
 254
 255EXAMPLE
 256  echo '{"type":"push","name":"myrepo","workspace":"git@github.com:user/myrepo.git"}' | pici runner
 257
 258ENV VARS IN pico.sh
 259  The runner exports these environment variables for your pico.sh script:
 260    PICO_CI_JOB_ID      Unique job identifier (e.g. "a3f2b8c1")
 261    PICO_CI_REPO        Repository name (from event "name" field)
 262    PICO_CI_EVENT_TYPE  Event type (from event "type" field)
 263
 264  Note: pico.sh runs with the workspace as its working directory, so
 265  $(pwd) gives you the workspace path directly.
 266
 267  Use them with defaults so pico.sh works standalone:
 268    JOB_ID="${PICO_CI_JOB_ID:-local}"
 269    REPO="${PICO_CI_REPO:-unknown}"
 270
 271FLAGS
 272  -pk <path>       SSH private key for authenticating with pico services
 273  -ck <path>       SSH certificate public key (required when using SSH certificates)
 274  -event <json>    Event JSON string (alternative to reading from stdin)
 275  -artifact-dir <path>  Local directory to stage artifacts (default: /tmp/pici-artifacts)
 276  -log-level <level>   Log level: debug, info, warn, error (default: info)
 277  -human           Human-readable output (default: enabled for runner)
 278  -wait            Block until job completes, printing session history and summary`)
 279
 280}
 281
 282func RunRunner(cfg *Cfg) error {
 283	var payload string
 284	if cfg.EventSource != nil {
 285		data, err := io.ReadAll(cfg.EventSource)
 286		if err != nil {
 287			return fmt.Errorf("read event source: %w", err)
 288		}
 289		payload = strings.TrimSpace(string(data))
 290	} else if cfg.Event != "" {
 291		payload = cfg.Event
 292	} else {
 293		data, err := io.ReadAll(os.Stdin)
 294		if err != nil {
 295			return fmt.Errorf("read stdin: %w", err)
 296		}
 297		payload = strings.TrimSpace(string(data))
 298	}
 299
 300	var eventData Event
 301	if err := json.Unmarshal([]byte(payload), &eventData); err != nil {
 302		return fmt.Errorf("unmarshal event: %w", err)
 303	}
 304
 305	// Validate required fields
 306	if eventData.Type == "" {
 307		return fmt.Errorf("event missing required field: type")
 308	}
 309	if eventData.Name == "" {
 310		return fmt.Errorf("event missing required field: name")
 311	}
 312	if eventData.Workspace == "" {
 313		return fmt.Errorf("event missing required field: workspace")
 314	}
 315
 316	return eventHandler(cfg, &eventData)
 317}
 318
// Workspace abstracts how a job's source tree is brought onto the local
// machine. eventHandler obtains one from Cfg.NewWorkspace, calls Setup before
// looking for the manifest, and defers Cleanup.
type Workspace interface {
	// Setup materializes the workspace on local disk.
	Setup() error
	// Cleanup releases whatever Setup created.
	Cleanup() error
	// GetDir returns the local directory holding the workspace. The visible
	// implementations only populate it during Setup.
	GetDir() string
	// Checksum returns a content hash of the workspace (e.g. sha256 of the tarball).
	// Returns empty string if not available.
	Checksum() string
}
 327
// WorkspaceRsync is a Workspace that copies Source into a temporary directory
// with rsync. Dest is empty until Setup has created that directory.
type WorkspaceRsync struct {
	Cfg    *Cfg
	Logger *slog.Logger
	Source string
	Dest   string
}
 334
 335func (w *WorkspaceRsync) Setup() error {
 336	tempDir, err := os.MkdirTemp("", "pici-*")
 337	if err != nil {
 338		return err
 339	}
 340	w.Dest = tempDir
 341
 342	log := w.Logger.With("source", w.Source, "dest", w.Dest)
 343	log.Debug("syncing workspace via rsync")
 344
 345	var cmd *exec.Cmd
 346	if w.Cfg.KeyLocation != "" {
 347		sshcmd := fmt.Sprintf(
 348			"-F ~/.ssh/config -i %s -o IdentitiesOnly=yes -o CertificateFile %s",
 349			w.Cfg.KeyLocation,
 350			w.Cfg.CertificateLocation,
 351		)
 352		cmd = exec.Command("rsync", "-e", sshcmd, "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
 353	} else {
 354		cmd = exec.Command("rsync", "-rv", `--exclude="/.git"`, w.Source+"/", w.Dest+"/")
 355	}
 356	return runCmd(cmd, log)
 357}
 358
// Cleanup is currently a no-op: removal of the temporary directory is
// commented out, so w.Dest stays on disk after the runner returns.
// NOTE(review): presumably kept because the detached zmx runner session still
// executes inside that directory — confirm before re-enabling the removal.
func (w *WorkspaceRsync) Cleanup() error {
	// return os.RemoveAll(w.Dest)
	return nil
}
 363
// GetDir returns the temporary directory created by Setup, or the empty
// string (unless Dest was set by the caller) when Setup has not run.
func (w *WorkspaceRsync) GetDir() string {
	return w.Dest
}
 367
// Checksum always returns the empty string: an rsync'd workspace has no
// single artifact to hash.
func (w *WorkspaceRsync) Checksum() string {
	return ""
}
 371
// WorkspaceTar is a Workspace that downloads a tarball from Source with rsync
// and unpacks it into a temporary directory. Dest and checksum are populated
// by Setup; checksum holds "sha256:<hex>" of the downloaded tarball.
type WorkspaceTar struct {
	Cfg      *Cfg
	Logger   *slog.Logger
	Source   string // e.g. "pgs.sh:/private-ci/workspaces/repo_abc123.tar"
	Dest     string
	checksum string
}
 379
 380func (w *WorkspaceTar) Setup() error {
 381	tempDir, err := os.MkdirTemp("", "pici-*")
 382	if err != nil {
 383		return err
 384	}
 385	w.Dest = tempDir
 386
 387	log := w.Logger.With("source", w.Source, "dest", w.Dest)
 388	log.Debug("downloading and extracting workspace tar")
 389
 390	// Parse host:path from source
 391	host, path := splitSSHSource(w.Source)
 392
 393	// Use rsync to download the tar file
 394	tarPath := filepath.Join(tempDir, "workspace.tar")
 395	rsyncCmd := exec.Command("rsync", "-e", "ssh", host+":"+path, tarPath)
 396	rsyncCmd.Stderr = os.Stderr
 397	if err := rsyncCmd.Run(); err != nil {
 398		return fmt.Errorf("rsync download: %w", err)
 399	}
 400
 401	// Compute sha256 checksum of the tarball
 402	tarData, err := os.ReadFile(tarPath)
 403	if err != nil {
 404		return fmt.Errorf("read tar for checksum: %w", err)
 405	}
 406	w.checksum = fmt.Sprintf("sha256:%x", sha256.Sum256(tarData))
 407	log.Debug("workspace checksum", "checksum", w.checksum)
 408
 409	// Open the tar file for extraction
 410	f, err := os.Open(tarPath)
 411	if err != nil {
 412		return fmt.Errorf("open tar: %w", err)
 413	}
 414	defer func() {
 415		if err := f.Close(); err != nil {
 416			log.Error("close tar", "err", err)
 417		}
 418	}()
 419
 420	// Extract tar to temp dir
 421	tr := tar.NewReader(f)
 422	for {
 423		hdr, err := tr.Next()
 424		if err == io.EOF {
 425			break
 426		}
 427		if err != nil {
 428			return fmt.Errorf("tar read: %w", err)
 429		}
 430
 431		target := filepath.Join(tempDir, hdr.Name)
 432		switch hdr.Typeflag {
 433		case tar.TypeDir:
 434			if err := os.MkdirAll(target, 0755); err != nil {
 435				return fmt.Errorf("mkdir %s: %w", target, err)
 436			}
 437		case tar.TypeReg:
 438			// Ensure parent dir exists
 439			if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil {
 440				return fmt.Errorf("mkdir parent %s: %w", filepath.Dir(target), err)
 441			}
 442			tf, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, os.FileMode(hdr.Mode))
 443			if err != nil {
 444				return fmt.Errorf("open %s: %w", target, err)
 445			}
 446			if _, err := io.Copy(tf, tr); err != nil {
 447				if closeErr := tf.Close(); closeErr != nil {
 448					return fmt.Errorf("close %s: %w", target, closeErr)
 449				}
 450				return fmt.Errorf("write %s: %w", target, err)
 451			}
 452			if err := tf.Close(); err != nil {
 453				return fmt.Errorf("close %s: %w", target, err)
 454			}
 455		}
 456	}
 457
 458	log.Debug("workspace extracted")
 459	return nil
 460}
 461
// Checksum returns the "sha256:<hex>" digest of the downloaded tarball as
// computed by Setup, or the empty string if Setup has not computed it yet.
func (w *WorkspaceTar) Checksum() string {
	return w.checksum
}
 465
// Cleanup is a no-op; the temporary directory created by Setup is not removed.
// NOTE(review): presumably intentional because the detached runner session
// keeps using the directory — confirm.
func (w *WorkspaceTar) Cleanup() error {
	return nil
}
 469
// GetDir returns the temporary directory created by Setup, which holds both
// the downloaded workspace.tar and the extracted files.
func (w *WorkspaceTar) GetDir() string {
	return w.Dest
}
 473
 474// splitSSHSource splits "host:path" into host and path.
 475func splitSSHSource(source string) (string, string) {
 476	idx := strings.Index(source, ":")
 477	if idx == -1 {
 478		return source, ""
 479	}
 480	return source[:idx], source[idx+1:]
 481}
 482
// JobEngine bundles everything needed to run one CI job: the workspace, a
// job-scoped logger, the runner configuration, the triggering event and the
// job ID. eventHandler builds one per event.
type JobEngine struct {
	Wk     Workspace
	Logger *slog.Logger
	Cfg    *Cfg
	Ev     *Event
	JobID  string
}
 490
// SessionInfo describes one zmx session. All fields are kept as strings.
//
// As used by waitAndReport and renderJobRunning: Name is the full session
// name, Short is the display name derived from it, Created is parsed as a
// base-10 integer and compared against the current Unix time, an empty Ended
// means the session is still running, and an ExitCode of "0" means success.
type SessionInfo struct {
	Name     string `json:"name"`
	Short    string `json:"short"`
	PID      string `json:"pid"`
	Clients  string `json:"clients"`
	Created  string `json:"created"`
	StartDir string `json:"start_dir"`
	Ended    string `json:"ended"`
	ExitCode string `json:"exit_code"`
}
 501
// StatusPayload is the JSON shape of a job status record.
// NOTE(review): presumably one of these is emitted per JSONL line by the
// monitor — confirm against monitorTick, which is not visible here.
//
// ExitCode is a pointer without omitempty, so a nil value is encoded as JSON
// null rather than being omitted.
type StatusPayload struct {
	Timestamp    string        `json:"timestamp"`
	Name         string        `json:"name"`
	JobID        string        `json:"job_id"`
	Status       string        `json:"status"`
	ExitCode     *int          `json:"exit_code"`
	Duration     string        `json:"duration,omitempty"`
	StartedAt    string        `json:"started_at,omitempty"`
	EndedAt      string        `json:"ended_at,omitempty"`
	SessionCount int           `json:"session_count"`
	Sessions     []SessionInfo `json:"sessions"`
}
 514
// Setup prepares the job's workspace by delegating to Wk.Setup.
func (eng *JobEngine) Setup() error {
	return eng.Wk.Setup()
}
 518
 519func (eng *JobEngine) Run(manifest string) error {
 520	prefix := fmt.Sprintf("ci.%s.%s.", eng.Ev.Name, eng.JobID)
 521	// Child sessions use ".step." sub-prefix so zmx wait "*" inside pico.sh
 522	// matches ci.<name>.<jobID>.step.* but NOT ci.<name>.<jobID>.runner.
 523	// This avoids a deadlock where the runner waits for itself.
 524	childPrefix := prefix + "step."
 525
 526	log := eng.Logger.With("manifest", manifest, "prefix", prefix)
 527	log.Debug("starting runner session", "session", prefix+"runner")
 528
 529	runnerName := prefix + "runner"
 530	// Name the runner explicitly. Do NOT set ZMX_SESSION_PREFIX for this
 531	// outer zmx run call — it would be prepended to the runner's own name.
 532	// The prefix is only for child sessions spawned inside pico.sh (via the
 533	// exported env var).
 534	cmdEnv := make([]string, 0, len(os.Environ())+5)
 535	for _, e := range os.Environ() {
 536		if !strings.HasPrefix(e, "ZMX_SESSION_PREFIX=") {
 537			cmdEnv = append(cmdEnv, e)
 538		}
 539	}
 540	cmdEnv = append(cmdEnv,
 541		fmt.Sprintf("PICO_CI_JOB_ID=%s", eng.JobID),
 542		fmt.Sprintf("PICO_CI_REPO=%s", eng.Ev.Name),
 543		fmt.Sprintf("PICO_CI_EVENT_TYPE=%s", eng.Ev.Type),
 544	)
 545
 546	zmxPrefixStr := fmt.Sprintf("ZMX_SESSION_PREFIX=%s", childPrefix)
 547	cmd := exec.Command("zmx", "run", runnerName, "-d", zmxPrefixStr, "bash", manifest)
 548	cmd.Env = cmdEnv
 549	cmd.Dir = eng.Wk.GetDir()
 550
 551	if err := cmd.Run(); err != nil {
 552		return fmt.Errorf("start runner session: %w", err)
 553	}
 554
 555	return nil
 556}
 557
// Cleanup releases the job's workspace by delegating to Wk.Cleanup.
func (eng *JobEngine) Cleanup() error {
	return eng.Wk.Cleanup()
}
 561
 562func (eng *JobEngine) FindManifest() (string, error) {
 563	fnames := []string{"pico.sh"}
 564	for _, manifest := range fnames {
 565		path := filepath.Join(eng.Wk.GetDir(), manifest)
 566		if _, err := os.Stat(path); err == nil {
 567			return path, nil
 568		}
 569	}
 570	return "", fmt.Errorf("no pico.sh found in %s", eng.Wk.GetDir())
 571}
 572
// eventHandler runs the full runner flow for one validated event, printing
// human-readable progress to stdout along the way:
//
//  1. cancel any job already running for the same repo name,
//  2. derive a job ID from the repo name and workspace source,
//  3. set up the workspace through cfg.NewWorkspace,
//  4. write event.json and attestation.json under
//     <ArtifactDir>/<name>/<jobID>/artifacts (failures here are logged, not
//     returned),
//  5. locate the manifest and launch the runner session,
//  6. either block in waitAndReport (cfg.Wait) or print the zmx commands for
//     following the session.
//
// It returns an error when workspace setup fails, no manifest is found, the
// runner session cannot be started, or waiting fails.
func eventHandler(cfg *Cfg, eventData *Event) error {
	log := cfg.Logger.With("repo", eventData.Name, "type", eventData.Type)

	// Cancel any existing job for this repo before starting a new one
	cancelRunningJobs(cfg, log, eventData.Name)

	jobID := generateJobID(eventData.Name, eventData.Workspace)
	log = log.With("job_id", jobID)
	// The marshal error is discarded; eventBytes is both echoed to stdout and
	// later written to event.json.
	eventBytes, _ := json.Marshal(eventData)
	fmt.Fprintf(os.Stdout, "🚀 starting job ci.%s.%s\n", eventData.Name, jobID)                                              //nolint:errcheck
	fmt.Fprintf(os.Stdout, "   event: type=%s name=%s workspace=%s\n", eventData.Type, eventData.Name, eventData.Workspace) //nolint:errcheck
	fmt.Fprintf(os.Stdout, "   %s\n", string(eventBytes))                                                                   //nolint:errcheck

	wk := cfg.NewWorkspace(cfg, log, eventData.Workspace)
	eng := &JobEngine{
		Logger: log,
		Cfg:    cfg,
		Wk:     wk,
		Ev:     eventData,
		JobID:  jobID,
	}
	// Cleanup runs on every return path, including after a successful launch.
	defer func() {
		if err := eng.Cleanup(); err != nil {
			cfg.Logger.Error("engine cleanup", "err", err)
		}
	}()

	fmt.Fprintf(os.Stdout, "📦 syncing workspace %s\n", eventData.Workspace) //nolint:errcheck
	if err := eng.Setup(); err != nil {
		return fmt.Errorf("setup: %w", err)
	}
	fmt.Fprintf(os.Stdout, "✅ workspace ready %s\n", eng.Wk.GetDir()) //nolint:errcheck

	// Store the event in the artifact directory so the monitor can access it
	eventDir := filepath.Join(cfg.ArtifactDir, eventData.Name, jobID)
	artifactsDir := filepath.Join(eventDir, "artifacts")
	if err := os.MkdirAll(artifactsDir, 0755); err != nil {
		log.Error("create artifacts dir", "err", err)
	} else {
		if err := os.WriteFile(filepath.Join(artifactsDir, "event.json"), eventBytes, 0644); err != nil {
			log.Error("write event", "err", err)
		}
	}

	// Write attestation.json with runner/workspace provenance
	// (attempted even when creating the artifacts directory failed above; the
	// resulting write error is only logged).
	hostname, _ := os.Hostname()
	attestation := map[string]interface{}{
		"runner": map[string]string{
			"hostname": hostname,
			"os":       runtimeOS(),
			"arch":     runtimeArch(),
		},
		"provenance": map[string]string{
			"repo":   eventData.Name,
			"branch": eventData.Branch,
			"commit": eventData.Commit,
		},
		"workspace_checksum": eng.Wk.Checksum(),
	}
	attestationBytes, _ := json.Marshal(attestation)
	if err := os.WriteFile(filepath.Join(artifactsDir, "attestation.json"), attestationBytes, 0644); err != nil {
		log.Error("write attestation", "err", err)
	}

	manifest, err := eng.FindManifest()
	if err != nil {
		// No manifest: explain how to create one before returning the error.
		fmt.Fprintf(os.Stdout, "❌ %s\n\n", err) //nolint:errcheck
		//nolint:errcheck
		fmt.Fprint(os.Stdout, `Create a pico.sh script in your workspace root:

  #!/usr/bin/env bash
  set -euo pipefail

  export ZMX_SESSION_PREFIX="${ZMX_SESSION_PREFIX:-ci.}"

  # Run your CI steps as zmx sessions
  zmx run lint -d <your lint command>
  zmx run test -d <your test command>

  # Wait for all steps to complete
  zmx wait "*"

  printf "\x1b[32msuccess!\x1b[0m\n"

Each 'zmx run' spawns a parallel session. 'zmx wait "*"' blocks
until all child sessions finish.

You can also run this script in isolation without the runner.

See: https://github.com/picosh/pici

`)
		return err
	}
	fmt.Fprintf(os.Stdout, "🔍 found %s\n", manifest) //nolint:errcheck

	fmt.Fprint(os.Stdout, "🏃 launching sessions...\n") //nolint:errcheck
	if err := eng.Run(manifest); err != nil {
		return fmt.Errorf("run: %w", err)
	}

	fmt.Fprintln(os.Stdout, "✅ job launched") //nolint:errcheck

	if cfg.Wait {
		if err := waitAndReport(cfg, log, eventData.Name, jobID); err != nil {
			return fmt.Errorf("wait: %w", err)
		}
		return nil
	}

	// Not waiting: tell the user how to follow the runner session.
	session := fmt.Sprintf("ci.%s.%s.runner", eventData.Name, jobID)
	fmt.Fprintf(os.Stdout, "   zmx tail %s\n", session)    //nolint:errcheck
	fmt.Fprintf(os.Stdout, "   zmx history %s\n", session) //nolint:errcheck
	fmt.Fprintf(os.Stdout, "   zmx attach %s\n", session)  //nolint:errcheck
	return nil
}
 689
// waitAndReport polls the job's sessions until all complete, prints live
// progress to stdout, then dumps session history and a final summary.
//
// Every cfg.MonitorInterval it runs "zmx list", keeps the sessions whose name
// starts with "ci.<name>.<jobID>.", and redraws one status line per session in
// place using ANSI cursor-up / clear-to-end-of-line sequences. Once
// allCompleted reports true it prints up to the last 25 history lines of each
// failed session followed by a one-line job summary.
//
// It returns cfg.Ctx.Err() when the context is cancelled and nil when SIGINT
// is received or the job finishes; a failed job is reported on stdout, not
// through the returned error. Errors from "zmx list" are logged and the tick
// is skipped.
func waitAndReport(cfg *Cfg, log *slog.Logger, name, jobID string) error {
	prefix := "ci." + name + "." + jobID + "."
	ticker := time.NewTicker(cfg.MonitorInterval)
	defer ticker.Stop()

	// Handle ^C gracefully
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT)
	defer signal.Stop(sigCh)

	fmt.Fprintln(os.Stdout)                                //nolint:errcheck
	fmt.Fprint(os.Stdout, "⏳ waiting for completion...\n") //nolint:errcheck

	// Track state for each session
	known := make(map[string]*sessionState)
	var jobSessions []SessionInfo
	var sessionOrder []string // track insertion order for deterministic output
	var liveLines []string    // last set of status lines printed (for overwrite)

	for {
		// Block until the next tick; cancellation and ^C end the wait.
		select {
		case <-cfg.Ctx.Done():
			return cfg.Ctx.Err()
		case <-sigCh:
			fmt.Fprintln(os.Stdout, "\n⏹ cancelled") //nolint:errcheck
			return nil
		case <-ticker.C:
		}

		// Fetch current session list
		listOutput, err := exec.Command("zmx", "list").CombinedOutput()
		if err != nil {
			log.Error("zmx list", "err", err)
			continue
		}

		sessions := parseZMXList(string(listOutput))
		// jobSessions is rebuilt from scratch on every tick; the value from the
		// final tick is what the history and summary sections below use.
		jobSessions = nil
		for _, s := range sessions {
			if strings.HasPrefix(s.Name, prefix) {
				s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
				jobSessions = append(jobSessions, s)
			}
		}

		if len(jobSessions) == 0 {
			continue // runner session may not have appeared yet
		}

		// Update state for each session
		for _, s := range jobSessions {
			state, ok := known[s.Short]
			if !ok {
				state = &sessionState{}
				known[s.Short] = state
				sessionOrder = append(sessionOrder, s.Short)
			}

			if s.Ended == "" {
				state.status = "running"
				// A Created value that does not parse yields 0 here.
				created, _ := strconv.ParseInt(s.Created, 10, 64)
				state.duration = fmtDurationTs(created, time.Now().Unix())
			} else {
				if s.ExitCode == "0" {
					state.status = "success"
				} else {
					state.status = "failed"
					state.exitCode = s.ExitCode
				}
				state.duration = fmtDuration(s.Created, s.Ended)
				state.printed = true // lock final state
			}
		}

		// Build current status lines
		lines := make([]string, 0, len(sessionOrder))
		for _, short := range sessionOrder {
			state := known[short]
			icon := map[string]string{"running": "🚀", "success": "✅", "failed": "❌"}[state.status]
			detail := ""
			if state.status == "failed" && state.exitCode != "" {
				detail = fmt.Sprintf(", exit %s", state.exitCode)
			}
			if state.duration != "" && state.duration != "—" {
				detail += fmt.Sprintf(" (%s)", state.duration)
			}
			lines = append(lines, fmt.Sprintf("   %-12s %s %s%s", short, icon, state.status, detail))
		}

		// Overwrite previous lines with cursor-up, or print fresh
		if len(liveLines) > 0 {
			// Move cursor up to overwrite previous lines //nolint:errcheck
			for range len(liveLines) {
				fmt.Fprint(os.Stdout, "\033[A") //nolint:errcheck
			}
			// Clear each line
			// ("\033[K" erases from the cursor to the end of the line, removing
			// leftovers from a longer previous line.)
			for i, line := range lines { //nolint:errcheck
				if i > 0 {
					fmt.Fprint(os.Stdout, "\n") //nolint:errcheck
				}
				fmt.Fprint(os.Stdout, line+"\033[K") //nolint:errcheck
			}
			fmt.Fprint(os.Stdout, "\n") //nolint:errcheck
		} else { //nolint:errcheck
			for _, line := range lines {
				fmt.Fprintln(os.Stdout, line) //nolint:errcheck
			}
		}
		liveLines = lines

		// Check if all sessions are done
		if allCompleted(jobSessions) {
			break
		}
	}

	// Print last 25 lines of history for failed sessions only
	fmt.Fprintln(os.Stdout) //nolint:errcheck
	for _, s := range jobSessions {
		state := known[s.Short]
		if state == nil || state.status != "failed" {
			continue
		}

		separator := strings.Repeat("\u2500", 50)
		fmt.Fprintln(os.Stdout, separator)                                         //nolint:errcheck
		fmt.Fprintf(os.Stdout, "Session: %s (exit %s)\n", s.Short, state.exitCode) //nolint:errcheck
		fmt.Fprintln(os.Stdout, separator)                                         //nolint:errcheck
		fmt.Fprintln(os.Stdout)                                                    //nolint:errcheck

		history, err := fetchHistoryPlain(s.Name)
		if err != nil {
			fmt.Fprintf(os.Stdout, "   (history unavailable: %v)\n", err) //nolint:errcheck
		} else {
			lines := strings.Split(history, "\n")
			// Show last 25 lines
			if len(lines) > 25 {
				fmt.Fprintf(os.Stdout, "   ... (%d lines omitted)\n", len(lines)-25) //nolint:errcheck
				lines = lines[len(lines)-25:]
			}
			for _, line := range lines {
				if line != "" {
					fmt.Fprintf(os.Stdout, "   %s\n", line) //nolint:errcheck
				}
			}
		}
		fmt.Fprintln(os.Stdout) //nolint:errcheck
	}

	// Final summary
	exitCode, status := resolveJobExitCode(jobSessions)
	_, _, duration := computeJobTiming(jobSessions)
	// A status other than "success" or "failed" yields an empty icon.
	icon := map[string]string{"success": "✅", "failed": "❌"}[status]
	if exitCode == 0 {
		fmt.Fprintf(os.Stdout, "%s job finished: %s (%s)\n", icon, status, duration) //nolint:errcheck
	} else {
		fmt.Fprintf(os.Stdout, "%s job failed: exit %d (%s)\n", icon, exitCode, duration) //nolint:errcheck
	}

	return nil
}
 853
 854// cleanSessionShort produces a readable short name from a full session name.
 855// ci.name.jobID.step.lint → lint
 856// ci.name.jobID.step.ci.name.jobID.runner → runner
 857func cleanSessionShort(name, prefix, repoName, jobID string) string {
 858	short := strings.TrimPrefix(name, prefix)
 859	// Strip "step." prefix added by child sessions
 860	short = strings.TrimPrefix(short, "step.")
 861	// Strip nested full prefix (e.g. runner named ci.name.jobID.runner)
 862	nested := "ci." + repoName + "." + jobID + "."
 863	short = strings.TrimPrefix(short, nested)
 864	return short
 865}
 866
// NOTE: the paragraph below describes runMonitor (defined further down in this
// file), not the type declared after it. It is kept separate from the type's
// doc comment so that it is not attributed to sessionState.
//
// runMonitor is a long-lived daemon that polls all ci.* zmx sessions,
// writes status as JSONL to stdout, stages artifacts, and syncs to destination.
// Logs go to stderr. Compose with shell tools to route status:
//
//	pici monitor | ssh pipe.pico.sh "pub build.status -b=false"
//	pici monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done
//	pici monitor > status.jsonl

// sessionState tracks the display state of a single session. waitAndReport
// and renderJobRunning keep one per session short name and refresh it on
// every poll.
//
// NOTE(review): printed is set by waitAndReport once a session has ended, but
// nothing in the visible code reads it — confirm whether it is still needed.
type sessionState struct {
	status   string // "running", "success", "failed"
	exitCode string
	duration string
	printed  bool // true once final state (success/failed) is printed
}
 882
// monitorJobState tracks display state for a single job across ticks.
// runMonitor keeps these in a map keyed by "name/jobID"; renderJobRunning
// creates an entry the first time it sees a job and appends to sessionOrder
// whenever a new session short name appears.
type monitorJobState struct {
	sessionOrder []string // insertion order for deterministic output
	sessions     map[string]*sessionState
	liveLines    []string // last set of status lines printed (for overwrite)
}
 889
 890func runMonitor(cfg *Cfg) error {
 891	log := cfg.Logger.With("cmd", "monitor")
 892
 893	output := cfg.StatusOutput
 894	if output == nil {
 895		output = os.Stdout
 896	}
 897
 898	ticker := time.NewTicker(cfg.MonitorInterval)
 899	defer ticker.Stop()
 900
 901	// Optional GC ticker — runs garbage collection on a separate interval.
 902	var gcTicker *time.Ticker
 903	if cfg.GCInterval > 0 {
 904		gcTicker = time.NewTicker(cfg.GCInterval)
 905		defer gcTicker.Stop()
 906	}
 907
 908	// Track per-job display state across ticks (for human output)
 909	jobStates := make(map[string]*monitorJobState) // key: "name/jobID"
 910
 911	log.Debug("monitor started", "interval", cfg.MonitorInterval, "gc_interval", cfg.GCInterval, "artifact_dir", cfg.ArtifactDir)
 912	log.Debug("monitoring ci.* sessions for job status, writing status to stdout")
 913
 914	for {
 915		select {
 916		case <-cfg.Ctx.Done():
 917			log.Debug("context cancelled, stopping monitor")
 918			return cfg.Ctx.Err()
 919		case <-ticker.C:
 920			if err := monitorTick(cfg, log, output, jobStates); err != nil {
 921				log.Error("monitor tick", "err", err)
 922			}
 923		case <-gcTicker.C:
 924			log.Debug("running periodic garbage collection")
 925			if err := runGC(cfg); err != nil {
 926				log.Error("periodic gc", "err", err)
 927			}
 928		}
 929	}
 930}
 931
 932// monitorTick performs a single monitoring pass over all ci.* sessions.
 933// renderJobRunning prints per-session status for a running job, using ANSI
 934// cursor control to overwrite previous lines.
 935func renderJobRunning(output io.Writer, name, jobID string, group []SessionInfo, duration string, jobStates map[string]*monitorJobState) {
 936	key := name + "/" + jobID
 937	state, ok := jobStates[key]
 938	if !ok {
 939		state = &monitorJobState{sessions: make(map[string]*sessionState)}
 940		jobStates[key] = state
 941	}
 942
 943	// Track new sessions
 944	for _, s := range group {
 945		if _, ok := state.sessions[s.Short]; !ok {
 946			state.sessions[s.Short] = &sessionState{}
 947			state.sessionOrder = append(state.sessionOrder, s.Short)
 948		}
 949	}
 950
 951	// Update session states
 952	for _, s := range group {
 953		ss := state.sessions[s.Short]
 954		if s.Ended == "" {
 955			ss.status = "running"
 956			created, _ := strconv.ParseInt(s.Created, 10, 64)
 957			ss.duration = fmtDurationTs(created, time.Now().Unix())
 958		} else {
 959			if s.ExitCode == "0" {
 960				ss.status = "success"
 961			} else {
 962				ss.status = "failed"
 963				ss.exitCode = s.ExitCode
 964			}
 965			ss.duration = fmtDuration(s.Created, s.Ended)
 966		}
 967	}
 968
 969	// Build status lines
 970	lines := make([]string, 0, len(state.sessionOrder))
 971	for _, short := range state.sessionOrder {
 972		ss := state.sessions[short]
 973		icon := map[string]string{"running": "🚀", "success": "✅", "failed": "❌"}[ss.status]
 974		detail := ""
 975		if ss.status == "failed" && ss.exitCode != "" {
 976			detail = fmt.Sprintf(", exit %s", ss.exitCode)
 977		}
 978		if ss.duration != "" && ss.duration != "—" {
 979			detail += fmt.Sprintf(" (%s)", ss.duration)
 980		}
 981		lines = append(lines, fmt.Sprintf("   %-12s %s %s%s", short, icon, ss.status, detail))
 982	}
 983
 984	// Overwrite previous lines or print fresh
 985	if len(state.liveLines) > 0 {
 986		for range len(state.liveLines) {
 987			fmt.Fprint(output, "\033[A") //nolint:errcheck
 988		}
 989		for i, line := range lines {
 990			if i > 0 {
 991				fmt.Fprint(output, "\n") //nolint:errcheck
 992			}
 993			fmt.Fprint(output, line+"\033[K") //nolint:errcheck
 994		}
 995		fmt.Fprint(output, "\n") //nolint:errcheck
 996	} else {
 997		fmt.Fprintf(output, "   %-12s %s %s\n", name, "🚀", "running") //nolint:errcheck
 998		for _, line := range lines {
 999			fmt.Fprintln(output, line) //nolint:errcheck
1000		}
1001	}
1002	state.liveLines = lines
1003}
1004
1005// renderJobFinal prints the final status for a completed job.
1006func renderJobFinal(output io.Writer, name, jobID string, group []SessionInfo, duration, status string, success bool, workspace, artifactDir string) {
1007	icon := map[string]string{"success": "✅", "failed": "❌"}[status]
1008	fmt.Fprintf(output, "   %-12s %s %s (%s)\n", name, icon, status, duration) //nolint:errcheck
1009
1010	// Per-session summary
1011	for _, s := range group {
1012		icon := "✅"
1013		if s.ExitCode != "0" {
1014			icon = "❌"
1015		}
1016		dur := fmtDuration(s.Created, s.Ended)
1017		fmt.Fprintf(output, "   %-12s %s done (%s)\n", s.Short, icon, dur) //nolint:errcheck
1018	}
1019
1020	// Context info
1021	fmt.Fprint(output, "\n") //nolint:errcheck
1022	if workspace != "" {
1023		fmt.Fprintf(output, "   workspace:  %s\n", workspace) //nolint:errcheck
1024	}
1025	artifactPath := filepath.Join(artifactDir, name, jobID)
1026	fmt.Fprintf(output, "   artifacts:  %s\n", artifactPath) //nolint:errcheck
1027	fmt.Fprint(output, "\n")                                 //nolint:errcheck
1028}
1029
1030func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer, jobStates map[string]*monitorJobState) error {
1031	// a. zmx list → filter ci.* sessions
1032	listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1033	if err != nil {
1034		return fmt.Errorf("zmx list: %w", err)
1035	}
1036	sessions := parseZMXList(string(listOutput))
1037	ciSessions := filterCISessions(sessions)
1038
1039	if len(ciSessions) == 0 {
1040		log.Debug("no ci.* sessions found")
1041		return nil
1042	}
1043
1044	log.Debug("found ci sessions", "count", len(ciSessions))
1045
1046	// b. Group by job prefix: ci.<name>.<jobID>.
1047	groups := groupSessionsByJob(ciSessions)
1048
1049	// c. Process each job group
1050	for prefix, group := range groups {
1051		name, jobID := parseJobPrefix(prefix)
1052		if name == "" {
1053			continue
1054		}
1055
1056		log := log.With("repo", name, "job_id", jobID)
1057
1058		// Fetch and stage history for every session at each tick,
1059		// not just when the job completes. This gives live progress
1060		// snapshots while the job is running.
1061		for _, s := range group {
1062			// Determine session status and timing
1063			sessionStatus := "running"
1064			sessionDuration := fmtDuration(s.Created, fmt.Sprintf("%d", time.Now().Unix()))
1065			sessionExitCode := ""
1066			if s.Ended != "" {
1067				sessionDuration = fmtDuration(s.Created, s.Ended)
1068				if s.ExitCode == "0" {
1069					sessionStatus = "success"
1070					sessionExitCode = "0"
1071				} else {
1072					sessionStatus = "failed"
1073					sessionExitCode = s.ExitCode
1074				}
1075			}
1076
1077			html, err := fetchHistoryHTML(s.Name, name, jobID, sessionStatus, sessionDuration, sessionExitCode)
1078			if err != nil {
1079				log.Error("fetch history html", "session", s.Name, "err", err)
1080				continue
1081			}
1082			if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, html, ".html"); err != nil {
1083				log.Error("stage html artifact", "session", s.Short, "err", err)
1084			}
1085
1086			plain, err := fetchHistoryPlain(s.Name)
1087			if err != nil {
1088				log.Error("fetch history plain", "session", s.Name, "err", err)
1089				continue
1090			}
1091			if err := stageArtifact(cfg.ArtifactDir, name, jobID, s.Short, plain, ".txt"); err != nil {
1092				log.Error("stage txt artifact", "session", s.Short, "err", err)
1093			}
1094		}
1095
1096		// Generate and stage job index landing pages
1097		indexHTML, indexTXT := generateJobIndex(cfg.ArtifactDir, name, jobID, group)
1098		if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexHTML, ".html"); err != nil {
1099			log.Error("stage index.html", "err", err)
1100		}
1101		if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexTXT, ".txt"); err != nil {
1102			log.Error("stage index.txt", "err", err)
1103		}
1104		// Stage shared CSS
1105		if styles, err := loadStyles(); err == nil {
1106			if err := stageArtifact(cfg.ArtifactDir, name, jobID, "styles", styles, ".css"); err != nil {
1107				log.Error("stage styles.css", "err", err)
1108			}
1109		}
1110
1111		// Load event to get artifact destination
1112		eventData, _ := loadEvent(cfg.ArtifactDir, name, jobID)
1113
1114		// Compute job-level timing from session timestamps
1115		startedAt, endedAt, duration := computeJobTiming(group)
1116
1117		if allCompleted(group) {
1118			// Check sentinel — publish terminal status exactly once
1119			sentinel := filepath.Join(cfg.ArtifactDir, name, jobID, "artifacts", "published.json")
1120			log.Debug("checking completion", "all_completed", true, "sentinel", sentinel)
1121			if _, err := os.Stat(sentinel); err == nil {
1122				log.Debug("terminal status already published, skipping", "sentinel", sentinel)
1123				continue
1124			}
1125
1126			log.Debug("job completed, publishing final status", "sessions", len(group))
1127			exitCode, status := resolveJobExitCode(group)
1128			log.Info("job finished", "status", status, "exit_code", exitCode)
1129
1130			if cfg.HumanOutput {
1131				renderJobFinal(output, name, jobID, group, duration, status, exitCode == 0, eventData.Workspace, cfg.ArtifactDir)
1132			} else {
1133				payload := StatusPayload{
1134					Timestamp:    time.Now().UTC().Format(time.RFC3339),
1135					Name:         name,
1136					JobID:        jobID,
1137					Status:       status,
1138					ExitCode:     &exitCode,
1139					Duration:     duration,
1140					StartedAt:    startedAt,
1141					EndedAt:      endedAt,
1142					SessionCount: len(group),
1143					Sessions:     group,
1144				}
1145				if err := publishStatus(output, payload); err != nil {
1146					log.Error("publish final status", "err", err)
1147				}
1148			}
1149			// Write published.json, then sync to include it in the rsync.
1150			// This is the only sync for completed jobs. In-progress jobs are
1151			// synced every tick in the else branch below.
1152			published := map[string]interface{}{
1153				"status":      status,
1154				"exit_code":   exitCode,
1155				"job_id":      jobID,
1156				"finished_at": time.Now().UTC().Format(time.RFC3339),
1157			}
1158			publishedJSON, _ := json.Marshal(published)
1159			log.Info("writing sentinel", "path", sentinel)
1160			if err := os.WriteFile(sentinel, publishedJSON, 0644); err != nil {
1161				log.Error("write published sentinel", "err", err)
1162			}
1163			// Regenerate index.html now that published.json exists, so the artifact list includes it.
1164			indexHTML, indexTXT := generateJobIndex(cfg.ArtifactDir, name, jobID, group)
1165			if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexHTML, ".html"); err != nil {
1166				log.Error("stage index.html", "err", err)
1167			}
1168			if err := stageArtifact(cfg.ArtifactDir, name, jobID, "index", indexTXT, ".txt"); err != nil {
1169				log.Error("stage index.txt", "err", err)
1170			}
1171			if err := syncJobArtifacts(cfg, name, jobID, log); err != nil {
1172				log.Error("sync artifacts", "err", err)
1173			}
1174		} else {
1175			log.Debug("job still running", "sessions", len(group))
1176			// Sync in-progress jobs every tick using the sessions we already know about.
1177			if err := syncJobArtifacts(cfg, name, jobID, log); err != nil {
1178				log.Error("sync artifacts", "err", err)
1179			}
1180			// Publish running status only when --include-running is set
1181			if cfg.IncludeRunning {
1182				if cfg.HumanOutput {
1183					renderJobRunning(output, name, jobID, group, duration, jobStates)
1184				} else {
1185					payload := StatusPayload{
1186						Timestamp:    time.Now().UTC().Format(time.RFC3339),
1187						Name:         name,
1188						JobID:        jobID,
1189						Status:       "running",
1190						ExitCode:     nil,
1191						Duration:     duration,
1192						StartedAt:    startedAt,
1193						SessionCount: len(group),
1194						Sessions:     group,
1195					}
1196					if err := publishStatus(output, payload); err != nil {
1197						log.Error("publish status", "err", err)
1198					}
1199				}
1200			}
1201		}
1202	}
1203
1204	return nil
1205}
1206
1207// loadEvent reads the event.json for a job from the artifact directory.
1208func loadEvent(dir, name, jobID string) (Event, error) {
1209	var event Event
1210	data, err := os.ReadFile(filepath.Join(dir, name, jobID, "artifacts", "event.json"))
1211	if err != nil {
1212		return event, err
1213	}
1214	return event, json.Unmarshal(data, &event)
1215}
1216
1217// computeJobTiming derives started_at, ended_at, and duration from session timestamps.
1218// started_at is the earliest session creation time, ended_at is the latest session end time.
1219// For running jobs, ended_at is empty and duration is computed against now.
1220func computeJobTiming(sessions []SessionInfo) (startedAt, endedAt, duration string) {
1221	var earliestCreated, latestEnded int64
1222	hasCreated, hasEnded := false, false
1223
1224	for _, s := range sessions {
1225		if s.Created != "" {
1226			var c int64
1227			if _, err := fmt.Sscanf(s.Created, "%d", &c); err == nil {
1228				if !hasCreated || c < earliestCreated {
1229					earliestCreated = c
1230				}
1231				hasCreated = true
1232			}
1233		}
1234		if s.Ended != "" {
1235			var e int64
1236			if _, err := fmt.Sscanf(s.Ended, "%d", &e); err == nil {
1237				if !hasEnded || e > latestEnded {
1238					latestEnded = e
1239				}
1240				hasEnded = true
1241			}
1242		}
1243	}
1244
1245	if hasCreated {
1246		startedAt = time.Unix(earliestCreated, 0).UTC().Format(time.RFC3339)
1247	}
1248	if hasEnded {
1249		endedAt = time.Unix(latestEnded, 0).UTC().Format(time.RFC3339)
1250	}
1251
1252	if hasCreated && hasEnded {
1253		duration = fmtDurationTs(earliestCreated, latestEnded)
1254	} else if hasCreated {
1255		duration = fmtDurationTs(earliestCreated, time.Now().Unix())
1256	}
1257
1258	return startedAt, endedAt, duration
1259}
1260
// fmtDurationTs renders the span between two unix timestamps (in seconds).
//
// The result is "<m>m<s>s" from one minute upwards, whole seconds ("42s")
// from ten seconds upwards, one decimal place ("5.0s") below that, and "—"
// when ended precedes started.
func fmtDurationTs(started, ended int64) string {
	span := time.Duration(ended-started) * time.Second
	switch {
	case span < 0:
		return "—"
	case span >= time.Minute:
		minutes := span / time.Minute
		seconds := span % time.Minute / time.Second
		return fmt.Sprintf("%dm%ds", minutes, seconds)
	}

	asSeconds := float64(span) / float64(time.Second)
	if asSeconds < 10 {
		return fmt.Sprintf("%.1fs", asSeconds)
	}
	return fmt.Sprintf("%ds", int(asSeconds))
}
1276
1277// filterCISessions returns only sessions with ci. prefix.
1278func filterCISessions(sessions []SessionInfo) []SessionInfo {
1279	var filtered []SessionInfo
1280	for _, s := range sessions {
1281		if strings.HasPrefix(s.Name, "ci.") {
1282			filtered = append(filtered, s)
1283		}
1284	}
1285	return filtered
1286}
1287
1288// groupSessionsByJob groups sessions by their job prefix (ci.<name>.<jobID>.).
1289func groupSessionsByJob(sessions []SessionInfo) map[string][]SessionInfo {
1290	groups := make(map[string][]SessionInfo)
1291	for _, s := range sessions {
1292		prefix := extractJobPrefix(s.Name)
1293		if prefix == "" {
1294			continue
1295		}
1296		// Set the Short name
1297		name, jobID := parseJobPrefix(prefix)
1298		s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
1299		groups[prefix] = append(groups[prefix], s)
1300	}
1301	return groups
1302}
1303
// parseJobPrefix splits a job prefix of the form "ci.<name>.<jobID>." into its
// name and jobID. It returns two empty strings when the input has fewer than
// four dot-separated parts.
func parseJobPrefix(prefix string) (name, jobID string) {
	// Only the second and third components are needed, so stop splitting
	// after the fourth piece: "ci.name.jobID." -> ["ci", "name", "jobID", ""].
	pieces := strings.SplitN(prefix, ".", 4)
	if len(pieces) == 4 {
		return pieces[1], pieces[2]
	}
	return "", ""
}
1313
1314// resolveJobExitCode determines the job's exit code from its sessions.
1315// Defensive: if any child session failed, the job failed regardless of the
1316// runner's exit code. This protects against bad pico.sh scripts that exit 0
1317// without waiting for children.
1318func resolveJobExitCode(sessions []SessionInfo) (int, string) {
1319	var runnerExit *int
1320	var worstChild *int // highest non-zero child exit code
1321
1322	for _, s := range sessions {
1323		if s.ExitCode == "" {
1324			continue
1325		}
1326		var code int
1327		if _, err := fmt.Sscanf(s.ExitCode, "%d", &code); err != nil {
1328			continue
1329		}
1330
1331		if strings.HasSuffix(s.Name, ".runner") {
1332			runnerExit = &code
1333		} else if code != 0 {
1334			if worstChild == nil || code > *worstChild {
1335				worstChild = &code
1336			}
1337		}
1338	}
1339
1340	// Any child failure overrides the runner — defensive against bad scripts
1341	if worstChild != nil {
1342		return *worstChild, "failed"
1343	}
1344	if runnerExit != nil && *runnerExit != 0 {
1345		return *runnerExit, "failed"
1346	}
1347	return 0, "success"
1348}
1349
1350func generateJobID(name, workspace string) string {
1351	return jobIDFor(name, workspace, time.Now().UnixNano())
1352}
1353
// runtimeOS returns runtime.GOOS, the operating system target of the running binary.
func runtimeOS() string {
	const goos = runtime.GOOS
	return goos
}
1357
// runtimeArch returns runtime.GOARCH, the architecture target of the running binary.
func runtimeArch() string {
	const goarch = runtime.GOARCH
	return goarch
}
1361
// jobIDFor derives a short job ID: the first four bytes (eight lowercase hex
// digits) of the SHA-256 of name, workspace and the decimal timestamp
// concatenated without separators.
func jobIDFor(name, workspace string, ts int64) string {
	seed := name + workspace + strconv.FormatInt(ts, 10)
	digest := sha256.Sum256([]byte(seed))
	short := digest[:4]
	return fmt.Sprintf("%x", short)
}
1366
1367func parseZMXList(output string) []SessionInfo {
1368	var sessions []SessionInfo
1369	lines := strings.Split(strings.TrimSpace(output), "\n")
1370	for _, line := range lines {
1371		line = strings.TrimSpace(line)
1372		if line == "" {
1373			continue
1374		}
1375		// Strip leading arrow/space prefix
1376		line = strings.TrimPrefix(line, "→ ")
1377		line = strings.TrimSpace(line)
1378
1379		fields := strings.FieldsFunc(line, func(r rune) bool {
1380			return r == '\t'
1381		})
1382
1383		var si SessionInfo
1384		for _, field := range fields {
1385			parts := strings.SplitN(field, "=", 2)
1386			if len(parts) != 2 {
1387				continue
1388			}
1389			switch parts[0] {
1390			case "name":
1391				si.Name = parts[1]
1392			case "pid":
1393				si.PID = parts[1]
1394			case "clients":
1395				si.Clients = parts[1]
1396			case "created":
1397				si.Created = parts[1]
1398			case "start_dir":
1399				si.StartDir = parts[1]
1400			case "ended":
1401				si.Ended = parts[1]
1402			case "exit_code":
1403				si.ExitCode = parts[1]
1404			}
1405		}
1406		if si.Name != "" {
1407			sessions = append(sessions, si)
1408		}
1409	}
1410	return sessions
1411}
1412
1413// loadStyles reads the shared CSS from the embedded template filesystem.
1414func loadStyles() (string, error) {
1415	data, err := fs.ReadFile(tmplFS, "tmpl/styles.css")
1416	if err != nil {
1417		return "", fmt.Errorf("read styles: %w", err)
1418	}
1419	return string(data), nil
1420}
1421
// SessionArtifactData holds the data for rendering a session HTML artifact.
// It is the value passed to the tmpl/session.html template by fetchHistoryHTML.
type SessionArtifactData struct {
	// SessionName is the full zmx session name.
	SessionName   string
	// SessionShort is SessionName with the "ci.<job>.<id>." prefix and an
	// optional leading "step." removed.
	SessionShort  string
	// SessionStatus, Duration and ExitCode are passed through from the caller.
	SessionStatus string
	JobName       string
	JobID         string
	Duration      string
	ExitCode      string
	// Content is the raw output of `zmx history <session> --html`. Being
	// template.HTML, it is inserted into the page without escaping.
	Content       template.HTML
}
1433
1434// fetchHistoryHTML fetches session history from zmx and wraps it in a full HTML document.
1435func fetchHistoryHTML(sessionName, jobName, jobID, status, duration, exitCode string) (string, error) {
1436	cmd := exec.Command("zmx", "history", sessionName, "--html")
1437	output, err := cmd.Output()
1438	if err != nil {
1439		return "", err
1440	}
1441
1442	prefix := "ci." + jobName + "." + jobID + "."
1443	shortName := strings.TrimPrefix(sessionName, prefix)
1444	shortName = strings.TrimPrefix(shortName, "step.")
1445
1446	data := SessionArtifactData{
1447		SessionName:   sessionName,
1448		SessionShort:  shortName,
1449		SessionStatus: status,
1450		JobName:       jobName,
1451		JobID:         jobID,
1452		Duration:      duration,
1453		ExitCode:      exitCode,
1454		Content:       template.HTML(string(output)),
1455	}
1456
1457	tmpl, err := template.ParseFS(tmplFS, "tmpl/session.html")
1458	if err != nil {
1459		return "", fmt.Errorf("parse template: %w", err)
1460	}
1461
1462	var buf strings.Builder
1463	if err := tmpl.ExecuteTemplate(&buf, "session.html", data); err != nil {
1464		return "", fmt.Errorf("execute template: %w", err)
1465	}
1466
1467	return buf.String(), nil
1468}
1469
// fetchHistoryPlain returns the stdout of `zmx history <session> --plain`,
// or the command's error.
func fetchHistoryPlain(sessionName string) (string, error) {
	raw, err := exec.Command("zmx", "history", sessionName, "--plain").Output()
	if err == nil {
		return string(raw), nil
	}
	return "", err
}
1478
1479func publishStatus(w io.Writer, payload StatusPayload) error {
1480	data, err := json.Marshal(payload)
1481	if err != nil {
1482		return err
1483	}
1484	_, err = w.Write(append(data, '\n'))
1485	return err
1486}
1487
// stageArtifact writes content to <dir>/<name>/<jobID>/<short><ext>, creating
// the parent directories (mode 0755) as needed and replacing any existing
// file (mode 0644).
func stageArtifact(dir, name, jobID, short, content, ext string) error {
	target := filepath.Join(dir, name, jobID, short+ext)
	parent := filepath.Dir(target)
	if mkErr := os.MkdirAll(parent, 0755); mkErr != nil {
		return mkErr
	}
	payload := []byte(content)
	return os.WriteFile(target, payload, 0644)
}
1495
1496// syncJobArtifacts syncs artifacts for a single job.
1497func syncJobArtifacts(cfg *Cfg, repoName, jobID string, log *slog.Logger) error {
1498	jobDir := filepath.Join(cfg.ArtifactDir, repoName, jobID)
1499	eventPath := filepath.Join(jobDir, "artifacts", "event.json")
1500	data, err := os.ReadFile(eventPath)
1501	if err != nil {
1502		if os.IsNotExist(err) {
1503			log.Debug("event.json not found, skipping sync", "repo", repoName, "job_id", jobID)
1504			return nil
1505		}
1506		return fmt.Errorf("read event: %w", err)
1507	}
1508	var event Event
1509	if err := json.Unmarshal(data, &event); err != nil || event.ArtifactDest == "" {
1510		return fmt.Errorf("invalid event")
1511	}
1512	log.Debug("syncing artifacts", "repo", repoName, "job_id", jobID, "dest", event.ArtifactDest)
1513	var sshArgs string
1514	if cfg.KeyLocation != "" {
1515		certFile := ""
1516		if cfg.CertificateLocation != "" {
1517			certFile = fmt.Sprintf(" -o CertificateFile %s", cfg.CertificateLocation)
1518		}
1519		sshArgs = fmt.Sprintf("-F ~/.ssh/config -i %s%s", cfg.KeyLocation, certFile)
1520	}
1521	// Append "/" so rsync copies into the destination directory,
1522	// not as a subdirectory named after the source.
1523	dest := event.ArtifactDest
1524	if !strings.HasSuffix(dest, "/") {
1525		dest += "/"
1526	}
1527	var cmd *exec.Cmd
1528	if sshArgs != "" {
1529		cmd = exec.Command("rsync", "-e", sshArgs, "-rv", jobDir+"/", dest)
1530	} else {
1531		cmd = exec.Command("rsync", "-rv", jobDir+"/", dest)
1532	}
1533	rsyncCmd := fmt.Sprintf("rsync %s %s %s",
1534		strings.TrimLeft(cmd.Args[1], "-"),
1535		jobDir+"/", dest)
1536	log.Info("rsync", "cmd", rsyncCmd)
1537	return runCmd(cmd, log)
1538}
1539
1540func allCompleted(sessions []SessionInfo) bool {
1541	for _, s := range sessions {
1542		if s.Ended == "" {
1543			return false
1544		}
1545	}
1546	return true
1547}
1548
// runCmd starts cmd, logs its output line by line, and waits for it to exit.
//
// Stdout lines are logged at debug level and stderr lines at error level. Both
// pipes are read to EOF before cmd.Wait is called: Wait closes the pipes once
// the process exits, so calling it while the readers are still running can
// drop output. The returned error is a pipe/start failure or the result of
// cmd.Wait.
func runCmd(cmd *exec.Cmd, log *slog.Logger) error {
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return err
	}

	stderr, err := cmd.StderrPipe()
	if err != nil {
		return err
	}

	if err := cmd.Start(); err != nil {
		return err
	}

	// Buffered so neither reader can block when signalling completion.
	done := make(chan struct{}, 2)
	pump := func(r io.Reader, emit func(text string)) {
		defer func() { done <- struct{}{} }()
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			emit(scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			log.Debug("cmd output scan", "err", err)
			// Keep draining (e.g. after an over-long line) so the child can
			// never block forever on a full pipe.
			_, _ = io.Copy(io.Discard, r)
		}
	}

	go pump(stdout, func(text string) { log.Debug("cmd stdout", "text", text) })
	go pump(stderr, func(text string) { log.Error("cmd stderr", "text", text) })

	<-done
	<-done

	return cmd.Wait()
}
1580
1581// runCancel reads an event from stdin and cancels any running job with matching name+type.
1582func runCancel(cfg *Cfg) error {
1583	log := cfg.Logger.With("cmd", "cancel")
1584
1585	// Read event from stdin
1586	scanner := bufio.NewScanner(os.Stdin)
1587	if !scanner.Scan() {
1588		return fmt.Errorf("no input on stdin")
1589	}
1590
1591	var event Event
1592	if err := json.Unmarshal([]byte(scanner.Text()), &event); err != nil {
1593		return fmt.Errorf("unmarshal event: %w", err)
1594	}
1595
1596	log = log.With("repo", event.Name, "type", event.Type)
1597	log.Info("cancelling running jobs for repo")
1598
1599	cancelRunningJobs(cfg, log, event.Name)
1600	return nil
1601}
1602
1603// cancelRunningJobs finds and cancels all running jobs for a given repo name.
1604// It kills the runner sessions (which cascades to children). The monitor will
1605// detect the ended sessions and publish cancelled status on its next tick.
1606func cancelRunningJobs(cfg *Cfg, log *slog.Logger, name string) {
1607	runnerSessions, _ := findRunningJobs(name)
1608	if len(runnerSessions) == 0 {
1609		log.Debug("no running jobs to cancel")
1610		return
1611	}
1612
1613	log.Debug("found running jobs to cancel", "count", len(runnerSessions))
1614
1615	for _, runnerName := range runnerSessions {
1616		jobID := extractJobID(runnerName)
1617		log := log.With("job_id", jobID)
1618
1619		log.Debug("cancelling job", "runner", runnerName)
1620		if err := killSessions([]string{runnerName}); err != nil {
1621			log.Error("kill runner session", "err", err)
1622			continue
1623		}
1624		log.Debug("cancelled runner session")
1625	}
1626}
1627
1628// findRunningJobs finds all active runner sessions for a given name.
1629// Returns runner session names and all sessions for reference.
1630func findRunningJobs(name string) ([]string, []SessionInfo) {
1631	listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1632	if err != nil {
1633		return nil, nil
1634	}
1635
1636	sessions := parseZMXList(string(listOutput))
1637	var runners []string
1638	for _, s := range sessions {
1639		// Match ci.<name>.*.runner sessions that are still active (no ended)
1640		if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") && s.Ended == "" {
1641			runners = append(runners, s.Name)
1642		}
1643	}
1644	return runners, sessions
1645}
1646
// extractJobID pulls the jobID out of a runner session name of the form
// ci.<name>.<jobID>.runner: after dropping the ".runner" suffix and the "ci."
// prefix it returns everything following the first remaining dot, or "" when
// there is none.
func extractJobID(runnerName string) string {
	trimmed := strings.TrimPrefix(strings.TrimSuffix(runnerName, ".runner"), "ci.")
	// trimmed is now <name>.<jobID>; keep what follows the first dot.
	if _, id, found := strings.Cut(trimmed, "."); found {
		return id
	}
	return ""
}
1660
// killSessions runs `zmx kill <names...>` for the given session names. It does
// nothing for an empty list; on failure the command's combined output is
// included in the returned error.
func killSessions(names []string) error {
	if len(names) == 0 {
		return nil
	}

	argv := make([]string, 0, len(names)+1)
	argv = append(argv, "kill")
	argv = append(argv, names...)

	out, err := exec.Command("zmx", argv...).CombinedOutput()
	if err == nil {
		return nil
	}
	return fmt.Errorf("zmx kill: %s: %w", string(out), err)
}
1674
1675// runGC kills all ci. zmx sessions older than 3 hours, regardless of status.
1676func runGC(cfg *Cfg) error {
1677	log := cfg.Logger.With("cmd", "gc")
1678	log.Debug("running garbage collection")
1679
1680	listOutput, err := exec.Command("zmx", "list").CombinedOutput()
1681	if err != nil {
1682		return fmt.Errorf("zmx list: %w", err)
1683	}
1684
1685	sessions := parseZMXList(string(listOutput))
1686
1687	cutoff := time.Now().Add(-3 * time.Hour).Unix()
1688
1689	var toKill []string
1690	for _, s := range sessions {
1691		if !strings.HasPrefix(s.Name, "ci.") {
1692			continue
1693		}
1694
1695		if s.Created == "" {
1696			continue // skip sessions with no creation time
1697		}
1698
1699		var created int64
1700		if _, err := fmt.Sscanf(s.Created, "%d", &created); err != nil {
1701			continue
1702		}
1703
1704		if created < cutoff {
1705			log.Debug("session expired, scheduling for gc", "session", s.Name, "created", s.Created)
1706			toKill = append(toKill, s.Name)
1707		}
1708	}
1709
1710	if len(toKill) == 0 {
1711		log.Debug("no sessions to garbage collect")
1712		return nil
1713	}
1714
1715	if err := killSessions(toKill); err != nil {
1716		return fmt.Errorf("kill sessions: %w", err)
1717	}
1718
1719	log.Debug("garbage collection complete", "killed", len(toKill))
1720	return nil
1721}
1722
// extractJobPrefix returns the job prefix "ci.<name>.<jobID>." of a session
// name, i.e. its first three dot-separated components followed by a dot:
//
//	ci.name.jobID.runner    → ci.name.jobID.
//	ci.name.jobID.step.lint → ci.name.jobID.
//
// Names with fewer than four components yield "".
func extractJobPrefix(sessionName string) string {
	// Everything from the fourth component on is irrelevant, so cap the split.
	pieces := strings.SplitN(sessionName, ".", 4)
	if len(pieces) != 4 {
		return ""
	}
	return strings.Join(pieces[:3], ".") + "."
}
1735
// sessionRow holds display info for a single session in the job index.
// Rows are built by generateJobIndex from the job's SessionInfo values.
type sessionRow struct {
	// Name is the full session name; Short is its abbreviated display name.
	Name     string
	Short    string
	// Status is "running", "success" or "failed".
	Status   string
	// ExitCode is the session's raw exit code string; empty while running.
	ExitCode string
	// Started and Ended carry the session's created/ended values as reported
	// by zmx (unix timestamp strings); Ended is empty while running.
	Started  string
	Ended    string
	// Duration is a pre-formatted elapsed time, or "—" when unknown.
	Duration string
}
1746
// artifactRow holds display info for non-session artifacts (e.g., event.json, workspace.tar).
// Rows are built by generateJobIndex from the files in the job's artifacts/ directory.
type artifactRow struct {
	// Name is the file's base name.
	Name    string
	// Size is the human-readable size produced by formatFileSize.
	Size    string
	// ModTime is the modification time rendered by formatTimestamp (UTC).
	ModTime string
}
1753
// formatFileSize returns a human-readable file size string.
//
// Values below 1024 are printed as whole bytes ("512 B"); larger values are
// scaled by powers of 1024 and printed with one decimal place and a unit from
// KB up to EB, which covers the full int64 range.
func formatFileSize(bytes int64) string {
	const unit = 1024
	if bytes < unit {
		return fmt.Sprintf("%d B", bytes)
	}
	div, exp := int64(unit), 0
	for n := bytes / unit; n >= unit; n /= unit {
		div *= unit
		exp++
	}
	// exp can reach 5 for int64 inputs (>= 1024^6), so the table must go up
	// to EB; a shorter table panics with an index out of range from 1 PiB on.
	units := []string{"KB", "MB", "GB", "TB", "PB", "EB"}
	return fmt.Sprintf("%.1f %s", float64(bytes)/float64(div), units[exp])
}
1768
// formatTimestamp renders a unix timestamp given as a decimal string as
// "YYYY-MM-DD hh:mm:ss" in UTC. An empty or unparseable input yields "—".
func formatTimestamp(ts string) string {
	const (
		placeholder = "—"
		layout      = "2006-01-02 15:04:05"
	)

	var secs int64
	if ts != "" {
		if _, err := fmt.Sscanf(ts, "%d", &secs); err == nil {
			return time.Unix(secs, 0).UTC().Format(layout)
		}
	}
	return placeholder
}
1780
1781// generateJobIndex produces HTML and plain-text index pages listing all
1782// sessions for a job with links and metadata (status, exit code, ended at).
1783// It also includes any other artifacts in the job directory (e.g., event.json, attestation.json, workspace.tar).
1784func generateJobIndex(artifactDir, name, jobID string, sessions []SessionInfo) (htmlContent, txtContent string) {
1785	rows := make([]sessionRow, 0, len(sessions))
1786	for _, s := range sessions {
1787		row := sessionRow{
1788			Name:    s.Name,
1789			Short:   s.Short,
1790			Started: s.Created,
1791		}
1792		if s.Ended == "" {
1793			row.Status = "running"
1794			row.Duration = fmtDuration(s.Created, fmt.Sprintf("%d", time.Now().Unix()))
1795		} else if s.ExitCode == "0" {
1796			row.Status = "success"
1797			row.ExitCode = "0"
1798		} else {
1799			row.Status = "failed"
1800			row.ExitCode = s.ExitCode
1801		}
1802		row.Ended = s.Ended
1803		row.Duration = fmtDuration(s.Created, s.Ended)
1804		rows = append(rows, row)
1805	}
1806
1807	// Gather artifacts by scanning the artifacts/ subfolder
1808	var artifacts []artifactRow
1809	artifactsDir := filepath.Join(artifactDir, name, jobID, "artifacts")
1810
1811	if entries, err := os.ReadDir(artifactsDir); err == nil {
1812		for _, entry := range entries {
1813			if entry.IsDir() {
1814				continue
1815			}
1816			info, err := entry.Info()
1817			if err != nil {
1818				continue
1819			}
1820			artifacts = append(artifacts, artifactRow{
1821				Name:    entry.Name(),
1822				Size:    formatFileSize(info.Size()),
1823				ModTime: formatTimestamp(fmt.Sprintf("%d", info.ModTime().Unix())),
1824			})
1825		}
1826	}
1827
1828	// Resolve overall job status
1829	jobStatus := "success"
1830	hasRunning := false
1831	for _, r := range rows {
1832		if r.Status == "running" {
1833			hasRunning = true
1834			break
1835		}
1836		if r.Status == "failed" {
1837			jobStatus = "failed"
1838		}
1839	}
1840	if hasRunning {
1841		jobStatus = "running"
1842	}
1843
1844	// HTML index
1845	tmpl, err := template.New("index").Funcs(template.FuncMap{
1846		"formatTimestamp": formatTimestamp,
1847	}).ParseFS(tmplFS, "tmpl/index.html")
1848	if err == nil {
1849		var buf strings.Builder
1850		if err := tmpl.ExecuteTemplate(&buf, "index.html", struct {
1851			Name      string
1852			JobID     string
1853			JobStatus string
1854			Rows      []sessionRow
1855			Artifacts []artifactRow
1856		}{Name: name, JobID: jobID, JobStatus: jobStatus, Rows: rows, Artifacts: artifacts}); err == nil {
1857			htmlContent = buf.String()
1858		}
1859	}
1860
1861	// Plain-text index
1862	var buf strings.Builder
1863	statusIcon := map[string]string{"success": "\u2705", "failed": "\u274c", "running": "\u23f3"}
1864	icon := statusIcon[jobStatus]
1865	fmt.Fprintf(&buf, "Job: %s (%s) %s\n", name, jobID, icon)
1866	fmt.Fprintf(&buf, "Sessions: %d\n", len(rows))
1867	fmt.Fprintln(&buf, strings.Repeat("-", 70))
1868	for _, r := range rows {
1869		rIcon := statusIcon[r.Status]
1870		fmt.Fprintf(&buf, "  %s %-20s  exit: %-5s  duration: %-8s  ended: %s\n",
1871			rIcon, r.Short, r.ExitCode, r.Duration, r.Ended)
1872	}
1873	fmt.Fprintln(&buf, strings.Repeat("-", 60))
1874	txtContent = buf.String()
1875
1876	return htmlContent, txtContent
1877}
1878
// fmtDuration renders the span between two unix timestamps given as decimal
// strings.
//
// The result is "<m>m<s>s" from one minute upwards, whole seconds ("42s") from
// ten seconds upwards and one decimal place ("1.0s") below that. It is "—"
// when either input is empty or unparseable, or when ended precedes created.
func fmtDuration(created, ended string) string {
	const unknown = "—"
	if created == "" || ended == "" {
		return unknown
	}

	var startTs, endTs int64
	if _, err := fmt.Sscanf(created, "%d", &startTs); err != nil {
		return unknown
	}
	if _, err := fmt.Sscanf(ended, "%d", &endTs); err != nil {
		return unknown
	}

	elapsed := time.Duration(endTs-startTs) * time.Second
	switch {
	case elapsed < 0:
		return unknown
	case elapsed >= time.Minute:
		minutes := elapsed / time.Minute
		seconds := elapsed % time.Minute / time.Second
		return fmt.Sprintf("%dm%ds", minutes, seconds)
	}

	asSeconds := float64(elapsed) / float64(time.Second)
	if asSeconds < 10 {
		return fmt.Sprintf("%.1fs", asSeconds)
	}
	return fmt.Sprintf("%ds", int(asSeconds))
}