fix client impl

This commit is contained in:
2026-01-08 22:29:51 +08:00
parent 07c05b8e9f
commit 6c8d86202c
5 changed files with 750 additions and 19 deletions

View File

@@ -39,6 +39,7 @@ import (
giteav1alpha1 "github.com/bapung/gitea-runner-operator/api/v1alpha1"
"github.com/bapung/gitea-runner-operator/internal/controller"
"github.com/bapung/gitea-runner-operator/internal/gitea"
// +kubebuilder:scaffold:imports
)
@@ -203,8 +204,9 @@ func main() {
}
if err := (&controller.RunnerGroupReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
GiteaClient: gitea.NewHTTPClient(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RunnerGroup")
os.Exit(1)

View File

@@ -4,6 +4,26 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
- watch
- apiGroups:
- batch
resources:
- jobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- gitea.bpg.pw
resources:

4
go.mod
View File

@@ -5,8 +5,10 @@ go 1.24.0
require (
github.com/onsi/ginkgo/v2 v2.22.0
github.com/onsi/gomega v1.36.1
k8s.io/api v0.33.0
k8s.io/apimachinery v0.33.0
k8s.io/client-go v0.33.0
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
sigs.k8s.io/controller-runtime v0.21.0
)
@@ -82,13 +84,11 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.33.0 // indirect
k8s.io/apiextensions-apiserver v0.33.0 // indirect
k8s.io/apiserver v0.33.0 // indirect
k8s.io/component-base v0.33.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect

View File

@@ -18,6 +18,13 @@ package gitea
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/bapung/gitea-runner-operator/api/v1alpha1"
)
@@ -39,16 +46,68 @@ type Client interface {
// HTTPClient is the default implementation of the Gitea Client interface
type HTTPClient struct {
// TODO: Add HTTP client and any necessary configuration
httpClient *http.Client
}
// NewHTTPClient creates a new Gitea HTTP client
func NewHTTPClient() *HTTPClient {
return &HTTPClient{}
return &HTTPClient{
httpClient: &http.Client{
Timeout: 30 * time.Second,
},
}
}
// Repository represents a Gitea repository
type Repository struct {
Owner struct {
Login string `json:"login"`
} `json:"owner"`
Name string `json:"name"`
FullName string `json:"full_name"`
}
// Organization represents a Gitea organization
type Organization struct {
Username string `json:"username"`
Name string `json:"name"`
}
// ActionWorkflowRunsResponse represents the response structure for workflow runs
type ActionWorkflowRunsResponse struct {
TotalCount int64 `json:"total_count"`
WorkflowRuns []ActionWorkflowRun `json:"workflow_runs"`
}
// ActionWorkflowRun represents a Gitea workflow run
type ActionWorkflowRun struct {
ID int64 `json:"id"`
Status string `json:"status"`
DisplayTitle string `json:"display_title"`
Event string `json:"event"`
HeadBranch string `json:"head_branch"`
HeadSha string `json:"head_sha"`
RunNumber int64 `json:"run_number"`
}
// ActionWorkflowJobsResponse represents the response structure for workflow jobs
type ActionWorkflowJobsResponse struct {
TotalCount int64 `json:"total_count"`
Jobs []ActionWorkflowJob `json:"jobs"`
}
// ActionWorkflowJob represents a Gitea workflow job with runner labels
type ActionWorkflowJob struct {
ID int64 `json:"id"`
Status string `json:"status"`
Name string `json:"name"`
Labels []string `json:"labels"`
RunID int64 `json:"run_id"`
RunnerID int64 `json:"runner_id"`
RunnerName string `json:"runner_name"`
}
// GetQueuedRuns implements the Client interface
// This is a placeholder implementation that will be fully implemented in step 5
func (c *HTTPClient) GetQueuedRuns(
ctx context.Context,
giteaURL string,
@@ -58,16 +117,353 @@ func (c *HTTPClient) GetQueuedRuns(
repo string,
labels []string,
) (int, error) {
// TODO: Implement actual Gitea API calls
// This is a placeholder that returns 0 queued jobs
// Based on scope:
// - global: Recursively fetch all orgs -> repos -> workflow runs
// - org: Fetch all repos under org -> workflow runs
// - repo: Fetch workflow runs for specific repo
//
// Endpoint: /api/v1/repos/{owner}/{repo}/actions/runs?status=queued
// Filter returned runs by labels
return 0, nil
switch scope {
case v1alpha1.RunnerGroupScopeRepo:
return c.getQueuedRunsForRepo(ctx, giteaURL, authToken, org, repo, labels)
case v1alpha1.RunnerGroupScopeOrg:
return c.getQueuedRunsForOrg(ctx, giteaURL, authToken, org, labels)
case v1alpha1.RunnerGroupScopeGlobal:
return c.getQueuedRunsGlobal(ctx, giteaURL, authToken, labels)
default:
return 0, fmt.Errorf("unknown scope: %s", scope)
}
}
// getQueuedRunsForRepo fetches queued runs for a specific repository
func (c *HTTPClient) getQueuedRunsForRepo(ctx context.Context, giteaURL, authToken, owner, repo string, labels []string) (int, error) {
// Use jobs endpoint since it contains the runner labels we need for filtering
endpoint := fmt.Sprintf("%s/api/v1/repos/%s/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), owner, repo)
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels)
}
// getQueuedRunsForOrg fetches queued runs for all repos under an organization
func (c *HTTPClient) getQueuedRunsForOrg(ctx context.Context, giteaURL, authToken, org string, labels []string) (int, error) {
// Use direct org-level jobs endpoint for better performance
endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), org)
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels)
}
// getQueuedRunsGlobal fetches queued runs using admin-level API for global scope
func (c *HTTPClient) getQueuedRunsGlobal(ctx context.Context, giteaURL, authToken string, labels []string) (int, error) {
// Use admin-level jobs endpoint which provides global view of all queued jobs
endpoint := fmt.Sprintf("%s/api/v1/admin/actions/jobs", strings.TrimSuffix(giteaURL, "/"))
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels)
}
// fetchWorkflowJobs fetches workflow jobs from a given endpoint with label filtering and pagination
func (c *HTTPClient) fetchWorkflowJobs(ctx context.Context, endpoint, authToken string, labels []string) (int, error) {
totalCount := 0
page := 1
limit := 50 // Default page size
for {
u, err := url.Parse(endpoint)
if err != nil {
return 0, err
}
q := u.Query()
q.Set("status", "queued")
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return 0, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return 0, c.handleHTTPError(resp.StatusCode, body, "fetch workflow jobs")
}
var result ActionWorkflowJobsResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
resp.Body.Close()
return 0, err
}
resp.Body.Close()
// Filter and count matching jobs for this page
pageCount := c.filterQueuedJobs(result.Jobs, labels)
totalCount += pageCount
// Break if we've fetched all available results
if len(result.Jobs) < limit {
break
}
page++
}
return totalCount, nil
}
// fetchWorkflowRuns fetches workflow runs from a given endpoint (deprecated - use jobs for label filtering)
func (c *HTTPClient) fetchWorkflowRuns(ctx context.Context, endpoint, authToken string) ([]ActionWorkflowRun, error) {
// Add status=queued query parameter
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("status", "queued")
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch workflow runs")
}
var result ActionWorkflowRunsResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.WorkflowRuns, nil
}
// fetchOrgRepos fetches all repositories under an organization with pagination
func (c *HTTPClient) fetchOrgRepos(ctx context.Context, giteaURL, authToken, org string) ([]Repository, error) {
var allRepos []Repository
page := 1
limit := 50
for {
endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/repos", strings.TrimSuffix(giteaURL, "/"), org)
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch user repos")
}
var repos []Repository
if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil {
resp.Body.Close()
return nil, err
}
resp.Body.Close()
allRepos = append(allRepos, repos...)
if len(repos) < limit {
break
}
page++
}
return allRepos, nil
}
// fetchAllOrgs fetches all organizations visible to the authenticated user with pagination
func (c *HTTPClient) fetchAllOrgs(ctx context.Context, giteaURL, authToken string) ([]Organization, error) {
var allOrgs []Organization
page := 1
limit := 50
for {
endpoint := fmt.Sprintf("%s/api/v1/user/orgs", strings.TrimSuffix(giteaURL, "/"))
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch org repos")
}
var orgs []Organization
if err := json.NewDecoder(resp.Body).Decode(&orgs); err != nil {
resp.Body.Close()
return nil, err
}
resp.Body.Close()
allOrgs = append(allOrgs, orgs...)
if len(orgs) < limit {
break
}
page++
}
return allOrgs, nil
}
// fetchUserRepos fetches all repositories owned by the authenticated user with pagination
func (c *HTTPClient) fetchUserRepos(ctx context.Context, giteaURL, authToken string) ([]Repository, error) {
var allRepos []Repository
page := 1
limit := 50
for {
endpoint := fmt.Sprintf("%s/api/v1/user/repos", strings.TrimSuffix(giteaURL, "/"))
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch user orgs")
}
var repos []Repository
if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil {
resp.Body.Close()
return nil, err
}
resp.Body.Close()
allRepos = append(allRepos, repos...)
if len(repos) < limit {
break
}
page++
}
return allRepos, nil
}
// filterQueuedJobs filters workflow jobs by labels
func (c *HTTPClient) filterQueuedJobs(jobs []ActionWorkflowJob, requiredLabels []string) int {
if len(requiredLabels) == 0 {
// No label filtering required, return all queued jobs
return len(jobs)
}
count := 0
for _, job := range jobs {
if c.jobMatchesLabels(job.Labels, requiredLabels) {
count++
}
}
return count
}
// jobMatchesLabels checks if a job's labels match the required labels
func (c *HTTPClient) jobMatchesLabels(jobLabels, requiredLabels []string) bool {
// Convert job labels to map for faster lookup
labelSet := make(map[string]bool)
for _, label := range jobLabels {
labelSet[label] = true
}
// Check if all required labels are present
for _, required := range requiredLabels {
if !labelSet[required] {
return false
}
}
return true
}
// filterQueuedRuns filters workflow runs by labels (deprecated - use filterQueuedJobs)
func (c *HTTPClient) filterQueuedRuns(runs []ActionWorkflowRun, labels []string) int {
// Legacy method - jobs should be used for label filtering
return len(runs)
}
// handleHTTPError provides specific error handling for different HTTP status codes
func (c *HTTPClient) handleHTTPError(statusCode int, body []byte, operation string) error {
switch statusCode {
case http.StatusUnauthorized:
return fmt.Errorf("authentication failed for %s: check your token", operation)
case http.StatusForbidden:
return fmt.Errorf("access denied for %s: insufficient permissions", operation)
case http.StatusNotFound:
return fmt.Errorf("resource not found for %s: check URL and resource exists", operation)
case http.StatusTooManyRequests:
return fmt.Errorf("rate limit exceeded for %s: please retry later", operation)
case http.StatusInternalServerError:
return fmt.Errorf("internal server error for %s: %s", operation, string(body))
default:
return fmt.Errorf("gitea API returned status %d for %s: %s", statusCode, operation, string(body))
}
}

