main
main_test.go
Eric Bower
·
2026-05-08
1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log/slog"
11 "os"
12 "os/exec"
13 "path/filepath"
14 "strings"
15 "testing"
16 "time"
17)
18
19// TestE2E_RunnerWithZMXSessions is a full integration test that:
20// 1. Creates a workspace with pico.sh that spawns zmx sessions
21// 2. Feeds an event to RunRunner (fire-and-forget)
22// 3. Runs the monitor to track job completion
23// 4. Reads the status file and asserts correct status transitions.
24func TestE2E_RunnerWithZMXSessions(t *testing.T) {
25 if testing.Short() {
26 t.Skip("skip integration test")
27 }
28 if _, err := exec.LookPath("zmx"); err != nil {
29 t.Skip("zmx not found, skipping integration test")
30 }
31
32 // 1. Create workspace with pico.sh that spawns zmx sessions
33 workspaceDir := t.TempDir()
34 picoSh := `#!/usr/bin/env bash
35set -e
36zmx run step1 echo "hello from step1"
37zmx run step2 echo "hello from step2"
38`
39 if err := os.WriteFile(filepath.Join(workspaceDir, "pico.sh"), []byte(picoSh), 0755); err != nil {
40 t.Fatalf("write pico.sh: %v", err)
41 }
42
43 // 2. Create config
44 artifactDir := t.TempDir()
45 ctx, cancel := context.WithCancel(context.Background())
46 event := Event{
47 Type: "build",
48 Name: "test-repo",
49 Workspace: workspaceDir,
50 }
51 eventJSON, _ := json.Marshal(event)
52 statusBuf := new(bytes.Buffer)
53 cfg := &Cfg{
54 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
55 Ctx: ctx,
56 Cancel: cancel,
57 ArtifactDir: artifactDir,
58 EventSource: io.NopCloser(bytes.NewReader(append(eventJSON, '\n'))),
59 MonitorInterval: 200 * time.Millisecond,
60 NewWorkspace: defaultWorkspaceFactory,
61 StatusOutput: statusBuf,
62 IncludeRunning: true,
63 }
64
65 // 3. Run the runner (fire-and-forget, exits quickly)
66 runnerDone := make(chan error, 1)
67 go func() {
68 runnerDone <- RunRunner(cfg)
69 }()
70
71 select {
72 case err := <-runnerDone:
73 if err != nil {
74 t.Fatalf("runner: %v", err)
75 }
76 case <-time.After(30 * time.Second):
77 t.Fatal("timeout waiting for runner to complete")
78 }
79
80 // 4. Run the monitor and poll for incremental artifacts
81 monitorDone := make(chan error, 1)
82 go func() {
83 monitorDone <- runMonitor(cfg)
84 }()
85
86 // Track what we've seen during the run
87 seenIndexHTML := false
88 seenIndexTXT := false
89 sessionArtifactShorts := make(map[string]bool)
90 var finalPayload *StatusPayload
91
92 for {
93 select {
94 case err := <-monitorDone:
95 t.Fatalf("monitor exited unexpectedly: %v", err)
96 case <-time.After(30 * time.Second):
97 t.Fatal("timeout waiting for job to complete")
98 default:
99 }
100
101 // Read all statuses and artifacts seen so far
102 data := statusBuf.Bytes()
103 if len(data) > 0 {
104 lines := scanLines(data)
105 for _, line := range lines {
106 var p StatusPayload
107 if err := json.Unmarshal([]byte(line), &p); err != nil {
108 continue
109 }
110
111 // Ignore statuses from unrelated jobs (e.g., leftover sessions from previous tests)
112 if p.Name != "test-repo" {
113 continue
114 }
115
116 // Check index files exist (generated at every tick)
117 indexHTMLPath := filepath.Join(artifactDir, p.Name, p.JobID, "index.html")
118 indexTXTPath := filepath.Join(artifactDir, p.Name, p.JobID, "index.txt")
119 if _, err := os.Stat(indexHTMLPath); err == nil {
120 seenIndexHTML = true
121 t.Logf("index.html exists (status=%s)", p.Status)
122 }
123 if _, err := os.Stat(indexTXTPath); err == nil {
124 seenIndexTXT = true
125 }
126
127 // Track per-session artifacts
128 for _, s := range p.Sessions {
129 htmlPath := filepath.Join(artifactDir, p.Name, p.JobID, s.Short+".html")
130 txtPath := filepath.Join(artifactDir, p.Name, p.JobID, s.Short+".txt")
131 if _, err := os.Stat(htmlPath); err == nil {
132 sessionArtifactShorts[s.Short+"_html"] = true
133 }
134 if _, err := os.Stat(txtPath); err == nil {
135 sessionArtifactShorts[s.Short+"_txt"] = true
136 }
137 }
138
139 // Check if we've reached a final state
140 if p.Status == "success" || p.Status == "failed" {
141 cancel() // stop the monitor
142 finalPayload = &p
143 break
144 }
145 }
146 }
147
148 if finalPayload != nil {
149 break
150 }
151
152 // Assert progress: if we see session artifacts, we should see index files too
153 if len(sessionArtifactShorts) > 0 && (!seenIndexHTML || !seenIndexTXT) {
154 t.Error("index files should exist once any session artifact exists")
155 }
156
157 time.Sleep(cfg.MonitorInterval / 2) // poll at twice the interval rate
158 }
159
160 // Wait for monitor to exit gracefully
161 select {
162 case <-monitorDone:
163 case <-time.After(5 * time.Second):
164 t.Log("warning: monitor did not exit gracefully")
165 }
166
167 // 5. Assert we saw index files during the run
168 if !seenIndexHTML {
169 t.Error("never saw index.html during monitoring")
170 }
171 if !seenIndexTXT {
172 t.Error("never saw index.txt during monitoring")
173 }
174
175 // 6. Assert final payload has correct data
176 if finalPayload == nil {
177 t.Fatal("no final payload")
178 }
179 // Filter to only sessions with our job prefix to avoid picking up unrelated ci.* sessions
180 expectedPrefix := fmt.Sprintf("ci.%s.%s.", finalPayload.Name, finalPayload.JobID)
181 sessions := make([]SessionInfo, 0, len(finalPayload.Sessions))
182 for _, s := range finalPayload.Sessions {
183 if strings.HasPrefix(s.Name, expectedPrefix) {
184 sessions = append(sessions, s)
185 }
186 }
187 finalPayload.Sessions = sessions
188
189 if finalPayload.Name != "test-repo" {
190 t.Errorf("expected name test-repo, got %q", finalPayload.Name)
191 }
192 if finalPayload.Status != "success" {
193 t.Errorf("expected status success, got %q", finalPayload.Status)
194 }
195 if finalPayload.ExitCode == nil || *finalPayload.ExitCode != 0 {
196 t.Errorf("expected exit code 0, got %v", finalPayload.ExitCode)
197 }
198 if len(finalPayload.Sessions) < 2 {
199 t.Errorf("expected at least 2 sessions, got %d", len(finalPayload.Sessions))
200 }
201
202 // 7. Assert sessions have correct names
203 sessionNames := make(map[string]bool)
204 for _, s := range finalPayload.Sessions {
205 sessionNames[s.Short] = true
206 t.Logf("session: name=%s short=%s exit_code=%s ended=%s", s.Name, s.Short, s.ExitCode, s.Ended)
207 }
208 // Sessions from the test's pico.sh: step1 and step2
209 if !sessionNames["step1"] {
210 t.Error("expected session 'step1'")
211 }
212 if !sessionNames["step2"] {
213 t.Error("expected session 'step2'")
214 }
215
216 // 8. Assert HTML artifacts were staged for each session
217 for _, s := range finalPayload.Sessions {
218 artifactPath := filepath.Join(cfg.ArtifactDir, finalPayload.Name, finalPayload.JobID, s.Short+".html")
219 data, err := os.ReadFile(artifactPath)
220 if err != nil {
221 t.Errorf("read artifact %s: %v", artifactPath, err)
222 continue
223 }
224 if len(data) == 0 {
225 t.Errorf("artifact %s is empty", artifactPath)
226 }
227 if !bytes.Contains(data, []byte("<div")) {
228 t.Errorf("artifact %s does not contain HTML content", artifactPath)
229 }
230 }
231
232 // Cleanup any leftover zmx sessions for this job
233 if finalPayload != nil {
234 _ = exec.Command("zmx", "kill", "-f", fmt.Sprintf("ci.test-repo.%s", finalPayload.JobID)).Run()
235 }
236}
237
238func scanLines(data []byte) []string {
239 var lines []string
240 scanner := bufio.NewScanner(bytes.NewReader(data))
241 for scanner.Scan() {
242 lines = append(lines, scanner.Text())
243 }
244 return lines
245}
246
247// TestE2E_DuplicateCancellation verifies that starting a new job for the same
248// repo cancels any existing running job for that repo.
249func TestE2E_DuplicateCancellation(t *testing.T) {
250 if testing.Short() {
251 t.Skip("skip integration test")
252 }
253 if _, err := exec.LookPath("zmx"); err != nil {
254 t.Skip("zmx not found, skipping integration test")
255 }
256
257 // 1. Create workspace with a slow pico.sh so the first job stays running
258 workspaceDir := t.TempDir()
259 picoSh := `#!/usr/bin/env bash
260set -e
261zmx run slow-step sleep 30
262`
263 if err := os.WriteFile(filepath.Join(workspaceDir, "pico.sh"), []byte(picoSh), 0755); err != nil {
264 t.Fatalf("write pico.sh: %v", err)
265 }
266
267 // 2. Create a fast pico.sh for the second job
268 workspaceDir2 := t.TempDir()
269 picoSh2 := `#!/usr/bin/env bash
270set -e
271zmx run fast-step echo "done"
272`
273 if err := os.WriteFile(filepath.Join(workspaceDir2, "pico.sh"), []byte(picoSh2), 0755); err != nil {
274 t.Fatalf("write pico.sh: %v", err)
275 }
276
277 artifactDir := t.TempDir()
278 ctx, cancel := context.WithCancel(context.Background())
279 defer cancel()
280
281 makeCfg := func(eventSource io.ReadCloser) *Cfg {
282 return &Cfg{
283 Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
284 Ctx: ctx,
285 Cancel: cancel,
286 ArtifactDir: artifactDir,
287 EventSource: eventSource,
288 MonitorInterval: 200 * time.Millisecond,
289 NewWorkspace: defaultWorkspaceFactory,
290 }
291 }
292
293 // 3. Start first job (slow)
294 event1 := Event{Type: "build", Name: "dup-repo", Workspace: workspaceDir}
295 event1JSON, _ := json.Marshal(event1)
296 cfg1 := makeCfg(io.NopCloser(bytes.NewReader(append(event1JSON, '\n'))))
297
298 go func() {
299 _ = RunRunner(cfg1)
300 }()
301
302 // Wait for the first job's runner session to appear
303 if !waitForSessionPrefix(t, "ci.dup-repo.", 10*time.Second) {
304 t.Fatal("first job's runner session never appeared")
305 }
306
307 // Record the first job's runner session name
308 firstRunner := findRunnerSession(t, "dup-repo")
309 if firstRunner == "" {
310 t.Fatal("could not find first job's runner session")
311 }
312 t.Logf("first job runner: %s", firstRunner)
313
314 // 4. Start second job (same repo name, should cancel the first)
315 event2 := Event{Type: "build", Name: "dup-repo", Workspace: workspaceDir2}
316 event2JSON, _ := json.Marshal(event2)
317 cfg2 := makeCfg(io.NopCloser(bytes.NewReader(append(event2JSON, '\n'))))
318
319 runner2Done := make(chan error, 1)
320 go func() {
321 runner2Done <- RunRunner(cfg2)
322 }()
323
324 // Wait for second runner to complete
325 select {
326 case err := <-runner2Done:
327 if err != nil {
328 t.Fatalf("second runner: %v", err)
329 }
330 case <-time.After(30 * time.Second):
331 t.Fatal("timeout waiting for second runner")
332 }
333
334 // 5. Verify the first job's runner session was killed
335 // Give zmx kill time to propagate
336 time.Sleep(1 * time.Second)
337
338 listOutput, _ := exec.Command("zmx", "list").CombinedOutput()
339 sessions := parseZMXList(string(listOutput))
340 for _, s := range sessions {
341 if s.Name == firstRunner {
342 if s.Ended == "" {
343 t.Errorf("first job's runner session %s should have been killed (ended is empty)", firstRunner)
344 } else {
345 t.Logf("first job's runner session %s was killed (ended=%s)", firstRunner, s.Ended)
346 }
347 }
348 }
349
350 // Cleanup
351 _ = exec.Command("zmx", "kill", "-f", "ci.dup-repo").Run()
352}
353
354// waitForSessionPrefix returns true if a session with the given prefix appears within the timeout.
355func waitForSessionPrefix(t *testing.T, prefix string, timeout time.Duration) bool {
356 deadline := time.Now().Add(timeout)
357 for time.Now().Before(deadline) {
358 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
359 if err == nil {
360 sessions := parseZMXList(string(listOutput))
361 for _, s := range sessions {
362 if strings.HasPrefix(s.Name, prefix) {
363 return true
364 }
365 }
366 }
367 time.Sleep(200 * time.Millisecond)
368 }
369 return false
370}
371
372// findRunnerSession finds the runner session for a given repo name.
373func findRunnerSession(t *testing.T, name string) string {
374 listOutput, err := exec.Command("zmx", "list").CombinedOutput()
375 if err != nil {
376 t.Fatalf("zmx list: %v", err)
377 }
378 sessions := parseZMXList(string(listOutput))
379 for _, s := range sessions {
380 if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") {
381 return s.Name
382 }
383 }
384 return ""
385}
386
387func TestGenerateJobID(t *testing.T) {
388 // Same inputs should produce same hash
389 id1 := jobIDFor("myrepo", "/workspace", 1000)
390 id2 := jobIDFor("myrepo", "/workspace", 1000)
391 if id1 != id2 {
392 t.Errorf("expected same ID for same inputs, got %q and %q", id1, id2)
393 }
394
395 // Different name should produce different hash
396 id3 := jobIDFor("otherrepo", "/workspace", 1000)
397 if id1 == id3 {
398 t.Errorf("expected different IDs for different names, got %q", id1)
399 }
400
401 // Different timestamp should produce different hash
402 id4 := jobIDFor("myrepo", "/workspace", 2000)
403 if id1 == id4 {
404 t.Errorf("expected different IDs for different timestamps, got %q", id1)
405 }
406
407 // ID should be 8 hex chars
408 if len(id1) != 8 {
409 t.Errorf("expected 8 char ID, got %d chars: %q", len(id1), id1)
410 }
411
412 // generateJobID (with real time) should also produce valid IDs
413 id := generateJobID("myrepo", "/workspace")
414 if len(id) != 8 {
415 t.Errorf("generateJobID expected 8 char ID, got %d chars: %q", len(id), id)
416 }
417}
418
419func TestAllCompleted(t *testing.T) {
420 tests := []struct {
421 name string
422 sessions []SessionInfo
423 want bool
424 }{
425 {
426 name: "empty sessions",
427 sessions: []SessionInfo{},
428 want: true,
429 },
430 {
431 name: "all completed",
432 sessions: []SessionInfo{
433 {Name: "a", Ended: "123"},
434 {Name: "b", Ended: "456"},
435 },
436 want: true,
437 },
438 {
439 name: "one not completed",
440 sessions: []SessionInfo{
441 {Name: "a", Ended: "123"},
442 {Name: "b", Ended: ""},
443 },
444 want: false,
445 },
446 }
447
448 for _, tt := range tests {
449 t.Run(tt.name, func(t *testing.T) {
450 got := allCompleted(tt.sessions)
451 if got != tt.want {
452 t.Errorf("allCompleted() = %v, want %v", got, tt.want)
453 }
454 })
455 }
456}
457
458func TestParseZMXList(t *testing.T) {
459 output := `name=ci-lint pid=1064464 clients=0 created=1777519944 start_dir=/home/erock/dev/pico ended=1777519986 exit_code=0
460 name=ci-tests pid=1064472 clients=0 created=1777519944 start_dir=/home/erock/dev/pico ended=1777519958 exit_code=2
461→ name=d.build.1 pid=549652 clients=0 created=1777513430 start_dir=/home/erock`
462
463 sessions := parseZMXList(output)
464 if len(sessions) != 3 {
465 t.Fatalf("expected 3 sessions, got %d", len(sessions))
466 }
467
468 if sessions[0].Name != "ci-lint" {
469 t.Errorf("expected first session name ci-lint, got %q", sessions[0].Name)
470 }
471 if sessions[0].Ended != "1777519986" {
472 t.Errorf("expected ended 1777519986, got %q", sessions[0].Ended)
473 }
474 if sessions[0].ExitCode != "0" {
475 t.Errorf("expected exit_code 0, got %q", sessions[0].ExitCode)
476 }
477
478 if sessions[2].Name != "d.build.1" {
479 t.Errorf("expected third session name d.build.1, got %q", sessions[2].Name)
480 }
481 if sessions[2].Ended != "" {
482 t.Errorf("expected empty ended for active session, got %q", sessions[2].Ended)
483 }
484}
485
486func TestExtractJobID(t *testing.T) {
487 tests := []struct {
488 runnerName string
489 want string
490 }{
491 {"ci.myrepo.abc123.runner", "abc123"},
492 {"ci.test-repo.006d0847.runner", "006d0847"},
493 {"ci.my_org.project.abc123.runner", "project.abc123"}, // name with underscore
494 }
495
496 for _, tt := range tests {
497 got := extractJobID(tt.runnerName)
498 if got != tt.want {
499 t.Errorf("extractJobID(%q) = %q, want %q", tt.runnerName, got, tt.want)
500 }
501 }
502}
503
504func TestExtractJobPrefix(t *testing.T) {
505 tests := []struct {
506 sessionName string
507 want string
508 }{
509 {"ci.myrepo.abc123.lint", "ci.myrepo.abc123."},
510 {"ci.myrepo.abc123.runner", "ci.myrepo.abc123."},
511 {"ci.myrepo.abc123.tests", "ci.myrepo.abc123."},
512 {"ci.name.jobID.step.substep", "ci.name.jobID."},
513 {"ci.a.b", ""}, // too few parts
514 }
515
516 for _, tt := range tests {
517 got := extractJobPrefix(tt.sessionName)
518 if got != tt.want {
519 t.Errorf("extractJobPrefix(%q) = %q, want %q", tt.sessionName, got, tt.want)
520 }
521 }
522}
523
524func TestFindRunningJobs(t *testing.T) {
525 output := `name=ci.myrepo.abc123.runner pid=100 clients=0 created=1777519944 start_dir=/home/erock
526 name=ci.myrepo.abc123.lint pid=101 clients=0 created=1777519944 start_dir=/home/erock
527 name=ci.myrepo.abc123.tests pid=102 clients=0 created=1777519944 start_dir=/home/erock ended=1777519986 exit_code=0
528 name=ci.myrepo.def456.runner pid=103 clients=0 created=1777519944 start_dir=/home/erock ended=1777519986 exit_code=0
529 name=ci.other.abc123.runner pid=104 clients=0 created=1777519944 start_dir=/home/erock
530→ name=d.build.1 pid=549652 clients=0 created=1777513430 start_dir=/home/erock`
531
532 runners, sessions := findRunningJobsFromOutput(output, "myrepo")
533 if len(runners) != 1 {
534 t.Fatalf("expected 1 running job, got %d: %v", len(runners), runners)
535 }
536 if runners[0] != "ci.myrepo.abc123.runner" {
537 t.Errorf("expected runner ci.myrepo.abc123.runner, got %q", runners[0])
538 }
539 if len(sessions) != 6 {
540 t.Errorf("expected 6 total sessions, got %d", len(sessions))
541 }
542}
543
544func findRunningJobsFromOutput(output, name string) ([]string, []SessionInfo) {
545 sessions := parseZMXList(output)
546 var runners []string
547 for _, s := range sessions {
548 if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") && s.Ended == "" {
549 runners = append(runners, s.Name)
550 }
551 }
552 return runners, sessions
553}
554
555func TestKillSessionsEmpty(t *testing.T) {
556 // Should not error with empty list
557 if err := killSessions(nil); err != nil {
558 t.Errorf("killSessions(nil) = %v, want nil", err)
559 }
560 if err := killSessions([]string{}); err != nil {
561 t.Errorf("killSessions([]) = %v, want nil", err)
562 }
563}
564
565func TestResolveJobExitCode(t *testing.T) {
566 tests := []struct {
567 name string
568 sessions []SessionInfo
569 wantCode int
570 wantStatus string
571 }{
572 {
573 name: "all success",
574 sessions: []SessionInfo{
575 {Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
576 {Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
577 },
578 wantCode: 0,
579 wantStatus: "success",
580 },
581 {
582 name: "runner failed",
583 sessions: []SessionInfo{
584 {Name: "ci.repo.abc.runner", ExitCode: "1", Ended: "1"},
585 {Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
586 },
587 wantCode: 1,
588 wantStatus: "failed",
589 },
590 {
591 name: "child failed, runner says 0 (defensive)",
592 sessions: []SessionInfo{
593 {Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
594 {Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
595 {Name: "ci.repo.abc.step2", ExitCode: "2", Ended: "1"},
596 },
597 wantCode: 2,
598 wantStatus: "failed",
599 },
600 {
601 name: "worst child exit code wins",
602 sessions: []SessionInfo{
603 {Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
604 {Name: "ci.repo.abc.step1", ExitCode: "1", Ended: "1"},
605 {Name: "ci.repo.abc.step2", ExitCode: "3", Ended: "1"},
606 },
607 wantCode: 3,
608 wantStatus: "failed",
609 },
610 {
611 name: "no runner session",
612 sessions: []SessionInfo{
613 {Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
614 },
615 wantCode: 0,
616 wantStatus: "success",
617 },
618 {
619 name: "sessions not yet ended (no exit code)",
620 sessions: []SessionInfo{
621 {Name: "ci.repo.abc.runner", ExitCode: "", Ended: ""},
622 {Name: "ci.repo.abc.step1", ExitCode: "", Ended: ""},
623 },
624 wantCode: 0,
625 wantStatus: "success",
626 },
627 }
628
629 for _, tt := range tests {
630 t.Run(tt.name, func(t *testing.T) {
631 code, status := resolveJobExitCode(tt.sessions)
632 if code != tt.wantCode {
633 t.Errorf("exit code = %d, want %d", code, tt.wantCode)
634 }
635 if status != tt.wantStatus {
636 t.Errorf("status = %q, want %q", status, tt.wantStatus)
637 }
638 })
639 }
640}