Files
gitea-runner-operator/internal/controller/runnergroup_controller.go
2026-01-12 22:41:32 +08:00

362 lines
11 KiB
Go

/*
Copyright 2026.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
giteav1alpha1 "github.com/bapung/gitea-runner-operator/api/v1alpha1"
"github.com/bapung/gitea-runner-operator/internal/gitea"
)
// RunnerGroupReconciler reconciles a RunnerGroup object.
//
// The struct holds a sync.Map, so it is meant to be used through a pointer
// (all of its methods have pointer receivers) rather than copied by value.
type RunnerGroupReconciler struct {
// Client is the embedded controller-runtime client used to get, list,
// create and update cluster objects.
client.Client
// Scheme is handed to ctrl.SetControllerReference when a spawned Job is
// given the RunnerGroup as its controller owner.
Scheme *runtime.Scheme
// GiteaClient is queried via GetRunnerStats for the queued jobs of a group.
GiteaClient gitea.Client
// SpawnedJobsCache maps a Gitea job ID to the time.Time at which a runner
// Job was last created for it. Reconcile drops entries once the job no
// longer appears in the Gitea queue.
SpawnedJobsCache sync.Map
}
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For the RunnerGroup named by req it:
//  1. lists the Jobs labelled with the group name and counts the ones that
//     have not finished (neither succeeded nor failed);
//  2. stores that count and the check time in the RunnerGroup status;
//  3. stops early when the count has reached Spec.MaxActiveRunners;
//  4. otherwise asks Gitea for queued jobs and creates one runner Job per
//     queued job until the free slots are used up, skipping jobs for which a
//     runner was spawned less than five minutes ago;
//  5. removes cache entries for jobs that are no longer queued.
//
// Successful passes ask to be requeued after ten seconds so that Gitea keeps
// being polled. Failures are returned as errors with an empty Result.
func (r *RunnerGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	const (
		// pollInterval is how long to wait before the next polling pass.
		pollInterval = 10 * time.Second
		// spawnRetryTTL is how long a spawned runner is given to pick up its
		// job before another runner is created for the same job.
		spawnRetryTTL = 5 * time.Minute
	)

	logger := log.FromContext(ctx)

	// 1. Fetch RunnerGroup
	runnerGroup := &giteav1alpha1.RunnerGroup{}
	if err := r.Get(ctx, req.NamespacedName, runnerGroup); err != nil {
		if errors.IsNotFound(err) {
			// RunnerGroup deleted, nothing to do
			logger.Info("RunnerGroup not found, ignoring since object must be deleted")
			return ctrl.Result{}, nil
		}
		logger.Error(err, "Failed to get RunnerGroup")
		return ctrl.Result{}, err
	}
	logger.Info("Reconciling RunnerGroup", "name", runnerGroup.Name, "namespace", runnerGroup.Namespace)

	// 2. List Jobs owned by this RunnerGroup
	jobList := &batchv1.JobList{}
	labelSelector := client.MatchingLabels{
		"gitea.bpg.pw/runnergroup-name": runnerGroup.Name,
	}
	if err := r.List(ctx, jobList, client.InNamespace(runnerGroup.Namespace), labelSelector); err != nil {
		logger.Error(err, "Failed to list Jobs")
		return ctrl.Result{}, err
	}

	// 3. Update Status - count Jobs that have not reached a terminal state.
	// Indexing avoids copying every Job struct while iterating.
	activeRunners := 0
	for i := range jobList.Items {
		if !isRunnerJobFinished(&jobList.Items[i]) {
			activeRunners++
		}
	}

	runnerGroup.Status.ActiveRunners = activeRunners
	now := metav1.Now()
	runnerGroup.Status.LastCheckTime = &now
	if err := r.Status().Update(ctx, runnerGroup); err != nil {
		logger.Error(err, "Failed to update RunnerGroup status")
		return ctrl.Result{}, err
	}
	logger.Info("Checked active runners", "active", activeRunners, "max", runnerGroup.Spec.MaxActiveRunners)

	// 4. Capacity Check
	if activeRunners >= runnerGroup.Spec.MaxActiveRunners {
		logger.Info("Max active runners reached, skipping scaling",
			"activeRunners", activeRunners,
			"maxActiveRunners", runnerGroup.Spec.MaxActiveRunners)
		return ctrl.Result{RequeueAfter: pollInterval}, nil
	}

	// 5. Poll Gitea
	// Retrieve Auth Token from Secret
	authToken, err := r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.AuthTokenRef)
	if err != nil {
		logger.Error(err, "Failed to get auth token from secret")
		return ctrl.Result{}, err
	}
	logger.Info("Checking Gitea for queued jobs", "url", runnerGroup.Spec.GiteaURL, "scope", runnerGroup.Spec.Scope)

	// Calculate effective labels (spec labels + defaults)
	effectiveLabels := r.getEffectiveLabels(runnerGroup.Spec.Labels)

	// Query for queued workflow runs
	stats, err := r.GiteaClient.GetRunnerStats(
		ctx,
		runnerGroup.Spec.GiteaURL,
		authToken,
		runnerGroup.Spec.Scope,
		runnerGroup.Spec.Org,
		runnerGroup.Spec.User,
		runnerGroup.Spec.Repo,
		effectiveLabels,
	)
	if err != nil {
		logger.Error(err, "Failed to query Gitea for runner stats")
		// The Result is left empty: controller-runtime ignores it whenever a
		// non-nil error is returned and retries on its own schedule.
		return ctrl.Result{}, err
	}
	logger.Info("Gitea query result", "queuedJobs", len(stats.QueuedJobs))

	// 6. Scale Up and Cache Management
	availableSlots := runnerGroup.Spec.MaxActiveRunners - activeRunners

	// Track current queued IDs for cache cleanup
	currentQueuedIDs := make(map[int64]struct{}, len(stats.QueuedJobs))

	// The registration token is read lazily, only once a Job really has to be
	// created, and then reused for the rest of this pass.
	var registrationToken string
	tokenFetched := false

	for _, giteaJob := range stats.QueuedJobs {
		// Every queued ID is recorded, even when no slot is left, so that the
		// cleanup below does not evict entries for jobs that are still queued.
		currentQueuedIDs[giteaJob.ID] = struct{}{}
		if availableSlots <= 0 {
			continue
		}

		// Check if we already spawned a runner for this job
		if value, loaded := r.SpawnedJobsCache.Load(giteaJob.ID); loaded {
			if spawnTime, ok := value.(time.Time); ok && time.Since(spawnTime) < spawnRetryTTL {
				// Already handling this job recently
				continue
			}
			// TTL expired (runner likely failed to start), retry spawning
			logger.Info("Job stuck in queue for too long, retrying runner spawn", "giteaJobID", giteaJob.ID)
		}

		// Need to spawn a runner
		if !tokenFetched {
			registrationToken, err = r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.RegistrationTokenRef)
			if err != nil {
				logger.Error(err, "Failed to get registration token from secret")
				return ctrl.Result{}, err
			}
			tokenFetched = true
		}

		job, err := r.constructJobForRunnerGroup(runnerGroup, registrationToken, effectiveLabels)
		if err != nil {
			logger.Error(err, "Failed to construct Job")
			return ctrl.Result{}, err
		}
		if err := r.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create Job", "jobName", job.Name)
			return ctrl.Result{}, err
		}
		logger.Info("Created Job for Gitea Run", "jobName", job.Name, "giteaJobID", giteaJob.ID)

		// Mark as spawned
		r.SpawnedJobsCache.Store(giteaJob.ID, time.Now())
		availableSlots--
	}

	// Cleanup cache: remove jobs that are no longer queued in Gitea.
	// NOTE(review): SpawnedJobsCache lives on the reconciler and is keyed only
	// by Gitea job ID, so this pass also evicts entries that were stored while
	// reconciling other RunnerGroups - confirm whether keys should be scoped
	// per group.
	r.SpawnedJobsCache.Range(func(key, _ any) bool {
		jobID, ok := key.(int64)
		if !ok {
			// Not an entry written by this loop; leave it alone.
			return true
		}
		if _, queued := currentQueuedIDs[jobID]; !queued {
			// Job is no longer in the queue (running, completed, or cancelled)
			r.SpawnedJobsCache.Delete(key)
		}
		return true
	})

	// 7. Requeue for continuous polling
	return ctrl.Result{RequeueAfter: pollInterval}, nil
}

// isRunnerJobFinished reports whether job has reached a terminal state.
//
// Status.CompletionTime is only populated for Jobs that succeeded, so the
// Complete and Failed conditions are inspected as well; otherwise a failed
// Job would keep occupying a runner slot until it is deleted.
func isRunnerJobFinished(job *batchv1.Job) bool {
	if job.Status.CompletionTime != nil {
		return true
	}
	for _, cond := range job.Status.Conditions {
		if cond.Status != corev1.ConditionTrue {
			continue
		}
		if cond.Type == batchv1.JobComplete || cond.Type == batchv1.JobFailed {
			return true
		}
	}
	return false
}
// getSecretValue reads a single entry from a Secret.
//
// The Secret is looked up by selector.Name inside namespace, and the bytes
// stored under selector.Key are returned as a string. An error is returned
// when the Secret cannot be fetched or when it has no such key.
func (r *RunnerGroupReconciler) getSecretValue(ctx context.Context, namespace string, selector corev1.SecretKeySelector) (string, error) {
	var secret corev1.Secret
	lookup := client.ObjectKey{Name: selector.Name, Namespace: namespace}

	err := r.Get(ctx, lookup, &secret)
	if err != nil {
		return "", fmt.Errorf("failed to get secret %s: %w", selector.Name, err)
	}

	if raw, present := secret.Data[selector.Key]; present {
		return string(raw), nil
	}
	return "", fmt.Errorf("key %s not found in secret %s", selector.Key, selector.Name)
}
// getEffectiveLabels returns the spec labels followed by every built-in
// default label whose key is not already claimed by a spec label.
//
// A label has the form "key" or "key:schema"; the key is the text before the
// first colon. The input slice is copied, never modified.
func (r *RunnerGroupReconciler) getEffectiveLabels(specLabels []string) []string {
	builtin := [...]string{
		"ubuntu-latest:docker://node:16-bullseye",
		"ubuntu-22.04:docker://node:16-bullseye",
		"ubuntu-20.04:docker://node:16-bullseye",
		"ubuntu-18.04:docker://node:16-buster",
	}

	// keyOf extracts the part of a label that precedes the first colon.
	keyOf := func(label string) string {
		return strings.SplitN(label, ":", 2)[0]
	}

	// Collect the keys the spec already defines so each default needs only
	// one lookup.
	claimed := make(map[string]struct{}, len(specLabels))
	for _, label := range specLabels {
		claimed[keyOf(label)] = struct{}{}
	}

	merged := make([]string, 0, len(specLabels)+len(builtin))
	merged = append(merged, specLabels...)
	for _, fallback := range builtin {
		if _, overridden := claimed[keyOf(fallback)]; overridden {
			continue
		}
		merged = append(merged, fallback)
	}
	return merged
}
// constructJobForRunnerGroup builds (but does not create) the Job that runs
// one runner pod for runnerGroup.
//
// The Job is named "<group name>-<8 random characters>"; the same name is
// given to the runner through GITEA_RUNNER_NAME. When labels is non-empty it
// is joined with commas into GITEA_RUNNER_LABELS. The RunnerGroup is set as
// the controller owner of the Job; an error from that step is returned as-is.
func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1alpha1.RunnerGroup, registrationToken string, labels []string) (*batchv1.Job, error) {
	jobName := runnerGroup.Name + "-" + randString(8)

	// Environment handed to the runner container.
	// NOTE(review): the registration token is embedded as a literal value and
	// is therefore readable from the Job spec - consider a SecretKeyRef.
	env := []corev1.EnvVar{
		{Name: "GITEA_INSTANCE_URL", Value: runnerGroup.Spec.GiteaURL},
		{Name: "GITEA_RUNNER_REGISTRATION_TOKEN", Value: registrationToken},
		{Name: "GITEA_RUNNER_EPHEMERAL", Value: "true"},
		{Name: "GITEA_RUNNER_NAME", Value: jobName},
		{Name: "DOCKER_HOST", Value: "tcp://localhost:2376"},
		{Name: "DOCKER_CERT_PATH", Value: "/certs/client"},
		{Name: "DOCKER_TLS_VERIFY", Value: "1"},
	}
	if len(labels) > 0 {
		env = append(env, corev1.EnvVar{
			Name:  "GITEA_RUNNER_LABELS",
			Value: strings.Join(labels, ","),
		})
	}

	// Scratch volume mounted at /data in the runner container.
	dataVolume := corev1.Volume{
		Name: "runner-data",
		VolumeSource: corev1.VolumeSource{
			EmptyDir: &corev1.EmptyDirVolumeSource{},
		},
	}

	runner := corev1.Container{
		Name:            "runner",
		Image:           "gitea/act_runner:nightly-dind-rootless",
		ImagePullPolicy: corev1.PullAlways,
		SecurityContext: &corev1.SecurityContext{
			Privileged: ptr.To(true),
		},
		Env: env,
		VolumeMounts: []corev1.VolumeMount{
			{Name: "runner-data", MountPath: "/data"},
		},
	}

	podSpec := corev1.PodSpec{
		RestartPolicy: corev1.RestartPolicyOnFailure,
		SecurityContext: &corev1.PodSecurityContext{
			FSGroup: ptr.To(int64(1000)),
		},
		Containers: []corev1.Container{runner},
		Volumes:    []corev1.Volume{dataVolume},
	}

	// The runnergroup-name label is what Reconcile selects on when it lists
	// the Jobs belonging to a group.
	meta := metav1.ObjectMeta{
		Name:      jobName,
		Namespace: runnerGroup.Namespace,
		Labels: map[string]string{
			"app":                           runnerGroup.Name,
			"gitea.bpg.pw/runnergroup-name": runnerGroup.Name,
			"gitea.bpg.pw/managed-by":       "gitea-runner-operator",
		},
	}

	job := &batchv1.Job{
		ObjectMeta: meta,
		Spec: batchv1.JobSpec{
			TTLSecondsAfterFinished: ptr.To(int32(600)),
			Template:                corev1.PodTemplateSpec{Spec: podSpec},
		},
	}

	// Make the RunnerGroup the controller owner of the Job.
	if err := ctrl.SetControllerReference(runnerGroup, job, r.Scheme); err != nil {
		return nil, err
	}
	return job, nil
}
// randString returns a string of the given length whose characters are drawn
// from the lowercase ASCII letters and the digits 0-9. A length of zero or
// less yields the empty string.
//
// The shared math/rand generator is used instead of building a new generator
// seeded from the clock on every call: two calls that observe the same clock
// reading would otherwise return the same string, and the strings are used
// as Job name suffixes that have to differ. The shared generator is safe for
// concurrent use and, on Go 1.20 and later, randomly seeded at start-up.
// The result is not suitable for secrets.
func randString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	if length <= 0 {
		return ""
	}
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}
// SetupWithManager registers the reconciler with mgr as the controller named
// "runnergroup". RunnerGroup is the primary resource and Jobs are registered
// as owned resources. Any error from building the controller is returned.
func (r *RunnerGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&giteav1alpha1.RunnerGroup{})
	bldr = bldr.Owns(&batchv1.Job{})
	bldr = bldr.Named("runnergroup")
	return bldr.Complete(r)
}