View File

@@ -0,0 +1,313 @@
/*
Copyright 2026.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gitea
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/bapung/gitea-runner-operator/api/v1alpha1"
)
func TestHTTPClient_GetQueuedRuns(t *testing.T) {
tests := []struct {
name string
scope v1alpha1.RunnerGroupScope
org string
repo string
labels []string
mockResponse ActionWorkflowJobsResponse
expectedCount int
expectedError bool
}{
{
name: "repo scope with matching labels",
scope: v1alpha1.RunnerGroupScopeRepo,
org: "testorg",
repo: "testrepo",
labels: []string{"linux", "x64"},
mockResponse: ActionWorkflowJobsResponse{
TotalCount: 2,
Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"linux", "x64"}},
{ID: 2, Status: "queued", Labels: []string{"linux", "arm64"}},
},
},
expectedCount: 1,
expectedError: false,
},
{
name: "org scope no label filtering",
scope: v1alpha1.RunnerGroupScopeOrg,
org: "testorg",
labels: []string{},
mockResponse: ActionWorkflowJobsResponse{
TotalCount: 3,
Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"linux", "x64"}},
{ID: 2, Status: "queued", Labels: []string{"windows"}},
{ID: 3, Status: "queued", Labels: []string{"macos"}},
},
},
expectedCount: 3,
expectedError: false,
},
{
name: "global scope with specific labels",
scope: v1alpha1.RunnerGroupScopeGlobal,
labels: []string{"docker"},
mockResponse: ActionWorkflowJobsResponse{
TotalCount: 2,
Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"docker", "linux"}},
{ID: 2, Status: "queued", Labels: []string{"linux"}},
},
},
expectedCount: 1,
expectedError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create mock server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Verify correct endpoint is called
expectedPath := ""
switch tt.scope {
case v1alpha1.RunnerGroupScopeRepo:
expectedPath = "/api/v1/repos/testorg/testrepo/actions/jobs"
case v1alpha1.RunnerGroupScopeOrg:
expectedPath = "/api/v1/orgs/testorg/actions/jobs"
case v1alpha1.RunnerGroupScopeGlobal:
expectedPath = "/api/v1/admin/actions/jobs"
}
if !strings.HasPrefix(r.URL.Path, expectedPath) {
t.Errorf("Expected path to start with %s, got %s", expectedPath, r.URL.Path)
}
// Verify query parameters
if r.URL.Query().Get("status") != "queued" {
t.Errorf("Expected status=queued, got %s", r.URL.Query().Get("status"))
}
// Verify authorization header
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "token ") {
t.Errorf("Expected Authorization header to start with 'token ', got %s", authHeader)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(tt.mockResponse)
}))
defer server.Close()
client := NewHTTPClient()
count, err := client.GetQueuedRuns(
context.Background(),
server.URL,
"test-token",
tt.scope,
tt.org,
tt.repo,
tt.labels,
)
if tt.expectedError && err == nil {
t.Error("Expected error but got none")
}
if !tt.expectedError && err != nil {
t.Errorf("Expected no error but got: %v", err)
}
if count != tt.expectedCount {
t.Errorf("Expected count %d, got %d", tt.expectedCount, count)
}
})
}
}
func TestJobMatchesLabels(t *testing.T) {
client := &HTTPClient{}
tests := []struct {
name string
jobLabels []string
requiredLabels []string
expected bool
}{
{
name: "exact match",
jobLabels: []string{"linux", "x64"},
requiredLabels: []string{"linux", "x64"},
expected: true,
},
{
name: "subset match",
jobLabels: []string{"linux", "x64", "docker"},
requiredLabels: []string{"linux", "x64"},
expected: true,
},
{
name: "no match",
jobLabels: []string{"linux", "arm64"},
requiredLabels: []string{"linux", "x64"},
expected: false,
},
{
name: "empty required labels",
jobLabels: []string{"linux", "x64"},
requiredLabels: []string{},
expected: true,
},
{
name: "partial match",
jobLabels: []string{"linux"},
requiredLabels: []string{"linux", "x64"},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := client.jobMatchesLabels(tt.jobLabels, tt.requiredLabels)
if result != tt.expected {
t.Errorf("Expected %v, got %v", tt.expected, result)
}
})
}
}
func TestFilterQueuedJobs(t *testing.T) {
client := &HTTPClient{}
jobs := []ActionWorkflowJob{
{ID: 1, Labels: []string{"linux", "x64"}},
{ID: 2, Labels: []string{"linux", "arm64"}},
{ID: 3, Labels: []string{"windows", "x64"}},
{ID: 4, Labels: []string{"linux", "x64", "docker"}},
}
tests := []struct {
name string
requiredLabels []string
expectedCount int
}{
{
name: "filter by linux",
requiredLabels: []string{"linux"},
expectedCount: 3,
},
{
name: "filter by linux and x64",
requiredLabels: []string{"linux", "x64"},
expectedCount: 2,
},
{
name: "filter by docker",
requiredLabels: []string{"docker"},
expectedCount: 1,
},
{
name: "no labels - return all",
requiredLabels: []string{},
expectedCount: 4,
},
{
name: "no matches",
requiredLabels: []string{"macos"},
expectedCount: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
count := client.filterQueuedJobs(jobs, tt.requiredLabels)
if count != tt.expectedCount {
t.Errorf("Expected %d, got %d", tt.expectedCount, count)
}
})
}
}
func TestHandleHTTPError(t *testing.T) {
client := &HTTPClient{}
tests := []struct {
name string
statusCode int
body []byte
operation string
expectedErr string
}{
{
name: "unauthorized",
statusCode: 401,
body: []byte("Unauthorized"),
operation: "test operation",
expectedErr: "authentication failed for test operation: check your token",
},
{
name: "forbidden",
statusCode: 403,
body: []byte("Forbidden"),
operation: "test operation",
expectedErr: "access denied for test operation: insufficient permissions",
},
{
name: "not found",
statusCode: 404,
body: []byte("Not Found"),
operation: "test operation",
expectedErr: "resource not found for test operation: check URL and resource exists",
},
{
name: "rate limit",
statusCode: 429,
body: []byte("Too Many Requests"),
operation: "test operation",
expectedErr: "rate limit exceeded for test operation: please retry later",
},
{
name: "server error",
statusCode: 500,
body: []byte("Internal Server Error"),
operation: "test operation",
expectedErr: "internal server error for test operation: Internal Server Error",
},
{
name: "other error",
statusCode: 400,
body: []byte("Bad Request"),
operation: "test operation",
expectedErr: "gitea API returned status 400 for test operation: Bad Request",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := client.handleHTTPError(tt.statusCode, tt.body, tt.operation)
if err.Error() != tt.expectedErr {
t.Errorf("Expected error %q, got %q", tt.expectedErr, err.Error())
}
})
}
}