Commit 85f918b
Eric Bower
·
2026-05-07 11:26:22 -0400 EDT
parent 9b7abfc
chore: cleanup
M
go.mod
+0,
-2
1@@ -1,5 +1,3 @@
2 module github.com/picosh/pici
3
4 go 1.26.2
5-
6-require github.com/golang-cz/devslog v0.0.15
M
go.sum
+0,
-2
1@@ -1,2 +0,0 @@
2-github.com/golang-cz/devslog v0.0.15 h1:ejoBLTCwJHWGbAmDf2fyTJJQO3AkzcPjw8SC9LaOQMI=
3-github.com/golang-cz/devslog v0.0.15/go.mod h1:bSe5bm0A7Nyfqtijf1OMNgVJHlWEuVSXnkuASiE1vV8=
M
main.go
+134,
-29
1@@ -19,7 +19,7 @@ import (
2 "syscall"
3 "time"
4
5- "github.com/golang-cz/devslog"
6+
7 )
8
9 type WorkspaceFactory func(cfg *Cfg, logger *slog.Logger, source string) Workspace
10@@ -61,14 +61,12 @@ func NewCfg() (*Cfg, string, bool) {
11 var keyLoc, certLoc, artifactDir, event string
12 var monitorInterval time.Duration
13 var logLevel string
14- var structured bool
15 flag.StringVar(&keyLoc, "pk", "", "ssh private key used to authenticate with pico services")
16 flag.StringVar(&certLoc, "ck", "", "ssh certificate public key used to authenticate with pico services (only required if using ssh certificates)")
17 flag.StringVar(&artifactDir, "artifact-dir", "/tmp/pici-artifacts", "local directory to stage artifacts")
18 flag.StringVar(&event, "event", "", "event JSON to run (alternative to reading from stdin)")
19 flag.DurationVar(&monitorInterval, "monitor-interval", 5*time.Second, "interval for monitoring zmx sessions")
20 flag.StringVar(&logLevel, "log-level", "info", "log level: debug, info, warn, error")
21- flag.BoolVar(&structured, "structured", false, "use structured key=value log output")
22 var statusFilter string
23 var human bool
24 var wait bool
25@@ -81,7 +79,7 @@ func NewCfg() (*Cfg, string, bool) {
26 flags, cmd, wantHelp := splitCommand(os.Args[1:])
27 flag.CommandLine.Parse(flags)
28
29- logger := newLogger("ci", logLevel, structured)
30+ logger := newLogger("ci", logLevel)
31 ctx, cancel := context.WithCancel(context.Background())
32 return &Cfg{
33 NewWorkspace: defaultWorkspaceFactory,
34@@ -131,17 +129,10 @@ func parseLogLevel(s string) slog.Level {
35 }
36 }
37
38-func newLogger(space string, levelStr string, structured bool) *slog.Logger {
39+func newLogger(space string, levelStr string) *slog.Logger {
40 lvl := parseLogLevel(levelStr)
41- if structured {
42- return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
43- Level: lvl,
44- })).With("service", space)
45- }
46- return slog.New(devslog.NewHandler(os.Stderr, &devslog.Options{
47- HandlerOptions: &slog.HandlerOptions{
48- Level: lvl,
49- },
50+ return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
51+ Level: lvl,
52 })).With("service", space)
53 }
54
55@@ -528,12 +519,6 @@ func waitAndReport(cfg *Cfg, log *slog.Logger, name, jobID string) error {
56 fmt.Fprint(os.Stdout, "⏳ waiting for completion...\n")
57
58 // Track state for each session
59- type sessionState struct {
60- status string // "running", "success", "failed"
61- exitCode string
62- duration string
63- printed bool // true once final state (success/failed) is printed
64- }
65 known := make(map[string]*sessionState)
66 var jobSessions []SessionInfo
67 var sessionOrder []string // track insertion order for deterministic output
68@@ -702,6 +687,23 @@ func cleanSessionShort(name, prefix, repoName, jobID string) string {
69 // pici monitor | ssh pipe.pico.sh "pub build.status -b=false"
70 // pici monitor | while read -r line; do curl -sd"$line" $WEBHOOK; done
71 // pici monitor > status.jsonl
72+//
73+// sessionState tracks the display state of a single session.
74+type sessionState struct {
75+ status string // "running", "success", "failed"
76+ exitCode string
77+ duration string
78+ printed bool // true once final state (success/failed) is printed
79+}
80+
81+// monitorJobState tracks display state for a single job across ticks.
82+type monitorJobState struct {
83+ sessionOrder []string // insertion order for deterministic output
84+ sessions map[string]*sessionState
85+ liveLines []string // last set of status lines printed (for overwrite)
86+ published bool // final status already printed
87+}
88+
89 func runMonitor(cfg *Cfg) error {
90 log := cfg.Logger.With("cmd", "monitor")
91
92@@ -713,6 +715,9 @@ func runMonitor(cfg *Cfg) error {
93 ticker := time.NewTicker(cfg.MonitorInterval)
94 defer ticker.Stop()
95
96+ // Track per-job display state across ticks (for human output)
97+ jobStates := make(map[string]*monitorJobState) // key: "name/jobID"
98+
99 log.Debug("monitor started", "interval", cfg.MonitorInterval, "artifact_dir", cfg.ArtifactDir)
100 log.Debug("monitoring ci.* sessions for job status, writing status to stdout")
101
102@@ -724,14 +729,114 @@ func runMonitor(cfg *Cfg) error {
103 case <-ticker.C:
104 }
105
106- if err := monitorTick(cfg, log, output); err != nil {
107+ if err := monitorTick(cfg, log, output, jobStates); err != nil {
108 log.Error("monitor tick", "err", err)
109 }
110 }
111 }
112
113 // monitorTick performs a single monitoring pass over all ci.* sessions.
114-func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer) error {
115+// renderJobRunning prints per-session status for a running job, using ANSI
116+// cursor control to overwrite previous lines.
117+func renderJobRunning(output io.Writer, name, jobID string, group []SessionInfo, duration string, jobStates map[string]*monitorJobState) {
118+ key := name + "/" + jobID
119+ state, ok := jobStates[key]
120+ if !ok {
121+ state = &monitorJobState{sessions: make(map[string]*sessionState)}
122+ jobStates[key] = state
123+ }
124+
125+ // Track new sessions
126+ for _, s := range group {
127+ if _, ok := state.sessions[s.Short]; !ok {
128+ state.sessions[s.Short] = &sessionState{}
129+ state.sessionOrder = append(state.sessionOrder, s.Short)
130+ }
131+ }
132+
133+ // Update session states
134+ for _, s := range group {
135+ ss := state.sessions[s.Short]
136+ if s.Ended == "" {
137+ ss.status = "running"
138+ created, _ := strconv.ParseInt(s.Created, 10, 64)
139+ ss.duration = fmtDurationTs(created, time.Now().Unix())
140+ } else {
141+ if s.ExitCode == "0" {
142+ ss.status = "success"
143+ } else {
144+ ss.status = "failed"
145+ ss.exitCode = s.ExitCode
146+ }
147+ ss.duration = fmtDuration(s.Created, s.Ended)
148+ }
149+ }
150+
151+ // Build status lines
152+ lines := make([]string, 0, len(state.sessionOrder))
153+ for _, short := range state.sessionOrder {
154+ ss := state.sessions[short]
155+ icon := map[string]string{"running": "🚀", "success": "✅", "failed": "❌"}[ss.status]
156+ detail := ""
157+ if ss.status == "failed" && ss.exitCode != "" {
158+ detail = fmt.Sprintf(", exit %s", ss.exitCode)
159+ }
160+ if ss.duration != "" && ss.duration != "—" {
161+ detail += fmt.Sprintf(" (%s)", ss.duration)
162+ }
163+ lines = append(lines, fmt.Sprintf(" %-12s %s %s%s", short, icon, ss.status, detail))
164+ }
165+
166+ // Overwrite previous lines or print fresh
167+ if len(state.liveLines) > 0 {
168+ for range len(state.liveLines) {
169+ fmt.Fprint(output, "\033[A")
170+ }
171+ for i, line := range lines {
172+ if i > 0 {
173+ fmt.Fprint(output, "\n")
174+ }
175+ fmt.Fprint(output, line+"\033[K")
176+ }
177+ fmt.Fprint(output, "\n")
178+ } else {
179+ fmt.Fprintf(output, " %-12s %s %s\n", name, "🚀", "running")
180+ for _, line := range lines {
181+ fmt.Fprintln(output, line)
182+ }
183+ }
184+ state.liveLines = lines
185+}
186+
187+// renderJobFinal prints the final status for a completed job.
188+func renderJobFinal(output io.Writer, name, jobID string, group []SessionInfo, duration, status string, success bool, workspace, artifactDir, artifactURL string) {
189+ icon := map[string]string{"success": "✅", "failed": "❌"}[status]
190+ fmt.Fprintf(output, " %-12s %s %s (%s)\n", name, icon, status, duration)
191+
192+ // Per-session summary
193+ for _, s := range group {
194+ icon := "✅"
195+ if s.ExitCode != "0" {
196+ icon = "❌"
197+ }
198+ dur := fmtDuration(s.Created, s.Ended)
199+ fmt.Fprintf(output, " %-12s %s done (%s)\n", s.Short, icon, dur)
200+ }
201+
202+ // Context info
203+ fmt.Fprint(output, "\n")
204+ if workspace != "" {
205+ fmt.Fprintf(output, " workspace: %s\n", workspace)
206+ }
207+ artifactPath := filepath.Join(artifactDir, name, jobID)
208+ fmt.Fprintf(output, " artifacts: %s\n", artifactPath)
209+ if artifactURL != "" {
210+ fmt.Fprintf(output, " url: %s\n", artifactURL)
211+ }
212+ fmt.Fprint(output, "\n")
213+}
214+
215+func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer, jobStates map[string]*monitorJobState) error {
216 // a. zmx list → filter ci.* sessions
217 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
218 if err != nil {
219@@ -816,9 +921,7 @@ func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer) error {
220 log.Info("job finished", "status", status, "exit_code", exitCode)
221
222 if cfg.HumanOutput {
223- icon := map[string]string{"success": "✅", "failed": "❌"}[status]
224- fmt.Fprintf(output, "[%d/%d] %s %s: %s (%s)\n",
225- countCompleted(group), len(group), icon, status, name, duration)
226+ renderJobFinal(output, name, jobID, group, duration, status, exitCode == 0, eventData.Workspace, cfg.ArtifactDir, artifactURL)
227 } else {
228 payload := StatusPayload{
229 Name: name,
230@@ -845,9 +948,7 @@ func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer) error {
231 // Publish running status only when filter is "all"
232 if cfg.StatusFilter != "terminal" {
233 if cfg.HumanOutput {
234- completed := countCompleted(group)
235- fmt.Fprintf(output, "[%d/%d] 🚀 running: %s (%s)\n",
236- completed, len(group), name, duration)
237+ renderJobRunning(output, name, jobID, group, duration, jobStates)
238 } else {
239 payload := StatusPayload{
240 Name: name,
241@@ -869,6 +970,9 @@ func monitorTick(cfg *Cfg, log *slog.Logger, output io.Writer) error {
242 }
243
244 // d. Sync artifacts once per tick
245+ if cfg.HumanOutput {
246+ fmt.Fprint(output, " 📦 syncing artifacts...\n")
247+ }
248 if err := syncArtifacts(cfg, log); err != nil {
249 log.Error("sync artifacts", "err", err)
250 }
251@@ -966,7 +1070,8 @@ func groupSessionsByJob(sessions []SessionInfo) map[string][]SessionInfo {
252 continue
253 }
254 // Set the Short name
255- s.Short = strings.TrimPrefix(s.Name, prefix)
256+ name, jobID := parseJobPrefix(prefix)
257+ s.Short = cleanSessionShort(s.Name, prefix, name, jobID)
258 groups[prefix] = append(groups[prefix], s)
259 }
260 return groups