1 Commits
dev ... v0.1.0

Author SHA1 Message Date
86e92c5e72 feat: implement working reconciliation logic and documentation
initial commit for working reconciliation logic, no automated test only manually tested for now
2026-01-12 22:57:22 +08:00
18 changed files with 810 additions and 655 deletions


@@ -2,9 +2,10 @@ name: Build and Push Docker Image
on: on:
push: push:
branches: [ "main", "master" ] branches: ["main", "master"]
pull_request: pull_request:
branches: [ "main", "master" ] branches: ["main", "master"]
workflow_dispatch:
env: env:
REGISTRY: ghcr.io REGISTRY: ghcr.io


@@ -20,4 +20,4 @@ jobs:
- name: Run linter - name: Run linter
uses: golangci/golangci-lint-action@v8 uses: golangci/golangci-lint-action@v8
with: with:
version: v2.1.0 version: v2.7.2


@@ -1,32 +0,0 @@
name: E2E Tests
on:
push:
pull_request:
jobs:
test-e2e:
name: Run on Ubuntu
runs-on: ubuntu-latest
steps:
- name: Clone the code
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version-file: go.mod
- name: Install the latest version of kind
run: |
curl -Lo ./kind https://kind.sigs.k8s.io/dl/latest/kind-linux-amd64
chmod +x ./kind
sudo mv ./kind /usr/local/bin/kind
- name: Verify kind installation
run: kind version
- name: Running Test e2e
run: |
go mod tidy
make test-e2e


@@ -20,4 +20,4 @@ jobs:
- name: Running Tests - name: Running Tests
run: | run: |
go mod tidy go mod tidy
make test make test ENVTEST_K8S_VERSION=1.31


@@ -242,7 +242,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.18.0
ENVTEST_VERSION ?= $(shell go list -m -f "{{ .Version }}" sigs.k8s.io/controller-runtime | awk -F'[v.]' '{printf "release-%d.%d", $$2, $$3}') ENVTEST_VERSION ?= $(shell go list -m -f "{{ .Version }}" sigs.k8s.io/controller-runtime | awk -F'[v.]' '{printf "release-%d.%d", $$2, $$3}')
#ENVTEST_K8S_VERSION is the version of Kubernetes to use for setting up ENVTEST binaries (i.e. 1.31) #ENVTEST_K8S_VERSION is the version of Kubernetes to use for setting up ENVTEST binaries (i.e. 1.31)
ENVTEST_K8S_VERSION ?= $(shell go list -m -f "{{ .Version }}" k8s.io/api | awk -F'[v.]' '{printf "1.%d", $$3}') ENVTEST_K8S_VERSION ?= $(shell go list -m -f "{{ .Version }}" k8s.io/api | awk -F'[v.]' '{printf "1.%d", $$3}')
GOLANGCI_LINT_VERSION ?= v2.1.0 GOLANGCI_LINT_VERSION ?= v2.7.2
.PHONY: kustomize .PHONY: kustomize
kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary. kustomize: $(KUSTOMIZE) ## Download kustomize locally if necessary.

README.md

@@ -1,82 +1,187 @@
# Overview # Gitea Runner Operator
Operator to manage gitea Act runner on Kubernetes A Kubernetes Operator to manage ephemeral Gitea Act runners. This operator automatically spawns runner pods based on queued jobs and supports global, org/user, and repo-level runners. Definitely vibe-coded (don't worry, I know what I am doing).
# How it works? ## Features
1. It installs a set of CRDs: `kind: RunnerGroup` in Kubernetes - **Ephemeral Runners**: Each job gets a fresh runner which is destroyed after execution.
- **Multiple Scopes**: Support for `global`, `org`, `user`, and `repo` level runners.
- **Auto-Scaling**: Automatically scales runners up to a configured maximum based on queued jobs.
- **Label Matching**: Matches Gitea job labels (e.g., `ubuntu-latest`) to runner capabilities.
## Prerequisites
- **Kubernetes Cluster**: v1.23+
- **Gitea**: v1.25.0+ (with Actions enabled)
## Installation (Helm Chart)
### Coming soon
## Installation (Manual)
### 1. Deploy the Operator
You can deploy the operator using the provided manifests.
```bash
# Clone the repository
git clone https://github.com/bapung/gitea-runner-operator.git
cd gitea-runner-operator
# Install CRDs
make install
# Deploy the controller to the cluster
make deploy IMG=ghcr.io/bapung/gitea-runner-operator:latest
```
### 2. Create Credentials Secret
Create a secret containing the Gitea Registration Token and an API Auth Token.
1. **Registration Token**: Get this from Gitea Admin -> Actions -> Runners -> Create new Runner (or Org/Repo settings).
2. **Auth Token**: Generate a token in Gitea User Settings -> Applications. It needs the `read:repository` and `read:user` permissions.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: gitea-runner-secret
namespace: gitea-runner-operator-system
type: Opaque
stringData:
registrationToken: "<YOUR_REGISTRATION_TOKEN>"
authToken: "<YOUR_API_TOKEN>"
```
Apply it:
```bash
kubectl apply -f secret.yaml
```
## Configuration
The core resource is the `RunnerGroup`. Below are examples for different scopes.
### 1. Repository Scope
Spawns runners only for jobs in a specific repository.
```yaml ```yaml
apiVersion: gitea.bpg.pw/v1alpha1 apiVersion: gitea.bpg.pw/v1alpha1
kind: RunnerGroup kind: RunnerGroup
metadata: metadata:
name: my-repo-runner-1 name: my-repo-runner
namespace: gitea-runner-system namespace: gitea-runner-operator-system
spec: spec:
scope: repo scope: repo
org: myorg # optional; omitted if scope == global org: myorg
repo: myreponame # optional; omitted if scope == org || scope == global repo: myrepo
gitea: giteaURL: https://gitea.example.com
url: https://gitea.bpg.pw maxActiveRunners: 5
labels: labels:
- default - "ubuntu-latest"
- app:infra - "custom-label"
maxActiveRunners: 5 # registrationToken:
registrationToken: # registration token for runner
secretRef: secretRef:
name: gitea-runner-secret-0 name: gitea-runner-secret
key: registrationToken key: registrationToken
authToken: # token to get list of job status authToken:
secretRef: secretRef:
name: gitea-runner-secret-0 name: gitea-runner-secret
key: authToken key: authToken
``` ```
2. The RunnerGroup controller will continuously watch for queued jobs based on its scope: `global`, `org`, or `repo`. If a new workflow run is detected with `status: queued`, based on the RunnerGroup's labels, the controller will spawn a new ephemeral runner as a Job. ### 2. Organization Scope
Spawns runners for any repository within the organization.
```yaml ```yaml
apiVersion: batch/v1 apiVersion: gitea.bpg.pw/v1alpha1
kind: Job kind: RunnerGroup
metadata: metadata:
name: my-repo-runner-1-275f1b8f name: my-org-runner
labels: namespace: gitea-runner-operator-system
app: my-repo-runner-1
# tags to determine that this resource is managed by the Operator
spec: spec:
# Optional: Automatically clean up the job after it finishes (e.g., 100 seconds) scope: org
ttlSecondsAfterFinished: 600 org: myorg
template: # repo is omitted
metadata: giteaURL: https://gitea.example.com
labels: maxActiveRunners: 10
app: act-my-repo-runner-1 # ... (tokens)
spec:
restartPolicy: OnFailure
securityContext:
fsGroup: 1000
volumes:
- name: runner-data
persistentVolumeClaim:
claimName: act-runner-vol
containers:
- name: runner
image: gitea/act_runner:nightly-dind-rootless
imagePullPolicy: Always
env:
- name: DOCKER_HOST
value: tcp://localhost:2376
- name: DOCKER_CERT_PATH
value: /certs/client
- name: DOCKER_TLS_VERIFY
value: "1"
- name: GITEA_INSTANCE_URL
value: https://gitea.bpg.pw
- name: GITEA_RUNNER_EPHEMERAL # always ephemeral
value: "1"
- name: GITEA_RUNNER_REGISTRATION_TOKEN
valueFrom:
secretKeyRef:
name: gitea-runner-secret-0
key: registrationToken
securityContext:
privileged: true
``` ```
### 3. User Scope
Spawns runners for any repository owned by the specified user.
```yaml
apiVersion: gitea.bpg.pw/v1alpha1
kind: RunnerGroup
metadata:
name: my-user-runner
namespace: gitea-runner-operator-system
spec:
scope: user
user: myusername
# org and repo are omitted
giteaURL: https://gitea.example.com
maxActiveRunners: 3
# ... (tokens)
```
### 4. Global Scope
Spawns runners for any job in the Gitea instance (Admin level).
```yaml
apiVersion: gitea.bpg.pw/v1alpha1
kind: RunnerGroup
metadata:
name: global-runner
namespace: gitea-runner-operator-system
spec:
scope: global
# org, user, and repo are omitted
giteaURL: https://gitea.example.com
maxActiveRunners: 20
# ... (tokens)
```
## How it works
1. The **Controller** polls the Gitea API (using the `authToken`) to check for queued jobs matching the scope and labels.
2. If a matching queued job is found, and the current active runner count is below `maxActiveRunners`, the Controller creates a Kubernetes `Job`.
3. The `Job` pod starts an `act_runner` instance, registers itself using the `registrationToken` (as ephemeral), picks up the job, executes it, and then terminates.
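The capacity rule in step 2 can be sketched in a few lines of Go. This is a minimal illustration, not the operator's actual code: the function name `runnersToSpawn` and its parameters are assumptions made for the example.

```go
package main

import "fmt"

// runnersToSpawn is a hypothetical helper: given the number of matching
// queued jobs, the number of runner Jobs already active, and the configured
// maxActiveRunners, it returns how many new runner Jobs may be created.
func runnersToSpawn(queued, active, maxActive int) int {
	free := maxActive - active
	if free <= 0 || queued <= 0 {
		return 0 // at capacity, or nothing is waiting
	}
	if queued < free {
		return queued // enough room for every queued job
	}
	return free // fill the remaining slots only
}

func main() {
	// Example: 7 queued jobs, 3 runners already active, maxActiveRunners: 5.
	fmt.Println(runnersToSpawn(7, 3, 5))
}
```

Jobs that do not get a runner in one pass stay queued in Gitea and are picked up on a later poll once slots free up.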
## Troubleshooting
### Runners are not starting
1. **Check Controller Logs**:
```bash
kubectl logs -n gitea-runner-operator-system -l control-plane=controller-manager -f
```
Look for errors regarding API authentication or connectivity.
2. **Check Permissions**:
Ensure the `authToken` has sufficient permissions (`read:repository`, etc.) to query actions.
3. **Check Labels**:
Enable debug logging in the controller to see label matching logic. If your Gitea job requires `ubuntu-latest` but your RunnerGroup defines `centos`, it won't match.
### Docker Daemon Issues
The runner Job uses the default rootless template from the Gitea docs, which has issues with the Docker daemon. I still can't get it working with the `docker` command; other containers work fine if you set the correct labels.
Per Gemini:
The default runner image uses `dind-rootless`. This requires the pod to run with `privileged: true`. Ensure your cluster policies (PSP/PSA) allow privileged pods in the operator namespace.
## Roadmap / Wishlist
- Helm Chart
- Custom Runner Job Spec definition
- Push mode using Webhook trigger


@@ -32,14 +32,16 @@ const (
RunnerGroupScopeGlobal RunnerGroupScope = "global" RunnerGroupScopeGlobal RunnerGroupScope = "global"
// RunnerGroupScopeOrg means the runner group is scoped to an organization // RunnerGroupScopeOrg means the runner group is scoped to an organization
RunnerGroupScopeOrg RunnerGroupScope = "org" RunnerGroupScopeOrg RunnerGroupScope = "org"
// RunnerGroupScopeUser means the runner group is scoped to a user
RunnerGroupScopeUser RunnerGroupScope = "user"
// RunnerGroupScopeRepo means the runner group is scoped to a repository // RunnerGroupScopeRepo means the runner group is scoped to a repository
RunnerGroupScopeRepo RunnerGroupScope = "repo" RunnerGroupScopeRepo RunnerGroupScope = "repo"
) )
// RunnerGroupSpec defines the desired state of RunnerGroup. // RunnerGroupSpec defines the desired state of RunnerGroup.
type RunnerGroupSpec struct { type RunnerGroupSpec struct {
// Scope defines the scope of the runner (global, org, repo) // Scope defines the scope of the runner (global, org, user, repo)
// +kubebuilder:validation:Enum=global;org;repo // +kubebuilder:validation:Enum=global;org;user;repo
// +kubebuilder:validation:Required // +kubebuilder:validation:Required
Scope RunnerGroupScope `json:"scope"` Scope RunnerGroupScope `json:"scope"`
@@ -47,6 +49,10 @@ type RunnerGroupSpec struct {
// +optional // +optional
Org string `json:"org,omitempty"` Org string `json:"org,omitempty"`
// User is required if scope is 'user'
// +optional
User string `json:"user,omitempty"`
// Repo is required if scope is 'repo' // Repo is required if scope is 'repo'
// +optional // +optional
Repo string `json:"repo,omitempty"` Repo string `json:"repo,omitempty"`


@@ -107,12 +107,17 @@ spec:
description: Repo is required if scope is 'repo' description: Repo is required if scope is 'repo'
type: string type: string
scope: scope:
description: Scope defines the scope of the runner (global, org, repo) description: Scope defines the scope of the runner (global, org, user,
repo)
enum: enum:
- global - global
- org - org
- user
- repo - repo
type: string type: string
user:
description: User is required if scope is 'user'
type: string
required: required:
- authToken - authToken
- giteaURL - giteaURL


@@ -0,0 +1,10 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: system
spec:
template:
spec:
imagePullSecrets:
- name: ghcr-secret


@@ -1,2 +1,11 @@
resources: resources:
- manager.yaml - manager.yaml
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: ghcr.io/bapung/gitea-runner-operator
newTag: sha-b33c78b
patchesStrategicMerge:
- image_pull_secret_patch.yaml


@@ -10,6 +10,6 @@ roleRef:
kind: ClusterRole kind: ClusterRole
name: manager-role name: manager-role
subjects: subjects:
- kind: ServiceAccount - kind: ServiceAccount
name: controller-manager name: controller-manager
namespace: system namespace: gitea-runner-operator-system


@@ -1,3 +1,16 @@
apiVersion: v1
kind: Secret
metadata:
name: gitea-credentials
labels:
app.kubernetes.io/name: gitea-runner-operator
app.kubernetes.io/managed-by: kustomize
stringData:
# The Gitea API Token (for the Operator to poll for jobs)
auth-token: "<YOUR_API_TOKEN>"
# The Runner Registration Token (for the Runner to register itself)
registration-token: "<YOUR_REGISTRATION_TOKEN>"
---
apiVersion: gitea.bpg.pw/v1alpha1 apiVersion: gitea.bpg.pw/v1alpha1
kind: RunnerGroup kind: RunnerGroup
metadata: metadata:
@@ -6,4 +19,29 @@ metadata:
app.kubernetes.io/managed-by: kustomize app.kubernetes.io/managed-by: kustomize
name: runnergroup-sample name: runnergroup-sample
spec: spec:
# TODO(user): Add fields here # The base URL of your Gitea instance
giteaURL: "https://gitea.bpg.pw"
# Scope of the runners (global, org, user, or repo)
scope: "user"
#org: "bapungorg" # Required if scope is 'org' or 'repo'; cannot be used with user
user: "bapung" # Required if scope is 'user' or 'repo'; cannot be used with org
#repo: "dummy-service-workflow" # Required if scope is 'repo'
# Labels to identify this runner group
labels:
- "linux"
- "amd64"
# Maximum number of runners to spawn concurrently
maxActiveRunners: 5
# Reference to the Secret containing the API token
authToken:
name: gitea-credentials
key: auth-token
# Reference to the Secret containing the Registration token
registrationToken:
name: gitea-credentials
key: registration-token


@@ -30,18 +30,23 @@ type RunnerGroupScope string
const ( const (
RunnerGroupScopeGlobal RunnerGroupScope = "global" RunnerGroupScopeGlobal RunnerGroupScope = "global"
RunnerGroupScopeOrg RunnerGroupScope = "org" RunnerGroupScopeOrg RunnerGroupScope = "org"
RunnerGroupScopeUser RunnerGroupScope = "user"
RunnerGroupScopeRepo RunnerGroupScope = "repo" RunnerGroupScopeRepo RunnerGroupScope = "repo"
) )
type RunnerGroupSpec struct { type RunnerGroupSpec struct {
// Scope defines the scope of the runner (global, org, repo) // Scope defines the scope of the runner (global, org, user, repo)
// +kubebuilder:validation:Enum=global;org;repo // +kubebuilder:validation:Enum=global;org;user;repo
Scope RunnerGroupScope `json:"scope"` Scope RunnerGroupScope `json:"scope"`
// Org is required if scope is 'org' // Org is required if scope is 'org'
// +optional // +optional
Org string `json:"org,omitempty"` Org string `json:"org,omitempty"`
// User is required if scope is 'user'
// +optional
User string `json:"user,omitempty"`
// Repo is required if scope is 'repo' // Repo is required if scope is 'repo'
// +optional // +optional
Repo string `json:"repo,omitempty"` Repo string `json:"repo,omitempty"`
@@ -49,7 +54,8 @@ type RunnerGroupSpec struct {
// GiteaURL is the base URL of the Gitea instance // GiteaURL is the base URL of the Gitea instance
GiteaURL string `json:"giteaURL"` GiteaURL string `json:"giteaURL"`
// Labels to assign to the runner // Labels to assign to the runner.
// Defaults (e.g. ubuntu-latest) are merged automatically by the controller.
// +optional // +optional
Labels []string `json:"labels,omitempty"` Labels []string `json:"labels,omitempty"`
@@ -79,154 +85,103 @@ type RunnerGroupStatus struct {
## 4. Controller Implementation (`internal/controller/runnergroup_controller.go`) ## 4. Controller Implementation (`internal/controller/runnergroup_controller.go`)
The controller handles the reconciliation loop. The controller handles the reconciliation loop and manages the lifecycle of ephemeral runners.
### 4.1 RBAC Permissions ### 4.1 Struct Definition
Add markers to generate RBAC roles: The reconciler includes a thread-safe map to cache spawned jobs and prevent duplicate scheduling.
```go ```go
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups,verbs=get;list;watch;create;update;patch;delete type RunnerGroupReconciler struct {
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups/status,verbs=get;update;patch client.Client
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete Scheme *runtime.Scheme
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch GiteaClient gitea.Client
SpawnedJobsCache sync.Map // Stores [int64]time.Time (JobID -> SpawnTime)
}
``` ```
### 4.2 Reconcile Logic ### 4.2 Reconcile Logic
The `Reconcile` function should follow this flow: The `Reconcile` function follows this flow:
1. **Fetch RunnerGroup**: Get the `RunnerGroup` CR instance. If not found, ignore (deleted). 1. **Fetch RunnerGroup**: Get the `RunnerGroup` CR instance.
2. **List Jobs**: List all `batchv1.Job` resources in the same namespace that are owned by this RunnerGroup. 2. **List Jobs**: List all `batchv1.Job` resources owned by this CR to calculate `activeRunners`.
- Filter by label `gitea.bpg.pw/runnergroup-name=<runnergroup-name>`. 3. **Update Status**: Update `status.activeRunners`.
3. **Update Status**: Update `status.activeRunners` with the count of non-completed jobs. 4. **Capacity Check**: Stop scaling if `activeRunners >= spec.maxActiveRunners`.
4. **Capacity Check**: 5. **Label Calculation**: Call `getEffectiveLabels` to merge `spec.labels` with hardcoded Gitea defaults (e.g., `ubuntu-latest:docker://node:16-bullseye`).
- If `activeRunners >= spec.maxActiveRunners`, stop and requeue. 6. **Poll Gitea**:
5. **Poll Gitea**: - Retrieve Auth Token.
- Retrieve the Auth Token from the Secret referenced in `spec.authToken`. - Call `GiteaClient.GetRunnerStats` with the effective labels.
- Instantiate a Gitea API Client. - This returns a list of `QueuedJobs`.
- Query for queued workflow runs matching the scope and labels. 7. **Scale Up & Deduplication**:
6. **Scale Up**: - Iterate through `stats.QueuedJobs`.
- Calculate `needed = count(queued_jobs)`. - **Check Cache**: If Job ID exists in `SpawnedJobsCache`:
- Calculate `available_slots = spec.maxActiveRunners - activeRunners`. - If TTL (< 5 min) is valid: **Skip** (already handled).
- `to_spawn = min(needed, available_slots)`. - If TTL expired: **Retry** (assume previous runner failed).
- Loop `to_spawn` times: - If Job ID not in cache or expired:
- Create a new `batchv1.Job`. - Check `availableSlots`.
7. **Requeue**: Return `ctrl.Result{RequeueAfter: 10 * time.Second}` to ensure continuous polling. - Retrieve Registration Token (if not yet fetched).
- **Spawn Job**: Create `batchv1.Job`.
- **Update Cache**: Store Job ID in `SpawnedJobsCache`.
- Decrement `availableSlots`.
8. **Cache Cleanup**: Remove IDs from `SpawnedJobsCache` if they are not present in the latest `QueuedJobs` list from Gitea.
9. **Requeue**: Return `ctrl.Result{RequeueAfter: 10 * time.Second}`.
### 4.3 Job Construction ### 4.3 Helper Functions
Helper function to create the Job object: #### getEffectiveLabels
```go Merges user-defined labels with Gitea defaults. If a user defines `ubuntu-latest`, it overrides the default `ubuntu-latest:docker://...`.
func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1alpha1.RunnerGroup, registrationToken string) (*batchv1.Job, error) {
// Generate random suffix for name
name := fmt.Sprintf("%s-%s", runnerGroup.Name, randString(5))
// Construct Env Vars #### constructJobForRunnerGroup
envVars := []corev1.EnvVar{
{Name: "GITEA_INSTANCE_URL", Value: runnerGroup.Spec.GiteaURL},
{Name: "GITEA_RUNNER_REGISTRATION_TOKEN", Value: registrationToken},
{Name: "GITEA_RUNNER_EPHEMERAL", Value: "true"},
{Name: "DOCKER_HOST", Value: "tcp://localhost:2376"},
// ... other envs from README
}
if len(runnerGroup.Spec.Labels) > 0 { Creates the Job object with:
labelsStr := strings.Join(runnerGroup.Spec.Labels, ",")
envVars = append(envVars, corev1.EnvVar{Name: "GITEA_RUNNER_LABELS", Value: labelsStr})
}
// Construct Job - **Name**: `{runnergroup-name}-{random-suffix}`
job := &batchv1.Job{ - **Env**:
ObjectMeta: metav1.ObjectMeta{ - `GITEA_RUNNER_NAME`: Set to the Job name.
Name: name, - `GITEA_RUNNER_LABELS`: Comma-separated effective labels.
Namespace: runnerGroup.Namespace, - Standard runner envs (`GITEA_INSTANCE_URL`, etc).
Labels: map[string]string{
"app": runnerGroup.Name,
"gitea.bpg.pw/runnergroup-name": runnerGroup.Name,
"gitea.bpg.pw/managed-by": "gitea-runner-operator",
},
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: pointer.Int32(600),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "runner",
Image: "gitea/act_runner:nightly-dind-rootless",
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{Privileged: pointer.Bool(true)},
Env: envVars,
VolumeMounts: []corev1.VolumeMount{
{Name: "runner-data", MountPath: "/data"},
},
},
},
Volumes: []corev1.Volume{
{
Name: "runner-data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "act-runner-vol", // Note: Consider making this configurable or EmptyDir
},
},
},
},
},
},
},
}
// Set Controller Reference
if err := ctrl.SetControllerReference(runnerGroup, job, r.Scheme); err != nil {
return nil, err
}
return job, nil
}
```
## 5. Gitea Client (`internal/gitea/client.go`) ## 5. Gitea Client (`internal/gitea/client.go`)
A simple HTTP client wrapper to interact with Gitea. A specialized client to interact with Gitea's Actions API.
### 5.1 Interface ### 5.1 Interface
```go ```go
type RunnerStats struct {
QueuedJobs []ActionWorkflowJob
Running int
}
type Client interface { type Client interface {
GetQueuedRuns(ctx context.Context, scope RunnerGroupScope, owner, repo string, labels []string) (int, error) GetRunnerStats(ctx context.Context, giteaURL, authToken string, scope RunnerGroupScope, org, user, repo string, labels []string) (*RunnerStats, error)
} }
``` ```
### 5.2 Implementation Details ### 5.2 Logic
- **Endpoint**: `/api/v1/repos/{owner}/{repo}/actions/runs` 1. **Endpoints**:
- **Query Params**: `status=queued` - Repo/Org/Global: Uses `/actions/jobs` endpoints.
- **Filtering**: - User: Fetches repos via `/users/{user}/repos`, then queries `/actions/jobs` for each repo.
- The API might return all queued runs. 2. **Fetching**:
- The client must filter these runs locally to ensure they match the `labels` defined in the RunnerGroup CR. - Fetches jobs with `status=queued`, `waiting`, `pending`.
- _Note_: Gitea API might not support filtering by labels directly in the list endpoint, so client-side filtering is necessary. - Handles pagination (fetches all pages).
3. **Filtering**:
- Iterates through fetched jobs.
- **Matches Labels**: Checks if the job's required labels are a subset of the runner's supported labels (effective labels).
- Supports exact match (`linux` == `linux`)
- Supports schema match (`ubuntu-latest` matches `ubuntu-latest:docker://...`)
- Returns only matching jobs in `QueuedJobs`.
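
The filtering rule can be sketched as a subset check on label names. This is a minimal sketch under stated assumptions: the function names `labelName` and `jobMatches` are hypothetical, the job's required labels are assumed to be plain names (as written in `runs-on`), and a job with no required labels is treated as matching any runner.

```go
package main

import (
	"fmt"
	"strings"
)

// labelName strips an optional ":schema" suffix, so that
// "ubuntu-latest:docker://node:16-bullseye" becomes "ubuntu-latest".
func labelName(label string) string {
	name, _, _ := strings.Cut(label, ":")
	return name
}

// jobMatches reports whether every label the job requires is offered by the
// runner, comparing against the runner labels with their schema removed.
func jobMatches(required, supported []string) bool {
	offered := make(map[string]struct{}, len(supported))
	for _, l := range supported {
		offered[labelName(l)] = struct{}{}
	}
	for _, r := range required {
		if _, ok := offered[r]; !ok {
			return false // one missing label is enough to reject the job
		}
	}
	return true
}

func main() {
	supported := []string{"linux", "ubuntu-latest:docker://node:16-bullseye"}

	fmt.Println(jobMatches([]string{"linux"}, supported))         // exact match
	fmt.Println(jobMatches([]string{"ubuntu-latest"}, supported)) // schema match
	fmt.Println(jobMatches([]string{"centos"}, supported))        // label not offered
}
```

Building the set of offered names once keeps the check linear in the number of labels, which matters little here but avoids a nested loop per job.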
## 6. Configuration & Deployment ## 6. Testing Strategy
### 6.1 Dockerfile 1. **Unit Tests (`internal/gitea/client_test.go`)**:
- Mock Gitea API server.
Standard Operator SDK Dockerfile. Ensure the base image is minimal (e.g., `gcr.io/distroless/static:nonroot`). - Verify `GetRunnerStats` correctly parses JSON and handles pagination.
- Verify label matching logic (subset, schema matching).
### 6.2 Kustomize 2. **Controller Tests**:
- Verify `SpawnedJobsCache` prevents double scheduling.
Update `config/default/kustomization.yaml` to include the CRD and RBAC configurations. - Verify TTL logic allows retries for stuck jobs.
- Verify `getEffectiveLabels` merging logic.
## 7. Testing Strategy
1. **Unit Tests**:
- Test `constructJobForRunnerGroup` to ensure Env vars and Labels are set correctly.
- Test Gitea Client response parsing.
2. **Integration Tests (EnvTest)**:
- Spin up a local k8s control plane.
- Create a `RunnerGroup` CR.
- Verify the controller creates a `Job` when the mocked Gitea client returns queued jobs.
- Verify the controller respects `MaxActiveRunners`.
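
A mock Gitea API server for the pagination check can be built with `net/http/httptest`. The sketch below is illustrative only: the `/jobs` path, the `page` and `limit` query parameters, the `{"jobs": [...]}` response shape, and the names `fetchAllJobs` and `newMockServer` are all assumptions made for the example, not the real client or the real Gitea endpoints.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// job and jobsPage model an assumed, simplified response body.
type job struct {
	ID int64 `json:"id"`
}

type jobsPage struct {
	Jobs []job `json:"jobs"`
}

// fetchAllJobs requests page after page until one comes back with fewer
// than limit entries, then returns everything collected so far.
func fetchAllJobs(client *http.Client, baseURL string, limit int) ([]job, error) {
	var all []job
	for page := 1; ; page++ {
		url := fmt.Sprintf("%s/jobs?status=queued&page=%d&limit=%d", baseURL, page, limit)
		resp, err := client.Get(url)
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusOK {
			resp.Body.Close()
			return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
		}
		var body jobsPage
		err = json.NewDecoder(resp.Body).Decode(&body)
		resp.Body.Close()
		if err != nil {
			return nil, err
		}
		all = append(all, body.Jobs...)
		if len(body.Jobs) < limit {
			return all, nil // short page: nothing left to fetch
		}
	}
}

// newMockServer serves `total` jobs with IDs 1..total, sliced by page/limit.
func newMockServer(total int) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		page, _ := strconv.Atoi(q.Get("page"))
		limit, _ := strconv.Atoi(q.Get("limit"))
		out := jobsPage{Jobs: []job{}}
		for id := (page-1)*limit + 1; id <= page*limit && id <= total; id++ {
			out.Jobs = append(out.Jobs, job{ID: int64(id)})
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(out)
	}))
}

func main() {
	srv := newMockServer(5)
	defer srv.Close()

	jobs, err := fetchAllJobs(srv.Client(), srv.URL, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("fetched jobs:", len(jobs))
}
```

Stopping on a short page costs one extra request when the total is an exact multiple of the page size; a client that trusts a total count in the response could avoid that request.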


@@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"strings" "strings"
"sync"
"time" "time"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
@@ -42,6 +43,7 @@ type RunnerGroupReconciler struct {
client.Client client.Client
Scheme *runtime.Scheme Scheme *runtime.Scheme
GiteaClient gitea.Client GiteaClient gitea.Client
SpawnedJobsCache sync.Map
} }
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups,verbs=get;list;watch;create;update;patch;delete
@@ -117,43 +119,66 @@ func (r *RunnerGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request)
logger.Info("Checking Gitea for queued jobs", "url", runnerGroup.Spec.GiteaURL, "scope", runnerGroup.Spec.Scope) logger.Info("Checking Gitea for queued jobs", "url", runnerGroup.Spec.GiteaURL, "scope", runnerGroup.Spec.Scope)
// Calculate effective labels (spec labels + defaults)
effectiveLabels := r.getEffectiveLabels(runnerGroup.Spec.Labels)
// Query for queued workflow runs // Query for queued workflow runs
queuedJobs, err := r.GiteaClient.GetQueuedRuns( stats, err := r.GiteaClient.GetRunnerStats(
ctx, ctx,
runnerGroup.Spec.GiteaURL, runnerGroup.Spec.GiteaURL,
authToken, authToken,
runnerGroup.Spec.Scope, runnerGroup.Spec.Scope,
runnerGroup.Spec.Org, runnerGroup.Spec.Org,
runnerGroup.Spec.User,
runnerGroup.Spec.Repo, runnerGroup.Spec.Repo,
runnerGroup.Spec.Labels, effectiveLabels,
) )
if err != nil { if err != nil {
logger.Error(err, "Failed to query Gitea for queued runs") logger.Error(err, "Failed to query Gitea for runner stats")
return ctrl.Result{RequeueAfter: 10 * time.Second}, err return ctrl.Result{RequeueAfter: 10 * time.Second}, err
} }
logger.Info("Gitea query result", "queuedJobs", queuedJobs) logger.Info("Gitea query result", "queuedJobs", len(stats.QueuedJobs))
// 6. Scale Up // 6. Scale Up and Cache Management
availableSlots := runnerGroup.Spec.MaxActiveRunners - activeRunners availableSlots := runnerGroup.Spec.MaxActiveRunners - activeRunners
toSpawn := min(queuedJobs, availableSlots)
if toSpawn > 0 { // Track current queued IDs for cache cleanup
logger.Info("Spawning runners", currentQueuedIDs := make(map[int64]bool)
"queuedJobs", queuedJobs,
"availableSlots", availableSlots,
"toSpawn", toSpawn)
// Retrieve Registration Token from Secret // Retrieve Registration Token from Secret (only if we need to spawn)
registrationToken, err := r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.RegistrationTokenRef) var registrationToken string
tokenFetched := false
for _, giteaJob := range stats.QueuedJobs {
currentQueuedIDs[giteaJob.ID] = true
if availableSlots <= 0 {
continue
}
// Check if we already spawned a runner for this job
if value, loaded := r.SpawnedJobsCache.Load(giteaJob.ID); loaded {
spawnTime := value.(time.Time)
if time.Since(spawnTime) < 5*time.Minute {
// Already handling this job recently
continue
}
// TTL expired (runner likely failed to start), retry spawning
logger.Info("Job stuck in queue for too long, retrying runner spawn", "giteaJobID", giteaJob.ID)
}
// Need to spawn a runner
if !tokenFetched {
registrationToken, err = r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.RegistrationTokenRef)
if err != nil { if err != nil {
logger.Error(err, "Failed to get registration token from secret") logger.Error(err, "Failed to get registration token from secret")
return ctrl.Result{}, err return ctrl.Result{}, err
} }
tokenFetched = true
}
// Spawn jobs job, err := r.constructJobForRunnerGroup(runnerGroup, registrationToken, effectiveLabels)
for i := 0; i < toSpawn; i++ {
job, err := r.constructJobForRunnerGroup(runnerGroup, registrationToken)
if err != nil { if err != nil {
logger.Error(err, "Failed to construct Job") logger.Error(err, "Failed to construct Job")
return ctrl.Result{}, err return ctrl.Result{}, err
@@ -163,9 +188,23 @@ func (r *RunnerGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request)
logger.Error(err, "Failed to create Job", "jobName", job.Name) logger.Error(err, "Failed to create Job", "jobName", job.Name)
return ctrl.Result{}, err return ctrl.Result{}, err
} }
logger.Info("Created Job", "jobName", job.Name)
logger.Info("Created Job for Gitea Run", "jobName", job.Name, "giteaJobID", giteaJob.ID)
// Mark as spawned
r.SpawnedJobsCache.Store(giteaJob.ID, time.Now())
availableSlots--
} }
// Cleanup cache: remove jobs that are no longer queued in Gitea
r.SpawnedJobsCache.Range(func(key, value any) bool {
jobID := key.(int64)
if !currentQueuedIDs[jobID] {
// Job is no longer in the queue (running, completed, or cancelled)
r.SpawnedJobsCache.Delete(key)
} }
return true
})
// 7. Requeue for continuous polling // 7. Requeue for continuous polling
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
@@ -191,8 +230,43 @@ func (r *RunnerGroupReconciler) getSecretValue(ctx context.Context, namespace st
return string(value), nil return string(value), nil
} }
// getEffectiveLabels merges spec labels with default labels
func (r *RunnerGroupReconciler) getEffectiveLabels(specLabels []string) []string {
defaultLabels := []string{
"ubuntu-latest:docker://node:16-bullseye",
"ubuntu-22.04:docker://node:16-bullseye",
"ubuntu-20.04:docker://node:16-bullseye",
"ubuntu-18.04:docker://node:16-buster",
}
effectiveLabels := make([]string, len(specLabels))
copy(effectiveLabels, specLabels)
for _, defaultLabel := range defaultLabels {
// Check if this default label key is already overridden in specLabels
// defaultLabel format is "key:schema"
parts := strings.SplitN(defaultLabel, ":", 2)
key := parts[0]
found := false
for _, specLabel := range specLabels {
// Spec label can be "key" or "key:schema"
if specLabel == key || strings.HasPrefix(specLabel, key+":") {
found = true
break
}
}
if !found {
effectiveLabels = append(effectiveLabels, defaultLabel)
}
}
return effectiveLabels
}
// constructJobForRunnerGroup creates a Job object for the RunnerGroup // constructJobForRunnerGroup creates a Job object for the RunnerGroup
func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1alpha1.RunnerGroup, registrationToken string) (*batchv1.Job, error) { func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1alpha1.RunnerGroup, registrationToken string, labels []string) (*batchv1.Job, error) {
// Generate random suffix for name // Generate random suffix for name
name := fmt.Sprintf("%s-%s", runnerGroup.Name, randString(8)) name := fmt.Sprintf("%s-%s", runnerGroup.Name, randString(8))
@@ -201,13 +275,14 @@ func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1a
{Name: "GITEA_INSTANCE_URL", Value: runnerGroup.Spec.GiteaURL}, {Name: "GITEA_INSTANCE_URL", Value: runnerGroup.Spec.GiteaURL},
{Name: "GITEA_RUNNER_REGISTRATION_TOKEN", Value: registrationToken}, {Name: "GITEA_RUNNER_REGISTRATION_TOKEN", Value: registrationToken},
{Name: "GITEA_RUNNER_EPHEMERAL", Value: "true"}, {Name: "GITEA_RUNNER_EPHEMERAL", Value: "true"},
{Name: "GITEA_RUNNER_NAME", Value: name},
{Name: "DOCKER_HOST", Value: "tcp://localhost:2376"}, {Name: "DOCKER_HOST", Value: "tcp://localhost:2376"},
{Name: "DOCKER_CERT_PATH", Value: "/certs/client"}, {Name: "DOCKER_CERT_PATH", Value: "/certs/client"},
{Name: "DOCKER_TLS_VERIFY", Value: "1"}, {Name: "DOCKER_TLS_VERIFY", Value: "1"},
} }
if len(runnerGroup.Spec.Labels) > 0 { if len(labels) > 0 {
labelsStr := strings.Join(runnerGroup.Spec.Labels, ",") labelsStr := strings.Join(labels, ",")
envVars = append(envVars, corev1.EnvVar{Name: "GITEA_RUNNER_LABELS", Value: labelsStr}) envVars = append(envVars, corev1.EnvVar{Name: "GITEA_RUNNER_LABELS", Value: labelsStr})
} }
@@ -276,14 +351,6 @@ func randString(length int) string {
return string(b) return string(b)
} }
// min returns the minimum of two integers
func min(a, b int) int {
if a < b {
return a
}
return b
}
// SetupWithManager sets up the controller with the Manager. // SetupWithManager sets up the controller with the Manager.
func (r *RunnerGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { func (r *RunnerGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr). return ctrl.NewControllerManagedBy(mgr).
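The effective-label logic at the top of this hunk appends a default label only when the spec does not already declare the same key, either bare (`key`) or with a schema (`key:schema`). A minimal standalone sketch of that rule follows. The names `mergeDefaultLabels` and `labelKey` are hypothetical, and deriving the key as the text before the first `:` is an assumption, since the part of the function that computes `key` is not shown in the diff.

```go
package main

import (
	"fmt"
	"strings"
)

// labelKey returns the part of a label before the first ':'
// (assumption: "ubuntu-latest:docker://node:16" has key "ubuntu-latest").
func labelKey(label string) string {
	key, _, _ := strings.Cut(label, ":")
	return key
}

// mergeDefaultLabels returns specLabels plus every default whose key is not
// already present in specLabels as "key" or "key:schema".
func mergeDefaultLabels(specLabels, defaults []string) []string {
	effective := append([]string{}, specLabels...)
	for _, def := range defaults {
		key := labelKey(def)
		found := false
		for _, s := range specLabels {
			if s == key || strings.HasPrefix(s, key+":") {
				found = true
				break
			}
		}
		if !found {
			effective = append(effective, def)
		}
	}
	return effective
}

func main() {
	spec := []string{"ubuntu-latest:docker://node:20"}
	defaults := []string{"ubuntu-latest:docker://node:16", "self-hosted"}
	// The spec already declares "ubuntu-latest", so only "self-hosted" is added.
	fmt.Println(mergeDefaultLabels(spec, defaults))
}
```

The merged slice is what would then be joined with commas into `GITEA_RUNNER_LABELS`, as `constructJobForRunnerGroup` does with its `labels` argument.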


@@ -25,11 +25,19 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/reconcile"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
giteav1alpha1 "github.com/bapung/gitea-runner-operator/api/v1alpha1" giteav1alpha1 "github.com/bapung/gitea-runner-operator/api/v1alpha1"
"github.com/bapung/gitea-runner-operator/internal/gitea"
) )
type fakeGiteaClient struct{}
func (c *fakeGiteaClient) GetRunnerStats(ctx context.Context, giteaURL, authToken string, scope giteav1alpha1.RunnerGroupScope, org string, user string, repo string, labels []string) (*gitea.RunnerStats, error) {
return &gitea.RunnerStats{QueuedJobs: []gitea.ActionWorkflowJob{}}, nil
}
var _ = Describe("RunnerGroup Controller", func() { var _ = Describe("RunnerGroup Controller", func() {
Context("When reconciling a resource", func() { Context("When reconciling a resource", func() {
const resourceName = "test-resource" const resourceName = "test-resource"
@@ -43,6 +51,21 @@ var _ = Describe("RunnerGroup Controller", func() {
runnergroup := &giteav1alpha1.RunnerGroup{} runnergroup := &giteav1alpha1.RunnerGroup{}
BeforeEach(func() { BeforeEach(func() {
By("creating the secret")
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "gitea-secret",
Namespace: "default",
},
Data: map[string][]byte{
"token": []byte("dummy"),
"auth": []byte("dummy"),
},
}
if err := k8sClient.Create(ctx, secret); err != nil && !errors.IsAlreadyExists(err) {
Expect(err).To(Succeed())
}
By("creating the custom resource for the Kind RunnerGroup") By("creating the custom resource for the Kind RunnerGroup")
err := k8sClient.Get(ctx, typeNamespacedName, runnergroup) err := k8sClient.Get(ctx, typeNamespacedName, runnergroup)
if err != nil && errors.IsNotFound(err) { if err != nil && errors.IsNotFound(err) {
@@ -51,7 +74,19 @@ var _ = Describe("RunnerGroup Controller", func() {
Name: resourceName, Name: resourceName,
Namespace: "default", Namespace: "default",
}, },
// TODO(user): Specify other spec details if needed. Spec: giteav1alpha1.RunnerGroupSpec{
Scope: giteav1alpha1.RunnerGroupScopeGlobal,
GiteaURL: "https://gitea.example.com",
MaxActiveRunners: 1,
RegistrationTokenRef: corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: "gitea-secret"},
Key: "token",
},
AuthTokenRef: corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: "gitea-secret"},
Key: "auth",
},
},
} }
Expect(k8sClient.Create(ctx, resource)).To(Succeed()) Expect(k8sClient.Create(ctx, resource)).To(Succeed())
} }
@@ -71,6 +106,7 @@ var _ = Describe("RunnerGroup Controller", func() {
controllerReconciler := &RunnerGroupReconciler{ controllerReconciler := &RunnerGroupReconciler{
Client: k8sClient, Client: k8sClient,
Scheme: k8sClient.Scheme(), Scheme: k8sClient.Scheme(),
GiteaClient: &fakeGiteaClient{},
} }
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{


@@ -31,17 +31,22 @@ import (
// Client defines the interface for interacting with Gitea API // Client defines the interface for interacting with Gitea API
type Client interface { type Client interface {
// GetQueuedRuns queries Gitea for queued workflow runs matching the scope and labels // GetRunnerStats queries Gitea for queued workflow runs matching the scope and labels
// Returns the count of queued jobs that match the criteria GetRunnerStats(
GetQueuedRuns(
ctx context.Context, ctx context.Context,
giteaURL string, giteaURL string,
authToken string, authToken string,
scope v1alpha1.RunnerGroupScope, scope v1alpha1.RunnerGroupScope,
org string, org string,
user string,
repo string, repo string,
labels []string, labels []string,
) (int, error) ) (*RunnerStats, error)
}
// RunnerStats contains lists of jobs in different states
type RunnerStats struct {
QueuedJobs []ActionWorkflowJob
} }
// HTTPClient is the default implementation of the Gitea Client interface // HTTPClient is the default implementation of the Gitea Client interface
@@ -107,69 +112,106 @@ type ActionWorkflowJob struct {
RunnerName string `json:"runner_name"` RunnerName string `json:"runner_name"`
} }
// GetQueuedRuns implements the Client interface // GetRunnerStats implements the Client interface
func (c *HTTPClient) GetQueuedRuns( func (c *HTTPClient) GetRunnerStats(
ctx context.Context, ctx context.Context,
giteaURL string, giteaURL string,
authToken string, authToken string,
scope v1alpha1.RunnerGroupScope, scope v1alpha1.RunnerGroupScope,
org string, org string,
user string,
repo string, repo string,
labels []string, labels []string,
) (int, error) { ) (*RunnerStats, error) {
switch scope { switch scope {
case v1alpha1.RunnerGroupScopeRepo: case v1alpha1.RunnerGroupScopeRepo:
return c.getQueuedRunsForRepo(ctx, giteaURL, authToken, org, repo, labels) return c.getRunnerStatsForRepo(ctx, giteaURL, authToken, org, repo, labels)
case v1alpha1.RunnerGroupScopeOrg: case v1alpha1.RunnerGroupScopeOrg:
return c.getQueuedRunsForOrg(ctx, giteaURL, authToken, org, labels) return c.getRunnerStatsForOrg(ctx, giteaURL, authToken, org, labels)
case v1alpha1.RunnerGroupScopeUser:
return c.getRunnerStatsForUser(ctx, giteaURL, authToken, user, labels)
case v1alpha1.RunnerGroupScopeGlobal: case v1alpha1.RunnerGroupScopeGlobal:
return c.getQueuedRunsGlobal(ctx, giteaURL, authToken, labels) return c.getRunnerStatsGlobal(ctx, giteaURL, authToken, labels)
default: default:
return 0, fmt.Errorf("unknown scope: %s", scope) return nil, fmt.Errorf("unknown scope: %s", scope)
} }
} }
// getQueuedRunsForRepo fetches queued runs for a specific repository // getRunnerStatsForRepo fetches queued runs for a specific repository
func (c *HTTPClient) getQueuedRunsForRepo(ctx context.Context, giteaURL, authToken, owner, repo string, labels []string) (int, error) { func (c *HTTPClient) getRunnerStatsForRepo(ctx context.Context, giteaURL, authToken, owner, repo string, labels []string) (*RunnerStats, error) {
// Use jobs endpoint since it contains the runner labels we need for filtering
endpoint := fmt.Sprintf("%s/api/v1/repos/%s/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), owner, repo) endpoint := fmt.Sprintf("%s/api/v1/repos/%s/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), owner, repo)
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels) return c.fetchRunnerStats(ctx, endpoint, authToken, labels)
} }
// getQueuedRunsForOrg fetches queued runs for all repos under an organization // getRunnerStatsForOrg fetches queued runs for all repos under an organization
func (c *HTTPClient) getQueuedRunsForOrg(ctx context.Context, giteaURL, authToken, org string, labels []string) (int, error) { func (c *HTTPClient) getRunnerStatsForOrg(ctx context.Context, giteaURL, authToken, org string, labels []string) (*RunnerStats, error) {
// Use direct org-level jobs endpoint for better performance
endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), org) endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), org)
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels) return c.fetchRunnerStats(ctx, endpoint, authToken, labels)
} }
// getQueuedRunsGlobal fetches queued runs using admin-level API for global scope // getRunnerStatsForUser fetches queued runs for all repos owned by a user
func (c *HTTPClient) getQueuedRunsGlobal(ctx context.Context, giteaURL, authToken string, labels []string) (int, error) { func (c *HTTPClient) getRunnerStatsForUser(ctx context.Context, giteaURL, authToken, user string, labels []string) (*RunnerStats, error) {
// Use admin-level jobs endpoint which provides global view of all queued jobs repos, err := c.fetchReposForUser(ctx, giteaURL, authToken, user)
if err != nil {
return nil, err
}
var allQueuedJobs []ActionWorkflowJob
for _, repo := range repos {
endpoint := fmt.Sprintf("%s/api/v1/repos/%s/%s/actions/jobs", strings.TrimSuffix(giteaURL, "/"), repo.Owner.Login, repo.Name)
stats, err := c.fetchRunnerStats(ctx, endpoint, authToken, labels)
if err != nil {
return nil, err
}
allQueuedJobs = append(allQueuedJobs, stats.QueuedJobs...)
}
return &RunnerStats{
QueuedJobs: allQueuedJobs,
}, nil
}
// getRunnerStatsGlobal fetches queued runs using admin-level API for global scope
func (c *HTTPClient) getRunnerStatsGlobal(ctx context.Context, giteaURL, authToken string, labels []string) (*RunnerStats, error) {
endpoint := fmt.Sprintf("%s/api/v1/admin/actions/jobs", strings.TrimSuffix(giteaURL, "/")) endpoint := fmt.Sprintf("%s/api/v1/admin/actions/jobs", strings.TrimSuffix(giteaURL, "/"))
return c.fetchWorkflowJobs(ctx, endpoint, authToken, labels) return c.fetchRunnerStats(ctx, endpoint, authToken, labels)
}
func (c *HTTPClient) fetchRunnerStats(ctx context.Context, endpoint, authToken string, labels []string) (*RunnerStats, error) {
queuedJobs, err := c.fetchWorkflowJobs(ctx, endpoint, authToken, labels, []string{"queued", "waiting", "pending"})
if err != nil {
return nil, err
}
return &RunnerStats{
QueuedJobs: queuedJobs,
}, nil
} }
// fetchWorkflowJobs fetches workflow jobs from a given endpoint with label filtering and pagination // fetchWorkflowJobs fetches workflow jobs from a given endpoint with label filtering and pagination
func (c *HTTPClient) fetchWorkflowJobs(ctx context.Context, endpoint, authToken string, labels []string) (int, error) { func (c *HTTPClient) fetchWorkflowJobs(ctx context.Context, endpoint, authToken string, labels []string, statuses []string) ([]ActionWorkflowJob, error) {
totalCount := 0 var allJobs []ActionWorkflowJob
for _, status := range statuses {
page := 1 page := 1
limit := 50 // Default page size limit := 50 // Default page size
for { for {
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
return 0, err return nil, err
} }
q := u.Query() q := u.Query()
q.Set("status", "queued") q.Set("status", status)
q.Set("page", fmt.Sprintf("%d", page)) q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit)) q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
fmt.Printf("DEBUG: Fetching jobs from %s\n", u.String())
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil { if err != nil {
return 0, err return nil, err
} }
req.Header.Set("Authorization", "token "+authToken) req.Header.Set("Authorization", "token "+authToken)
@@ -177,25 +219,35 @@ func (c *HTTPClient) fetchWorkflowJobs(ctx context.Context, endpoint, authToken
resp, err := c.httpClient.Do(req) resp, err := c.httpClient.Do(req)
if err != nil { if err != nil {
return 0, err fmt.Printf("DEBUG: Request failed: %v\n", err)
return nil, err
} }
fmt.Printf("DEBUG: Response status: %s\n", resp.Status)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
resp.Body.Close() _ = resp.Body.Close()
return 0, c.handleHTTPError(resp.StatusCode, body, "fetch workflow jobs") fmt.Printf("DEBUG: Error body: %s\n", string(body))
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch workflow jobs")
} }
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
fmt.Printf("DEBUG: Response body: %s\n", string(body))
var result ActionWorkflowJobsResponse var result ActionWorkflowJobsResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { if err := json.Unmarshal(body, &result); err != nil {
resp.Body.Close() fmt.Printf("DEBUG: Failed to decode response: %v\n", err)
return 0, err return nil, err
} }
resp.Body.Close()
// Filter and count matching jobs for this page fmt.Printf("DEBUG: Found %d jobs, total in Gitea: %d\n", len(result.Jobs), result.TotalCount)
pageCount := c.filterQueuedJobs(result.Jobs, labels)
totalCount += pageCount // Filter and collect matching jobs for this page
matchedJobs := c.filterQueuedJobs(result.Jobs, labels)
fmt.Printf("DEBUG: %d jobs matched labels %v\n", len(matchedJobs), labels)
allJobs = append(allJobs, matchedJobs...)
// Break if we've fetched all available results // Break if we've fetched all available results
if len(result.Jobs) < limit { if len(result.Jobs) < limit {
@@ -204,56 +256,19 @@ func (c *HTTPClient) fetchWorkflowJobs(ctx context.Context, endpoint, authToken
page++ page++
} }
}
return totalCount, nil return allJobs, nil
} }
// fetchWorkflowRuns fetches workflow runs from a given endpoint (deprecated - use jobs for label filtering) // fetchReposForUser fetches all repositories owned by a specific user with pagination
func (c *HTTPClient) fetchWorkflowRuns(ctx context.Context, endpoint, authToken string) ([]ActionWorkflowRun, error) { func (c *HTTPClient) fetchReposForUser(ctx context.Context, giteaURL, authToken, username string) ([]Repository, error) {
// Add status=queued query parameter
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("status", "queued")
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch workflow runs")
}
var result ActionWorkflowRunsResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.WorkflowRuns, nil
}
// fetchOrgRepos fetches all repositories under an organization with pagination
func (c *HTTPClient) fetchOrgRepos(ctx context.Context, giteaURL, authToken, org string) ([]Repository, error) {
var allRepos []Repository var allRepos []Repository
page := 1 page := 1
limit := 50 limit := 50
for { for {
endpoint := fmt.Sprintf("%s/api/v1/orgs/%s/repos", strings.TrimSuffix(giteaURL, "/"), org) endpoint := fmt.Sprintf("%s/api/v1/users/%s/repos", strings.TrimSuffix(giteaURL, "/"), username)
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -263,6 +278,8 @@ func (c *HTTPClient) fetchOrgRepos(ctx context.Context, giteaURL, authToken, org
q.Set("limit", fmt.Sprintf("%d", limit)) q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
fmt.Printf("DEBUG: Fetching repos for user %s from %s\n", username, u.String())
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -273,131 +290,28 @@ func (c *HTTPClient) fetchOrgRepos(ctx context.Context, giteaURL, authToken, org
resp, err := c.httpClient.Do(req) resp, err := c.httpClient.Do(req)
if err != nil { if err != nil {
fmt.Printf("DEBUG: Request failed: %v\n", err)
return nil, err return nil, err
} }
fmt.Printf("DEBUG: Response status: %s\n", resp.Status)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
resp.Body.Close() _ = resp.Body.Close()
fmt.Printf("DEBUG: Error body: %s\n", string(body))
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch user repos") return nil, c.handleHTTPError(resp.StatusCode, body, "fetch user repos")
} }
var repos []Repository
if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil {
resp.Body.Close()
return nil, err
}
resp.Body.Close()
allRepos = append(allRepos, repos...)
if len(repos) < limit {
break
}
page++
}
return allRepos, nil
}
// fetchAllOrgs fetches all organizations visible to the authenticated user with pagination
func (c *HTTPClient) fetchAllOrgs(ctx context.Context, giteaURL, authToken string) ([]Organization, error) {
var allOrgs []Organization
page := 1
limit := 50
for {
endpoint := fmt.Sprintf("%s/api/v1/user/orgs", strings.TrimSuffix(giteaURL, "/"))
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
resp.Body.Close() _ = resp.Body.Close()
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch org repos") // fmt.Printf("DEBUG: Response body: %s\n", string(body))
}
var orgs []Organization
if err := json.NewDecoder(resp.Body).Decode(&orgs); err != nil {
resp.Body.Close()
return nil, err
}
resp.Body.Close()
allOrgs = append(allOrgs, orgs...)
if len(orgs) < limit {
break
}
page++
}
return allOrgs, nil
}
// fetchUserRepos fetches all repositories owned by the authenticated user with pagination
func (c *HTTPClient) fetchUserRepos(ctx context.Context, giteaURL, authToken string) ([]Repository, error) {
var allRepos []Repository
page := 1
limit := 50
for {
endpoint := fmt.Sprintf("%s/api/v1/user/repos", strings.TrimSuffix(giteaURL, "/"))
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
q := u.Query()
q.Set("page", fmt.Sprintf("%d", page))
q.Set("limit", fmt.Sprintf("%d", limit))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "token "+authToken)
req.Header.Set("Accept", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, c.handleHTTPError(resp.StatusCode, body, "fetch user orgs")
}
var repos []Repository var repos []Repository
if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil { if err := json.Unmarshal(body, &repos); err != nil {
resp.Body.Close() fmt.Printf("DEBUG: Failed to decode response: %v\n", err)
return nil, err return nil, err
} }
resp.Body.Close()
allRepos = append(allRepos, repos...) allRepos = append(allRepos, repos...)
@@ -412,44 +326,42 @@ func (c *HTTPClient) fetchUserRepos(ctx context.Context, giteaURL, authToken str
} }
// filterQueuedJobs filters workflow jobs by labels // filterQueuedJobs filters workflow jobs by labels
func (c *HTTPClient) filterQueuedJobs(jobs []ActionWorkflowJob, requiredLabels []string) int { func (c *HTTPClient) filterQueuedJobs(jobs []ActionWorkflowJob, runnerLabels []string) []ActionWorkflowJob {
if len(requiredLabels) == 0 { var matched []ActionWorkflowJob
// No label filtering required, return all queued jobs
return len(jobs)
}
count := 0
for _, job := range jobs { for _, job := range jobs {
if c.jobMatchesLabels(job.Labels, requiredLabels) { match := c.jobMatchesLabels(job.Labels, runnerLabels)
count++ fmt.Printf("DEBUG: Job %d (Status: %s, Labels: %v) matches runner capabilities %v? %v\n", job.ID, job.Status, job.Labels, runnerLabels, match)
if match {
matched = append(matched, job)
} }
} }
return count return matched
} }
// jobMatchesLabels checks if a job's labels match the required labels // jobMatchesLabels checks if a job's requirements are satisfied by the runner's supported labels
func (c *HTTPClient) jobMatchesLabels(jobLabels, requiredLabels []string) bool { func (c *HTTPClient) jobMatchesLabels(jobLabels, supportedLabels []string) bool {
// Convert job labels to map for faster lookup if len(jobLabels) == 0 {
labelSet := make(map[string]bool) return true
for _, label := range jobLabels {
labelSet[label] = true
} }
// Check if all required labels are present // For each label required by the job, check if the runner supports it
for _, required := range requiredLabels { for _, req := range jobLabels {
if !labelSet[required] { found := false
for _, supp := range supportedLabels {
// Check for exact match or schema match (label:schema)
// e.g. Job asks for "ubuntu-latest", Runner has "ubuntu-latest:docker://..."
if req == supp || strings.HasPrefix(supp, req+":") {
found = true
break
}
}
if !found {
return false return false
} }
} }
return true return true
} }
// filterQueuedRuns filters workflow runs by labels (deprecated - use filterQueuedJobs)
func (c *HTTPClient) filterQueuedRuns(runs []ActionWorkflowRun, labels []string) int {
// Legacy method - jobs should be used for label filtering
return len(runs)
}
// handleHTTPError provides specific error handling for different HTTP status codes // handleHTTPError provides specific error handling for different HTTP status codes
func (c *HTTPClient) handleHTTPError(statusCode int, body []byte, operation string) error { func (c *HTTPClient) handleHTTPError(statusCode int, body []byte, operation string) error {
switch statusCode { switch statusCode {
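The reworked `fetchWorkflowJobs` above loops over several statuses and, for each one, requests pages until a page comes back with fewer than `limit` entries. The control flow can be sketched without HTTP by passing the page fetch in as a callback. The `fetchPage` type and `collectJobs` function are hypothetical names introduced for this sketch, and jobs are reduced to bare IDs.

```go
package main

import "fmt"

// fetchPage is a hypothetical callback returning one page of job IDs for a status.
type fetchPage func(status string, page, limit int) ([]int64, error)

// collectJobs walks every status and keeps requesting pages until a page is
// shorter than limit, which is treated as the last page for that status.
func collectJobs(statuses []string, limit int, fetch fetchPage) ([]int64, error) {
	var all []int64
	for _, status := range statuses {
		for page := 1; ; page++ {
			jobs, err := fetch(status, page, limit)
			if err != nil {
				return nil, fmt.Errorf("status %q page %d: %w", status, page, err)
			}
			all = append(all, jobs...)
			if len(jobs) < limit {
				break
			}
		}
	}
	return all, nil
}

func main() {
	// In-memory stand-in for the API: five queued jobs, one waiting, none pending.
	data := map[string][]int64{"queued": {1, 2, 3, 4, 5}, "waiting": {6}}
	fetch := func(status string, page, limit int) ([]int64, error) {
		jobs := data[status]
		start := (page - 1) * limit
		if start >= len(jobs) {
			return nil, nil
		}
		end := start + limit
		if end > len(jobs) {
			end = len(jobs)
		}
		return jobs[start:end], nil
	}
	all, err := collectJobs([]string{"queued", "waiting", "pending"}, 2, fetch)
	fmt.Println(all, err)
}
```

With a short-page stop condition, a status whose job count is an exact multiple of `limit` costs one extra request that returns an empty page. That is the same trade-off the loop in the diff makes.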


@@ -27,15 +27,16 @@ import (
"github.com/bapung/gitea-runner-operator/api/v1alpha1" "github.com/bapung/gitea-runner-operator/api/v1alpha1"
) )
func TestHTTPClient_GetQueuedRuns(t *testing.T) { func TestHTTPClient_GetRunnerStats(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
scope v1alpha1.RunnerGroupScope scope v1alpha1.RunnerGroupScope
org string org string
user string
repo string repo string
labels []string labels []string
mockResponse ActionWorkflowJobsResponse mockResponse ActionWorkflowJobsResponse
expectedCount int expectedQueued int
expectedError bool expectedError bool
}{ }{
{ {
@@ -51,37 +52,54 @@ func TestHTTPClient_GetQueuedRuns(t *testing.T) {
{ID: 2, Status: "queued", Labels: []string{"linux", "arm64"}}, {ID: 2, Status: "queued", Labels: []string{"linux", "arm64"}},
}, },
}, },
expectedCount: 1, expectedQueued: 1, // Job 1 matches
expectedError: false, expectedError: false,
}, },
{ {
name: "org scope no label filtering", name: "org scope with no runner labels (matches none)",
scope: v1alpha1.RunnerGroupScopeOrg, scope: v1alpha1.RunnerGroupScopeOrg,
org: "testorg", org: "testorg",
labels: []string{}, labels: []string{}, // Runner advertises no capabilities.
// The previous logic returned every job when the label list was empty.
// The new logic matches a job only if the runner supports all of the job's labels,
// so with no capabilities only jobs without label requirements can match.
// Every mock job below requires a label, hence expectedQueued is 0.
mockResponse: ActionWorkflowJobsResponse{ mockResponse: ActionWorkflowJobsResponse{
TotalCount: 3, TotalCount: 3,
Jobs: []ActionWorkflowJob{ Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"linux", "x64"}}, {ID: 1, Status: "queued", Labels: []string{"linux"}},
{ID: 2, Status: "queued", Labels: []string{"windows"}},
{ID: 3, Status: "queued", Labels: []string{"macos"}},
}, },
}, },
expectedCount: 3, expectedQueued: 0, // No runner capabilities provided -> no match
expectedError: false, expectedError: false,
}, },
{ {
name: "global scope with specific labels", name: "global scope with specific labels",
scope: v1alpha1.RunnerGroupScopeGlobal, scope: v1alpha1.RunnerGroupScopeGlobal,
labels: []string{"docker"}, labels: []string{"docker", "linux"},
mockResponse: ActionWorkflowJobsResponse{ mockResponse: ActionWorkflowJobsResponse{
TotalCount: 2, TotalCount: 2,
Jobs: []ActionWorkflowJob{ Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"docker", "linux"}}, {ID: 1, Status: "queued", Labels: []string{"docker", "linux"}}, // Match
{ID: 2, Status: "queued", Labels: []string{"linux"}}, {ID: 2, Status: "queued", Labels: []string{"linux"}}, // Match (subset)
}, },
}, },
expectedCount: 1, expectedQueued: 2,
expectedError: false,
},
{
name: "user scope",
scope: v1alpha1.RunnerGroupScopeUser,
user: "testuser",
labels: []string{"linux"},
mockResponse: ActionWorkflowJobsResponse{
TotalCount: 1,
Jobs: []ActionWorkflowJob{
{ID: 1, Status: "queued", Labels: []string{"linux"}},
},
},
expectedQueued: 1,
expectedError: false, expectedError: false,
}, },
} }
@@ -90,6 +108,23 @@ func TestHTTPClient_GetQueuedRuns(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
// Create mock server // Create mock server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
// Handle User Repos call for User Scope
if tt.scope == v1alpha1.RunnerGroupScopeUser && strings.Contains(r.URL.Path, "/repos") && !strings.Contains(r.URL.Path, "/actions/jobs") {
repos := []Repository{
{
Name: "testrepo",
Owner: struct {
Login string `json:"login"`
}{Login: tt.user},
FullName: tt.user + "/testrepo",
},
}
_ = json.NewEncoder(w).Encode(repos)
return
}
// Verify correct endpoint is called // Verify correct endpoint is called
expectedPath := "" expectedPath := ""
switch tt.scope { switch tt.scope {
@@ -99,35 +134,37 @@ func TestHTTPClient_GetQueuedRuns(t *testing.T) {
expectedPath = "/api/v1/orgs/testorg/actions/jobs" expectedPath = "/api/v1/orgs/testorg/actions/jobs"
case v1alpha1.RunnerGroupScopeGlobal: case v1alpha1.RunnerGroupScopeGlobal:
expectedPath = "/api/v1/admin/actions/jobs" expectedPath = "/api/v1/admin/actions/jobs"
case v1alpha1.RunnerGroupScopeUser:
expectedPath = "/api/v1/repos/" + tt.user + "/testrepo/actions/jobs"
} }
if !strings.HasPrefix(r.URL.Path, expectedPath) { if !strings.HasPrefix(r.URL.Path, expectedPath) {
t.Errorf("Expected path to start with %s, got %s", expectedPath, r.URL.Path) t.Errorf("Expected path to start with %s, got %s", expectedPath, r.URL.Path)
} }
// Verify query parameters
if r.URL.Query().Get("status") != "queued" {
t.Errorf("Expected status=queued, got %s", r.URL.Query().Get("status"))
}
// Verify authorization header // Verify authorization header
authHeader := r.Header.Get("Authorization") authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "token ") { if !strings.HasPrefix(authHeader, "token ") {
t.Errorf("Expected Authorization header to start with 'token ', got %s", authHeader) t.Errorf("Expected Authorization header to start with 'token ', got %s", authHeader)
} }
w.Header().Set("Content-Type", "application/json") // Only return jobs for 'queued' status to simplify counting
json.NewEncoder(w).Encode(tt.mockResponse) if r.URL.Query().Get("status") == "queued" {
_ = json.NewEncoder(w).Encode(tt.mockResponse)
} else {
_ = json.NewEncoder(w).Encode(ActionWorkflowJobsResponse{TotalCount: 0, Jobs: []ActionWorkflowJob{}})
}
})) }))
defer server.Close() defer server.Close()
client := NewHTTPClient() client := NewHTTPClient()
count, err := client.GetQueuedRuns( stats, err := client.GetRunnerStats(
context.Background(), context.Background(),
server.URL, server.URL,
"test-token", "test-token",
tt.scope, tt.scope,
tt.org, tt.org,
tt.user,
tt.repo, tt.repo,
tt.labels, tt.labels,
) )
@@ -138,8 +175,10 @@ func TestHTTPClient_GetQueuedRuns(t *testing.T) {
if !tt.expectedError && err != nil { if !tt.expectedError && err != nil {
t.Errorf("Expected no error but got: %v", err) t.Errorf("Expected no error but got: %v", err)
} }
if count != tt.expectedCount { if stats != nil {
t.Errorf("Expected count %d, got %d", tt.expectedCount, count) if len(stats.QueuedJobs) != tt.expectedQueued {
t.Errorf("Expected %d queued jobs, got %d", tt.expectedQueued, len(stats.QueuedJobs))
}
} }
}) })
} }
@@ -151,44 +190,44 @@ func TestJobMatchesLabels(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
jobLabels []string jobLabels []string
requiredLabels []string supportedLabels []string
expected bool expected bool
}{ }{
{ {
name: "exact match", name: "exact match",
jobLabels: []string{"linux", "x64"}, jobLabels: []string{"linux", "x64"},
requiredLabels: []string{"linux", "x64"}, supportedLabels: []string{"linux", "x64"},
expected: true, expected: true,
}, },
{ {
name: "subset match", name: "subset match (runner has more)",
jobLabels: []string{"linux", "x64", "docker"},
requiredLabels: []string{"linux", "x64"},
expected: true,
},
{
name: "no match",
jobLabels: []string{"linux", "arm64"},
requiredLabels: []string{"linux", "x64"},
expected: false,
},
{
name: "empty required labels",
jobLabels: []string{"linux", "x64"},
requiredLabels: []string{},
expected: true,
},
{
name: "partial match",
jobLabels: []string{"linux"}, jobLabels: []string{"linux"},
requiredLabels: []string{"linux", "x64"}, supportedLabels: []string{"linux", "x64"},
expected: true,
},
{
name: "schema match",
jobLabels: []string{"ubuntu-latest"},
supportedLabels: []string{"ubuntu-latest:docker://node:16"},
expected: true,
},
{
name: "no match (missing req)",
jobLabels: []string{"linux", "arm64"},
supportedLabels: []string{"linux", "x64"},
expected: false, expected: false,
}, },
{
name: "empty required labels (matches anything)",
jobLabels: []string{},
supportedLabels: []string{"linux"},
expected: true,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
result := client.jobMatchesLabels(tt.jobLabels, tt.requiredLabels) result := client.jobMatchesLabels(tt.jobLabels, tt.supportedLabels)
if result != tt.expected { if result != tt.expected {
t.Errorf("Expected %v, got %v", tt.expected, result) t.Errorf("Expected %v, got %v", tt.expected, result)
} }
@@ -208,41 +247,31 @@ func TestFilterQueuedJobs(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
requiredLabels []string supportedLabels []string
expectedCount int expectedIDs []int64
}{ }{
{ {
name: "filter by linux", name: "runner supports linux, x64",
requiredLabels: []string{"linux"}, supportedLabels: []string{"linux", "x64"},
expectedCount: 3, expectedIDs: []int64{1},
}, },
{ {
name: "filter by linux and x64", name: "runner supports linux, x64, docker",
requiredLabels: []string{"linux", "x64"}, supportedLabels: []string{"linux", "x64", "docker"},
expectedCount: 2, expectedIDs: []int64{1, 4},
}, },
{ {
name: "filter by docker", name: "runner supports everything",
requiredLabels: []string{"docker"}, supportedLabels: []string{"linux", "x64", "arm64", "windows", "docker"},
expectedCount: 1, expectedIDs: []int64{1, 2, 3, 4},
},
{
name: "no labels - return all",
requiredLabels: []string{},
expectedCount: 4,
},
{
name: "no matches",
requiredLabels: []string{"macos"},
expectedCount: 0,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
count := client.filterQueuedJobs(jobs, tt.requiredLabels) matched := client.filterQueuedJobs(jobs, tt.supportedLabels)
if count != tt.expectedCount { if len(matched) != len(tt.expectedIDs) {
t.Errorf("Expected %d, got %d", tt.expectedCount, count) t.Errorf("Expected %d matched jobs, got %d", len(tt.expectedIDs), len(matched))
} }
}) })
} }
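The updated `TestFilterQueuedJobs` declares `expectedIDs` but only compares lengths, so a result with the right count and the wrong jobs would still pass. A small helper along the following lines could tighten the check. It is a sketch: `job` is a hypothetical stand-in for `ActionWorkflowJob`, `sameIDs` is not part of the commit, and the ordered comparison assumes the filter preserves input order, as the loop in `filterQueuedJobs` appears to.

```go
package main

import "fmt"

// job is a pared-down, hypothetical stand-in for ActionWorkflowJob.
type job struct {
	ID int64
}

// sameIDs reports whether jobs carry exactly the expected IDs, in order.
func sameIDs(jobs []job, want []int64) bool {
	if len(jobs) != len(want) {
		return false
	}
	for i, j := range jobs {
		if j.ID != want[i] {
			return false
		}
	}
	return true
}

func main() {
	matched := []job{{ID: 1}, {ID: 4}}
	fmt.Println(sameIDs(matched, []int64{1, 4})) // true
	fmt.Println(sameIDs(matched, []int64{1, 2})) // false: same length, different IDs
}
```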


@@ -10,6 +10,8 @@ The Gitea Runner Operator is a Kubernetes controller designed to manage ephemera
- **RunnerGroup CR**: The custom resource instance defining a runner pool. - **RunnerGroup CR**: The custom resource instance defining a runner pool.
- **Ephemeral Runner**: A runner that executes exactly one job and then terminates. - **Ephemeral Runner**: A runner that executes exactly one job and then terminates.
- **Gitea Instance**: The target Gitea server where CI/CD workflows are triggered. - **Gitea Instance**: The target Gitea server where CI/CD workflows are triggered.
- **Runner Capabilities**: The set of labels a runner provides (e.g., `ubuntu-latest`).
- **Job Requirements**: The set of labels a job requests (e.g., `ubuntu-latest`).
## 3. Custom Resource Definition (CRD) ## 3. Custom Resource Definition (CRD)
@@ -25,12 +27,13 @@ The Gitea Runner Operator is a Kubernetes controller designed to manage ephemera
The `spec` defines the configuration for the runner pool. The `spec` defines the configuration for the runner pool.
| Field | Type | Required | Description | | Field | Type | Required | Description |
| :------------------ | :----------------------------- | :---------- | :---------------------------------------------------------------------------------------------------------- | | :------------------ | :------------------------------------- | :---------- | :---------------------------------------------------------------------------------------------------------- |
| `scope` | Enum (`global`, `org`, `repo`) | Yes | The scope of the runner. | | `scope` | Enum (`global`, `org`, `user`, `repo`) | Yes | The scope of the runner. |
| `org` | String | Conditional | The organization name. Required if `scope` is `org`. | | `org` | String | Conditional | The organization name. Required if `scope` is `org`. |
| `user` | String | Conditional | The username. Required if `scope` is `user`. |
| `repo` | String | Conditional | The repository name. Required if `scope` is `repo`. | | `repo` | String | Conditional | The repository name. Required if `scope` is `repo`. |
| `gitea.url` | String | Yes | The base URL of the Gitea instance (e.g., `https://gitea.example.com`). | | `gitea.url` | String | Yes | The base URL of the Gitea instance (e.g., `https://gitea.example.com`). |
| `labels` | []String | No | List of labels for the runner (e.g., `ubuntu-latest`, `app:infra`). Used by Gitea to match jobs to runners. | | `labels` | []String | No | List of labels for the runner (e.g., `app:infra`). Defaults (e.g. `ubuntu-latest`) are added automatically. |
| `maxActiveRunners` | Integer | Yes | The maximum number of concurrent runner Jobs allowed for this specific RunnerGroup CR. | | `maxActiveRunners` | Integer | Yes | The maximum number of concurrent runner Jobs allowed for this specific RunnerGroup CR. |
| `registrationToken` | SecretKeySelector | Yes | Reference to a Secret containing the runner registration token. | | `registrationToken` | SecretKeySelector | Yes | Reference to a Secret containing the runner registration token. |
| `authToken` | SecretKeySelector | Yes | Reference to a Secret containing an API token to query Gitea for job statuses. | | `authToken` | SecretKeySelector | Yes | Reference to a Secret containing an API token to query Gitea for job statuses. |
@@ -42,7 +45,7 @@ Standard Kubernetes Secret reference:
- `secretRef.name`: Name of the secret. - `secretRef.name`: Name of the secret.
- `secretRef.key`: Key within the secret containing the value. - `secretRef.key`: Key within the secret containing the value.
### 3.3 Status Schema (Optional but Recommended) ### 3.3 Status Schema
- `activeRunners`: Integer. Current count of running Jobs managed by this CR. - `activeRunners`: Integer. Current count of running Jobs managed by this CR.
- `lastCheckTime`: Timestamp. Last time the controller polled Gitea. - `lastCheckTime`: Timestamp. Last time the controller polled Gitea.
@@ -54,37 +57,44 @@ Standard Kubernetes Secret reference:
The controller watches for changes to `RunnerGroup` resources. The controller watches for changes to `RunnerGroup` resources.
1. **Validation**: Ensure `org` or `repo` are present based on `scope`. 1. **Validation**: Ensure `org`, `user`, or `repo` is present, as required by `scope`.
2. **Job Cleanup**: (Optional) Check for and remove "stuck" jobs if TTL doesn't cover edge cases, though `ttlSecondsAfterFinished` is primary. 2. **Job List**: List child Jobs to determine `activeRunners` count.
3. **Metric Collection**: Update status with current running job count. 3. **Status Update**: Update CR status with current metrics.
4. **Polling**: The controller must implement a polling mechanism (loop) independent of the standard Reconcile trigger, or requeue the Reconcile event periodically (e.g., every 10-30 seconds). 4. **Capacity Check**: If `activeRunners >= maxActiveRunners`, stop scaling up.
5. **Polling**: Fetch job statistics from Gitea.
### 4.2 Polling & Scaling Logic ### 4.2 Polling & Scaling Strategy
On every poll interval for a specific `RunnerGroup` CR: The operator polls Gitea and must account for the delay between a runner Pod starting in Kubernetes and Gitea's job queue reflecting that the job was picked up.
1. **Check Capacity**: #### 4.2.1 Fetching Stats (`GetRunnerStats`)
- Query Kubernetes for active `Jobs` owned by this `RunnerGroup` CR.
- If `count(active_jobs) >= maxActiveRunners`, stop. Do not spawn new runners.
2. **Fetch Queued Jobs**: The controller queries Gitea for:
- Call Gitea API using `authToken`.
- Endpoint depends on scope:
- **Global**: Recursively fetch all workflow runs:
1. Fetch all organizations in the Gitea instance
2. For each organization, fetch all repositories under that org
3. For each repository, query `/repos/{owner}/{repo}/actions/runs?status=queued`
4. Additionally, fetch all user-owned repositories and query their workflow runs
- **Org**: Fetch all workflow runs in repos under the organization:
1. Fetch all repositories under the specified organization
2. For each repository, query `/repos/{owner}/{repo}/actions/runs?status=queued`
- **Repo**: Directly query `/repos/{owner}/{repo}/actions/runs?status=queued`
- Filter the returned runs:
- Must match the `labels` defined in the `RunnerGroup` CR.
3. **Spawn Runner**: 1. **Queued Jobs**: Jobs with status `queued`, `waiting`, or `pending`.
- If a queued job is found and capacity allows, create a Kubernetes `Job`. - **Label Filtering**: Jobs are filtered client-side. A job is considered a match if the RunnerGroup's capabilities (Spec labels + Default labels) are a superset of the Job's required labels.
- **One Job per Queued Workflow**: Ideally, the logic should map 1 queued run -> 1 Runner Job. 2. **Running Jobs**: Jobs with status `running` that belong to this specific runner group (filtered by runner name prefix).
- **Concurrency Control**: Ensure we don't spawn more jobs than `maxActiveRunners - currentActiveRunners`.
#### 4.2.2 Deduplication Cache (`SpawnedJobsCache`)
To prevent "double scheduling" (where multiple reconciliation loops spawn multiple runners for the same queued job before the first runner can pick it up), the controller maintains an in-memory cache:
- **Key**: Gitea Job ID.
- **Value**: Timestamp when the runner was spawned.
- **TTL**: 5 minutes.
#### 4.2.3 Scaling Algorithm
1. **Identify Candidates**: Iterate through the list of Queued Jobs from Gitea.
2. **Check Cache**:
- If Job ID is in cache and TTL has not expired: **Skip** (Runner already spawned).
- If Job ID is in cache and TTL expired: **Retry** (Runner likely failed to start).
- If Job ID is not in cache: **Candidate for spawning**.
3. **Calculate Slots**: `availableSlots = maxActiveRunners - activeRunners`.
4. **Spawn**: For each candidate, if `availableSlots > 0`:
- Create Kubernetes Job.
- Add Job ID to `SpawnedJobsCache`.
- Decrement `availableSlots`.
5. **Cleanup**: Remove Job IDs from the cache if they are no longer present in the Queued Jobs list returned by Gitea (implies they are now Running, Completed, or Cancelled).
## 5. Kubernetes Resource Generation ## 5. Kubernetes Resource Generation
@@ -94,40 +104,44 @@ The controller creates a `batch/v1 Job`.
**Metadata:** **Metadata:**
- `name`: `{runnergroup-cr-name}-{random-suffix}` - `name`: `{runnergroup-name}-{random-suffix}`
- `namespace`: Same as `RunnerGroup` CR. - `namespace`: Same as `RunnerGroup` CR.
- `labels`: - `labels`:
- `app`: `{runnergroup-cr-name}` - `gitea.bpg.pw/runnergroup-name`: `{runnergroup-name}`
- `gitea.bpg.pw/managed-by`: `gitea-runner-operator` - `gitea.bpg.pw/managed-by`: `gitea-runner-operator`
- `gitea.bpg.pw/runnergroup-name`: `{runnergroup-cr-name}`
- `ownerReferences`: Pointing to the `RunnerGroup` CR. - `ownerReferences`: Pointing to the `RunnerGroup` CR.
**Spec:** **Spec:**
- `ttlSecondsAfterFinished`: 600 (Clean up finished jobs). - `ttlSecondsAfterFinished`: 600 (Auto-cleanup).
- `template`: - `template`:
- `spec`: - `spec`:
- `restartPolicy`: `OnFailure` - `restartPolicy`: `OnFailure`
- `containers`: - `containers`:
- **Name**: `runner` - **Name**: `runner`
- **Image**: `gitea/act_runner:nightly-dind-rootless` (Default, potentially configurable in CR later). - **Image**: `gitea/act_runner:nightly-dind-rootless`
- **SecurityContext**: `privileged: true` (Required for DIND).
- **Env**: - **Env**:
- `GITEA_INSTANCE_URL`: From `spec.gitea.url`. - `GITEA_INSTANCE_URL`: From `spec.gitea.url`.
- `GITEA_RUNNER_REGISTRATION_TOKEN`: From `spec.registrationToken`. - `GITEA_RUNNER_REGISTRATION_TOKEN`: From Secret.
- `GITEA_RUNNER_EPHEMERAL`: `"true"`. - `GITEA_RUNNER_EPHEMERAL`: `"true"`.
- `GITEA_RUNNER_LABELS`: Comma-separated list from `spec.labels`. - `GITEA_RUNNER_NAME`: `{job-name}` (Matches Pod name for easier debugging).
- `DOCKER_HOST`: `tcp://localhost:2376` - `GITEA_RUNNER_LABELS`: Comma-separated list of **Effective Labels**.
- **VolumeMounts**: - **Effective Labels** = `spec.labels` + Default Gitea Labels (e.g., `ubuntu-latest:docker://node:16-bullseye`, `ubuntu-22.04:...`, etc.) unless explicitly overridden.
- Mount docker socket or storage if necessary. The README example uses a PVC `act-runner-vol` mounted to `/data`. _Note: Using a shared PVC for ephemeral runners might cause race conditions. EmptyDir is preferred for truly ephemeral runners unless caching is strictly required and managed._
## 6. Gitea API Interaction ## 6. Gitea API Interaction
- **Authentication**: Bearer token provided in `authToken`. - **Authentication**: Bearer token provided in `authToken`.
- **Client**: HTTP Client with timeout. - **Endpoints Used**:
- `/api/v1/repos/{owner}/{repo}/actions/jobs` (Repo scope)
- `/api/v1/orgs/{org}/actions/jobs` (Org scope)
- `/api/v1/users/{user}/repos` + `/api/v1/repos/{owner}/{repo}/actions/jobs` (User scope)
- `/api/v1/admin/actions/jobs` (Global scope)
- **Label Matching**:
- The controller implements logic to check: `Job.Labels ⊆ Runner.EffectiveLabels`.
- Supports both exact matches (`linux`) and schema matches (`ubuntu-latest` matches `ubuntu-latest:docker://...`).
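The subset check with schema-aware matching might look like the sketch below. It is written as a free function for self-containment, whereas the tests in this change call a method on the client. The assumption that a runner label has the form `name` or `name:schema...`, and that only the part before the first colon is compared, is an illustration and is not confirmed by the spec.

```go
package main

import (
	"fmt"
	"strings"
)

// labelName returns the part of a runner label before the first ':',
// so "ubuntu-latest:docker://node:16-bullseye" becomes "ubuntu-latest".
// Assumption: runner labels follow a "name[:schema...]" layout.
func labelName(label string) string {
	name, _, _ := strings.Cut(label, ":")
	return name
}

// jobMatchesLabels reports whether every label the job requires is offered
// by the runner, i.e. jobLabels is a subset of the runner's label names.
// A job with no labels matches any runner in this sketch.
func jobMatchesLabels(jobLabels, runnerLabels []string) bool {
	supported := make(map[string]struct{}, len(runnerLabels))
	for _, l := range runnerLabels {
		supported[labelName(l)] = struct{}{}
	}
	for _, l := range jobLabels {
		if _, ok := supported[l]; !ok {
			return false
		}
	}
	return true
}

func main() {
	runner := []string{"linux", "x64", "ubuntu-latest:docker://node:16-bullseye"}
	fmt.Println(jobMatchesLabels([]string{"ubuntu-latest"}, runner))
	fmt.Println(jobMatchesLabels([]string{"linux", "arm64"}, runner))
}
```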
## 7. Security Considerations ## 7. Security Considerations
- **Token Handling**: Registration and Auth tokens are read from Kubernetes Secrets and injected as Environment Variables. They are not stored in plain text in the CR. - **Token Handling**: Tokens are injected via `valueFrom: secretKeyRef` env vars.
- **Privileged Mode**: The default `act_runner` image (dind) requires privileged mode. The Operator creates Jobs with this permission. - **Privileged Mode**: `act_runner` dind mode requires privileged security context.
- **Namespace Isolation**: The Operator should respect RBAC and only operate within allowed namespaces. - **Namespace Isolation**: Controller operates within the namespace of the RunnerGroup.