main main_test.go
Eric Bower  ·  2026-05-08
  1package main
  2
  3import (
  4	"bufio"
  5	"bytes"
  6	"context"
  7	"encoding/json"
  8	"fmt"
  9	"io"
 10	"log/slog"
 11	"os"
 12	"os/exec"
 13	"path/filepath"
 14	"strings"
 15	"testing"
 16	"time"
 17)
 18
 19// TestE2E_RunnerWithZMXSessions is a full integration test that:
 20// 1. Creates a workspace with pico.sh that spawns zmx sessions
 21// 2. Feeds an event to RunRunner (fire-and-forget)
 22// 3. Runs the monitor to track job completion
 23// 4. Reads the status file and asserts correct status transitions.
 24func TestE2E_RunnerWithZMXSessions(t *testing.T) {
 25	if testing.Short() {
 26		t.Skip("skip integration test")
 27	}
 28	if _, err := exec.LookPath("zmx"); err != nil {
 29		t.Skip("zmx not found, skipping integration test")
 30	}
 31
 32	// 1. Create workspace with pico.sh that spawns zmx sessions
 33	workspaceDir := t.TempDir()
 34	picoSh := `#!/usr/bin/env bash
 35set -e
 36zmx run step1 echo "hello from step1"
 37zmx run step2 echo "hello from step2"
 38`
 39	if err := os.WriteFile(filepath.Join(workspaceDir, "pico.sh"), []byte(picoSh), 0755); err != nil {
 40		t.Fatalf("write pico.sh: %v", err)
 41	}
 42
 43	// 2. Create config
 44	artifactDir := t.TempDir()
 45	ctx, cancel := context.WithCancel(context.Background())
 46	event := Event{
 47		Type:      "build",
 48		Name:      "test-repo",
 49		Workspace: workspaceDir,
 50	}
 51	eventJSON, _ := json.Marshal(event)
 52	statusBuf := new(bytes.Buffer)
 53	cfg := &Cfg{
 54		Logger:          slog.New(slog.NewTextHandler(io.Discard, nil)),
 55		Ctx:             ctx,
 56		Cancel:          cancel,
 57		ArtifactDir:     artifactDir,
 58		EventSource:     io.NopCloser(bytes.NewReader(append(eventJSON, '\n'))),
 59		MonitorInterval: 200 * time.Millisecond,
 60		NewWorkspace:    defaultWorkspaceFactory,
 61		StatusOutput:    statusBuf,
 62		IncludeRunning:  true,
 63	}
 64
 65	// 3. Run the runner (fire-and-forget, exits quickly)
 66	runnerDone := make(chan error, 1)
 67	go func() {
 68		runnerDone <- RunRunner(cfg)
 69	}()
 70
 71	select {
 72	case err := <-runnerDone:
 73		if err != nil {
 74			t.Fatalf("runner: %v", err)
 75		}
 76	case <-time.After(30 * time.Second):
 77		t.Fatal("timeout waiting for runner to complete")
 78	}
 79
 80	// 4. Run the monitor and poll for incremental artifacts
 81	monitorDone := make(chan error, 1)
 82	go func() {
 83		monitorDone <- runMonitor(cfg)
 84	}()
 85
 86	// Track what we've seen during the run
 87	seenIndexHTML := false
 88	seenIndexTXT := false
 89	sessionArtifactShorts := make(map[string]bool)
 90	var finalPayload *StatusPayload
 91
 92	for {
 93		select {
 94		case err := <-monitorDone:
 95			t.Fatalf("monitor exited unexpectedly: %v", err)
 96		case <-time.After(30 * time.Second):
 97			t.Fatal("timeout waiting for job to complete")
 98		default:
 99		}
100
101		// Read all statuses and artifacts seen so far
102		data := statusBuf.Bytes()
103		if len(data) > 0 {
104			lines := scanLines(data)
105			for _, line := range lines {
106				var p StatusPayload
107				if err := json.Unmarshal([]byte(line), &p); err != nil {
108					continue
109				}
110
111				// Ignore statuses from unrelated jobs (e.g., leftover sessions from previous tests)
112				if p.Name != "test-repo" {
113					continue
114				}
115
116				// Check index files exist (generated at every tick)
117				indexHTMLPath := filepath.Join(artifactDir, p.Name, p.JobID, "index.html")
118				indexTXTPath := filepath.Join(artifactDir, p.Name, p.JobID, "index.txt")
119				if _, err := os.Stat(indexHTMLPath); err == nil {
120					seenIndexHTML = true
121					t.Logf("index.html exists (status=%s)", p.Status)
122				}
123				if _, err := os.Stat(indexTXTPath); err == nil {
124					seenIndexTXT = true
125				}
126
127				// Track per-session artifacts
128				for _, s := range p.Sessions {
129					htmlPath := filepath.Join(artifactDir, p.Name, p.JobID, s.Short+".html")
130					txtPath := filepath.Join(artifactDir, p.Name, p.JobID, s.Short+".txt")
131					if _, err := os.Stat(htmlPath); err == nil {
132						sessionArtifactShorts[s.Short+"_html"] = true
133					}
134					if _, err := os.Stat(txtPath); err == nil {
135						sessionArtifactShorts[s.Short+"_txt"] = true
136					}
137				}
138
139				// Check if we've reached a final state
140				if p.Status == "success" || p.Status == "failed" {
141					cancel() // stop the monitor
142					finalPayload = &p
143					break
144				}
145			}
146		}
147
148		if finalPayload != nil {
149			break
150		}
151
152		// Assert progress: if we see session artifacts, we should see index files too
153		if len(sessionArtifactShorts) > 0 && (!seenIndexHTML || !seenIndexTXT) {
154			t.Error("index files should exist once any session artifact exists")
155		}
156
157		time.Sleep(cfg.MonitorInterval / 2) // poll at twice the interval rate
158	}
159
160	// Wait for monitor to exit gracefully
161	select {
162	case <-monitorDone:
163	case <-time.After(5 * time.Second):
164		t.Log("warning: monitor did not exit gracefully")
165	}
166
167	// 5. Assert we saw index files during the run
168	if !seenIndexHTML {
169		t.Error("never saw index.html during monitoring")
170	}
171	if !seenIndexTXT {
172		t.Error("never saw index.txt during monitoring")
173	}
174
175	// 6. Assert final payload has correct data
176	if finalPayload == nil {
177		t.Fatal("no final payload")
178	}
179	// Filter to only sessions with our job prefix to avoid picking up unrelated ci.* sessions
180	expectedPrefix := fmt.Sprintf("ci.%s.%s.", finalPayload.Name, finalPayload.JobID)
181	sessions := make([]SessionInfo, 0, len(finalPayload.Sessions))
182	for _, s := range finalPayload.Sessions {
183		if strings.HasPrefix(s.Name, expectedPrefix) {
184			sessions = append(sessions, s)
185		}
186	}
187	finalPayload.Sessions = sessions
188
189	if finalPayload.Name != "test-repo" {
190		t.Errorf("expected name test-repo, got %q", finalPayload.Name)
191	}
192	if finalPayload.Status != "success" {
193		t.Errorf("expected status success, got %q", finalPayload.Status)
194	}
195	if finalPayload.ExitCode == nil || *finalPayload.ExitCode != 0 {
196		t.Errorf("expected exit code 0, got %v", finalPayload.ExitCode)
197	}
198	if len(finalPayload.Sessions) < 2 {
199		t.Errorf("expected at least 2 sessions, got %d", len(finalPayload.Sessions))
200	}
201
202	// 7. Assert sessions have correct names
203	sessionNames := make(map[string]bool)
204	for _, s := range finalPayload.Sessions {
205		sessionNames[s.Short] = true
206		t.Logf("session: name=%s short=%s exit_code=%s ended=%s", s.Name, s.Short, s.ExitCode, s.Ended)
207	}
208	// Sessions from the test's pico.sh: step1 and step2
209	if !sessionNames["step1"] {
210		t.Error("expected session 'step1'")
211	}
212	if !sessionNames["step2"] {
213		t.Error("expected session 'step2'")
214	}
215
216	// 8. Assert HTML artifacts were staged for each session
217	for _, s := range finalPayload.Sessions {
218		artifactPath := filepath.Join(cfg.ArtifactDir, finalPayload.Name, finalPayload.JobID, s.Short+".html")
219		data, err := os.ReadFile(artifactPath)
220		if err != nil {
221			t.Errorf("read artifact %s: %v", artifactPath, err)
222			continue
223		}
224		if len(data) == 0 {
225			t.Errorf("artifact %s is empty", artifactPath)
226		}
227		if !bytes.Contains(data, []byte("<div")) {
228			t.Errorf("artifact %s does not contain HTML content", artifactPath)
229		}
230	}
231
232	// Cleanup any leftover zmx sessions for this job
233	if finalPayload != nil {
234		_ = exec.Command("zmx", "kill", "-f", fmt.Sprintf("ci.test-repo.%s", finalPayload.JobID)).Run()
235	}
236}
237
// scanLines splits data into lines with bufio.ScanLines semantics. The
// trailing newline and any carriage return just before it are dropped, and a
// final line without a newline is still returned. The scanner's token limit
// is raised to cover all of data. Without that, a single line longer than
// bufio.MaxScanTokenSize would end the scan early and silently drop every
// line after it.
func scanLines(data []byte) []string {
	limit := len(data) + 1
	if limit < bufio.MaxScanTokenSize {
		limit = bufio.MaxScanTokenSize
	}
	scanner := bufio.NewScanner(bytes.NewReader(data))
	// No token can be longer than data itself, so Scan cannot fail with
	// bufio.ErrTooLong. A bytes.Reader never returns a read error either.
	scanner.Buffer(nil, limit)
	var lines []string
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
	}
	return lines
}
246
247// TestE2E_DuplicateCancellation verifies that starting a new job for the same
248// repo cancels any existing running job for that repo.
249func TestE2E_DuplicateCancellation(t *testing.T) {
250	if testing.Short() {
251		t.Skip("skip integration test")
252	}
253	if _, err := exec.LookPath("zmx"); err != nil {
254		t.Skip("zmx not found, skipping integration test")
255	}
256
257	// 1. Create workspace with a slow pico.sh so the first job stays running
258	workspaceDir := t.TempDir()
259	picoSh := `#!/usr/bin/env bash
260set -e
261zmx run slow-step sleep 30
262`
263	if err := os.WriteFile(filepath.Join(workspaceDir, "pico.sh"), []byte(picoSh), 0755); err != nil {
264		t.Fatalf("write pico.sh: %v", err)
265	}
266
267	// 2. Create a fast pico.sh for the second job
268	workspaceDir2 := t.TempDir()
269	picoSh2 := `#!/usr/bin/env bash
270set -e
271zmx run fast-step echo "done"
272`
273	if err := os.WriteFile(filepath.Join(workspaceDir2, "pico.sh"), []byte(picoSh2), 0755); err != nil {
274		t.Fatalf("write pico.sh: %v", err)
275	}
276
277	artifactDir := t.TempDir()
278	ctx, cancel := context.WithCancel(context.Background())
279	defer cancel()
280
281	makeCfg := func(eventSource io.ReadCloser) *Cfg {
282		return &Cfg{
283			Logger:          slog.New(slog.NewTextHandler(io.Discard, nil)),
284			Ctx:             ctx,
285			Cancel:          cancel,
286			ArtifactDir:     artifactDir,
287			EventSource:     eventSource,
288			MonitorInterval: 200 * time.Millisecond,
289			NewWorkspace:    defaultWorkspaceFactory,
290		}
291	}
292
293	// 3. Start first job (slow)
294	event1 := Event{Type: "build", Name: "dup-repo", Workspace: workspaceDir}
295	event1JSON, _ := json.Marshal(event1)
296	cfg1 := makeCfg(io.NopCloser(bytes.NewReader(append(event1JSON, '\n'))))
297
298	go func() {
299		_ = RunRunner(cfg1)
300	}()
301
302	// Wait for the first job's runner session to appear
303	if !waitForSessionPrefix(t, "ci.dup-repo.", 10*time.Second) {
304		t.Fatal("first job's runner session never appeared")
305	}
306
307	// Record the first job's runner session name
308	firstRunner := findRunnerSession(t, "dup-repo")
309	if firstRunner == "" {
310		t.Fatal("could not find first job's runner session")
311	}
312	t.Logf("first job runner: %s", firstRunner)
313
314	// 4. Start second job (same repo name, should cancel the first)
315	event2 := Event{Type: "build", Name: "dup-repo", Workspace: workspaceDir2}
316	event2JSON, _ := json.Marshal(event2)
317	cfg2 := makeCfg(io.NopCloser(bytes.NewReader(append(event2JSON, '\n'))))
318
319	runner2Done := make(chan error, 1)
320	go func() {
321		runner2Done <- RunRunner(cfg2)
322	}()
323
324	// Wait for second runner to complete
325	select {
326	case err := <-runner2Done:
327		if err != nil {
328			t.Fatalf("second runner: %v", err)
329		}
330	case <-time.After(30 * time.Second):
331		t.Fatal("timeout waiting for second runner")
332	}
333
334	// 5. Verify the first job's runner session was killed
335	// Give zmx kill time to propagate
336	time.Sleep(1 * time.Second)
337
338	listOutput, _ := exec.Command("zmx", "list").CombinedOutput()
339	sessions := parseZMXList(string(listOutput))
340	for _, s := range sessions {
341		if s.Name == firstRunner {
342			if s.Ended == "" {
343				t.Errorf("first job's runner session %s should have been killed (ended is empty)", firstRunner)
344			} else {
345				t.Logf("first job's runner session %s was killed (ended=%s)", firstRunner, s.Ended)
346			}
347		}
348	}
349
350	// Cleanup
351	_ = exec.Command("zmx", "kill", "-f", "ci.dup-repo").Run()
352}
353
354// waitForSessionPrefix returns true if a session with the given prefix appears within the timeout.
355func waitForSessionPrefix(t *testing.T, prefix string, timeout time.Duration) bool {
356	deadline := time.Now().Add(timeout)
357	for time.Now().Before(deadline) {
358		listOutput, err := exec.Command("zmx", "list").CombinedOutput()
359		if err == nil {
360			sessions := parseZMXList(string(listOutput))
361			for _, s := range sessions {
362				if strings.HasPrefix(s.Name, prefix) {
363					return true
364				}
365			}
366		}
367		time.Sleep(200 * time.Millisecond)
368	}
369	return false
370}
371
372// findRunnerSession finds the runner session for a given repo name.
373func findRunnerSession(t *testing.T, name string) string {
374	listOutput, err := exec.Command("zmx", "list").CombinedOutput()
375	if err != nil {
376		t.Fatalf("zmx list: %v", err)
377	}
378	sessions := parseZMXList(string(listOutput))
379	for _, s := range sessions {
380		if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") {
381			return s.Name
382		}
383	}
384	return ""
385}
386
387func TestGenerateJobID(t *testing.T) {
388	// Same inputs should produce same hash
389	id1 := jobIDFor("myrepo", "/workspace", 1000)
390	id2 := jobIDFor("myrepo", "/workspace", 1000)
391	if id1 != id2 {
392		t.Errorf("expected same ID for same inputs, got %q and %q", id1, id2)
393	}
394
395	// Different name should produce different hash
396	id3 := jobIDFor("otherrepo", "/workspace", 1000)
397	if id1 == id3 {
398		t.Errorf("expected different IDs for different names, got %q", id1)
399	}
400
401	// Different timestamp should produce different hash
402	id4 := jobIDFor("myrepo", "/workspace", 2000)
403	if id1 == id4 {
404		t.Errorf("expected different IDs for different timestamps, got %q", id1)
405	}
406
407	// ID should be 8 hex chars
408	if len(id1) != 8 {
409		t.Errorf("expected 8 char ID, got %d chars: %q", len(id1), id1)
410	}
411
412	// generateJobID (with real time) should also produce valid IDs
413	id := generateJobID("myrepo", "/workspace")
414	if len(id) != 8 {
415		t.Errorf("generateJobID expected 8 char ID, got %d chars: %q", len(id), id)
416	}
417}
418
419func TestAllCompleted(t *testing.T) {
420	tests := []struct {
421		name     string
422		sessions []SessionInfo
423		want     bool
424	}{
425		{
426			name:     "empty sessions",
427			sessions: []SessionInfo{},
428			want:     true,
429		},
430		{
431			name: "all completed",
432			sessions: []SessionInfo{
433				{Name: "a", Ended: "123"},
434				{Name: "b", Ended: "456"},
435			},
436			want: true,
437		},
438		{
439			name: "one not completed",
440			sessions: []SessionInfo{
441				{Name: "a", Ended: "123"},
442				{Name: "b", Ended: ""},
443			},
444			want: false,
445		},
446	}
447
448	for _, tt := range tests {
449		t.Run(tt.name, func(t *testing.T) {
450			got := allCompleted(tt.sessions)
451			if got != tt.want {
452				t.Errorf("allCompleted() = %v, want %v", got, tt.want)
453			}
454		})
455	}
456}
457
458func TestParseZMXList(t *testing.T) {
459	output := `name=ci-lint	pid=1064464	clients=0	created=1777519944	start_dir=/home/erock/dev/pico	ended=1777519986	exit_code=0
460  name=ci-tests	pid=1064472	clients=0	created=1777519944	start_dir=/home/erock/dev/pico	ended=1777519958	exit_code=2
461→ name=d.build.1	pid=549652	clients=0	created=1777513430	start_dir=/home/erock`
462
463	sessions := parseZMXList(output)
464	if len(sessions) != 3 {
465		t.Fatalf("expected 3 sessions, got %d", len(sessions))
466	}
467
468	if sessions[0].Name != "ci-lint" {
469		t.Errorf("expected first session name ci-lint, got %q", sessions[0].Name)
470	}
471	if sessions[0].Ended != "1777519986" {
472		t.Errorf("expected ended 1777519986, got %q", sessions[0].Ended)
473	}
474	if sessions[0].ExitCode != "0" {
475		t.Errorf("expected exit_code 0, got %q", sessions[0].ExitCode)
476	}
477
478	if sessions[2].Name != "d.build.1" {
479		t.Errorf("expected third session name d.build.1, got %q", sessions[2].Name)
480	}
481	if sessions[2].Ended != "" {
482		t.Errorf("expected empty ended for active session, got %q", sessions[2].Ended)
483	}
484}
485
486func TestExtractJobID(t *testing.T) {
487	tests := []struct {
488		runnerName string
489		want       string
490	}{
491		{"ci.myrepo.abc123.runner", "abc123"},
492		{"ci.test-repo.006d0847.runner", "006d0847"},
493		{"ci.my_org.project.abc123.runner", "project.abc123"}, // name with underscore
494	}
495
496	for _, tt := range tests {
497		got := extractJobID(tt.runnerName)
498		if got != tt.want {
499			t.Errorf("extractJobID(%q) = %q, want %q", tt.runnerName, got, tt.want)
500		}
501	}
502}
503
504func TestExtractJobPrefix(t *testing.T) {
505	tests := []struct {
506		sessionName string
507		want        string
508	}{
509		{"ci.myrepo.abc123.lint", "ci.myrepo.abc123."},
510		{"ci.myrepo.abc123.runner", "ci.myrepo.abc123."},
511		{"ci.myrepo.abc123.tests", "ci.myrepo.abc123."},
512		{"ci.name.jobID.step.substep", "ci.name.jobID."},
513		{"ci.a.b", ""}, // too few parts
514	}
515
516	for _, tt := range tests {
517		got := extractJobPrefix(tt.sessionName)
518		if got != tt.want {
519			t.Errorf("extractJobPrefix(%q) = %q, want %q", tt.sessionName, got, tt.want)
520		}
521	}
522}
523
524func TestFindRunningJobs(t *testing.T) {
525	output := `name=ci.myrepo.abc123.runner	pid=100	clients=0	created=1777519944	start_dir=/home/erock
526  name=ci.myrepo.abc123.lint	pid=101	clients=0	created=1777519944	start_dir=/home/erock
527  name=ci.myrepo.abc123.tests	pid=102	clients=0	created=1777519944	start_dir=/home/erock	ended=1777519986	exit_code=0
528  name=ci.myrepo.def456.runner	pid=103	clients=0	created=1777519944	start_dir=/home/erock	ended=1777519986	exit_code=0
529  name=ci.other.abc123.runner	pid=104	clients=0	created=1777519944	start_dir=/home/erock
530→ name=d.build.1	pid=549652	clients=0	created=1777513430	start_dir=/home/erock`
531
532	runners, sessions := findRunningJobsFromOutput(output, "myrepo")
533	if len(runners) != 1 {
534		t.Fatalf("expected 1 running job, got %d: %v", len(runners), runners)
535	}
536	if runners[0] != "ci.myrepo.abc123.runner" {
537		t.Errorf("expected runner ci.myrepo.abc123.runner, got %q", runners[0])
538	}
539	if len(sessions) != 6 {
540		t.Errorf("expected 6 total sessions, got %d", len(sessions))
541	}
542}
543
544func findRunningJobsFromOutput(output, name string) ([]string, []SessionInfo) {
545	sessions := parseZMXList(output)
546	var runners []string
547	for _, s := range sessions {
548		if strings.HasPrefix(s.Name, "ci."+name+".") && strings.HasSuffix(s.Name, ".runner") && s.Ended == "" {
549			runners = append(runners, s.Name)
550		}
551	}
552	return runners, sessions
553}
554
555func TestKillSessionsEmpty(t *testing.T) {
556	// Should not error with empty list
557	if err := killSessions(nil); err != nil {
558		t.Errorf("killSessions(nil) = %v, want nil", err)
559	}
560	if err := killSessions([]string{}); err != nil {
561		t.Errorf("killSessions([]) = %v, want nil", err)
562	}
563}
564
565func TestResolveJobExitCode(t *testing.T) {
566	tests := []struct {
567		name       string
568		sessions   []SessionInfo
569		wantCode   int
570		wantStatus string
571	}{
572		{
573			name: "all success",
574			sessions: []SessionInfo{
575				{Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
576				{Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
577			},
578			wantCode:   0,
579			wantStatus: "success",
580		},
581		{
582			name: "runner failed",
583			sessions: []SessionInfo{
584				{Name: "ci.repo.abc.runner", ExitCode: "1", Ended: "1"},
585				{Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
586			},
587			wantCode:   1,
588			wantStatus: "failed",
589		},
590		{
591			name: "child failed, runner says 0 (defensive)",
592			sessions: []SessionInfo{
593				{Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
594				{Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
595				{Name: "ci.repo.abc.step2", ExitCode: "2", Ended: "1"},
596			},
597			wantCode:   2,
598			wantStatus: "failed",
599		},
600		{
601			name: "worst child exit code wins",
602			sessions: []SessionInfo{
603				{Name: "ci.repo.abc.runner", ExitCode: "0", Ended: "1"},
604				{Name: "ci.repo.abc.step1", ExitCode: "1", Ended: "1"},
605				{Name: "ci.repo.abc.step2", ExitCode: "3", Ended: "1"},
606			},
607			wantCode:   3,
608			wantStatus: "failed",
609		},
610		{
611			name: "no runner session",
612			sessions: []SessionInfo{
613				{Name: "ci.repo.abc.step1", ExitCode: "0", Ended: "1"},
614			},
615			wantCode:   0,
616			wantStatus: "success",
617		},
618		{
619			name: "sessions not yet ended (no exit code)",
620			sessions: []SessionInfo{
621				{Name: "ci.repo.abc.runner", ExitCode: "", Ended: ""},
622				{Name: "ci.repo.abc.step1", ExitCode: "", Ended: ""},
623			},
624			wantCode:   0,
625			wantStatus: "success",
626		},
627	}
628
629	for _, tt := range tests {
630		t.Run(tt.name, func(t *testing.T) {
631			code, status := resolveJobExitCode(tt.sessions)
632			if code != tt.wantCode {
633				t.Errorf("exit code = %d, want %d", code, tt.wantCode)
634			}
635			if status != tt.wantStatus {
636				t.Errorf("status = %q, want %q", status, tt.wantStatus)
637			}
638		})
639	}
640}