/* Copyright 2026. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "context" "fmt" "math/rand" "strings" "sync" "time" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" giteav1alpha1 "github.com/bapung/gitea-runner-operator/api/v1alpha1" "github.com/bapung/gitea-runner-operator/internal/gitea" ) // RunnerGroupReconciler reconciles a RunnerGroup object type RunnerGroupReconciler struct { client.Client Scheme *runtime.Scheme GiteaClient gitea.Client SpawnedJobsCache sync.Map } // +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups/status,verbs=get;update;patch // +kubebuilder:rbac:groups=gitea.bpg.pw,resources=runnergroups/finalizers,verbs=update // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *RunnerGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) // 1. Fetch RunnerGroup runnerGroup := &giteav1alpha1.RunnerGroup{} if err := r.Get(ctx, req.NamespacedName, runnerGroup); err != nil { if errors.IsNotFound(err) { // RunnerGroup deleted, nothing to do logger.Info("RunnerGroup not found, ignoring since object must be deleted") return ctrl.Result{}, nil } logger.Error(err, "Failed to get RunnerGroup") return ctrl.Result{}, err } logger.Info("Reconciling RunnerGroup", "name", runnerGroup.Name, "namespace", runnerGroup.Namespace) // 2. List Jobs owned by this RunnerGroup jobList := &batchv1.JobList{} labelSelector := client.MatchingLabels{ "gitea.bpg.pw/runnergroup-name": runnerGroup.Name, } if err := r.List(ctx, jobList, client.InNamespace(runnerGroup.Namespace), labelSelector); err != nil { logger.Error(err, "Failed to list Jobs") return ctrl.Result{}, err } // 3. Update Status - count non-completed jobs activeRunners := 0 for _, job := range jobList.Items { // Job is active if it's not completed (no completion time) if job.Status.CompletionTime == nil { activeRunners++ } } // Update status runnerGroup.Status.ActiveRunners = activeRunners now := metav1.Now() runnerGroup.Status.LastCheckTime = &now if err := r.Status().Update(ctx, runnerGroup); err != nil { logger.Error(err, "Failed to update RunnerGroup status") return ctrl.Result{}, err } logger.Info("Checked active runners", "active", activeRunners, "max", runnerGroup.Spec.MaxActiveRunners) // 4. Capacity Check if activeRunners >= runnerGroup.Spec.MaxActiveRunners { logger.Info("Max active runners reached, skipping scaling", "activeRunners", activeRunners, "maxActiveRunners", runnerGroup.Spec.MaxActiveRunners) return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } // 5. Poll Gitea // Retrieve Auth Token from Secret authToken, err := r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.AuthTokenRef) if err != nil { logger.Error(err, "Failed to get auth token from secret") return ctrl.Result{}, err } logger.Info("Checking Gitea for queued jobs", "url", runnerGroup.Spec.GiteaURL, "scope", runnerGroup.Spec.Scope) // Calculate effective labels (spec labels + defaults) effectiveLabels := r.getEffectiveLabels(runnerGroup.Spec.Labels) // Query for queued workflow runs stats, err := r.GiteaClient.GetRunnerStats( ctx, runnerGroup.Spec.GiteaURL, authToken, runnerGroup.Spec.Scope, runnerGroup.Spec.Org, runnerGroup.Spec.Repo, effectiveLabels, ) if err != nil { logger.Error(err, "Failed to query Gitea for runner stats") return ctrl.Result{RequeueAfter: 10 * time.Second}, err } logger.Info("Gitea query result", "queuedJobs", len(stats.QueuedJobs)) // 6. Scale Up and Cache Management availableSlots := runnerGroup.Spec.MaxActiveRunners - activeRunners // Track current queued IDs for cache cleanup currentQueuedIDs := make(map[int64]bool) // Retrieve Registration Token from Secret (only if we need to spawn) var registrationToken string tokenFetched := false for _, giteaJob := range stats.QueuedJobs { currentQueuedIDs[giteaJob.ID] = true if availableSlots <= 0 { continue } // Check if we already spawned a runner for this job if value, loaded := r.SpawnedJobsCache.Load(giteaJob.ID); loaded { spawnTime := value.(time.Time) if time.Since(spawnTime) < 5*time.Minute { // Already handling this job recently continue } // TTL expired (runner likely failed to start), retry spawning logger.Info("Job stuck in queue for too long, retrying runner spawn", "giteaJobID", giteaJob.ID) } // Need to spawn a runner if !tokenFetched { registrationToken, err = r.getSecretValue(ctx, runnerGroup.Namespace, runnerGroup.Spec.RegistrationTokenRef) if err != nil { logger.Error(err, "Failed to get registration token from secret") return ctrl.Result{}, err } tokenFetched = true } job, err := r.constructJobForRunnerGroup(runnerGroup, registrationToken, effectiveLabels) if err != nil { logger.Error(err, "Failed to construct Job") return ctrl.Result{}, err } if err := r.Create(ctx, job); err != nil { logger.Error(err, "Failed to create Job", "jobName", job.Name) return ctrl.Result{}, err } logger.Info("Created Job for Gitea Run", "jobName", job.Name, "giteaJobID", giteaJob.ID) // Mark as spawned r.SpawnedJobsCache.Store(giteaJob.ID, time.Now()) availableSlots-- } // Cleanup cache: remove jobs that are no longer queued in Gitea r.SpawnedJobsCache.Range(func(key, value any) bool { jobID := key.(int64) if !currentQueuedIDs[jobID] { // Job is no longer in the queue (running, completed, or cancelled) r.SpawnedJobsCache.Delete(key) } return true }) // 7. Requeue for continuous polling return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } // getSecretValue retrieves a value from a secret func (r *RunnerGroupReconciler) getSecretValue(ctx context.Context, namespace string, selector corev1.SecretKeySelector) (string, error) { secret := &corev1.Secret{} secretName := client.ObjectKey{ Namespace: namespace, Name: selector.Name, } if err := r.Get(ctx, secretName, secret); err != nil { return "", fmt.Errorf("failed to get secret %s: %w", selector.Name, err) } value, ok := secret.Data[selector.Key] if !ok { return "", fmt.Errorf("key %s not found in secret %s", selector.Key, selector.Name) } return string(value), nil } // getEffectiveLabels merges spec labels with default labels func (r *RunnerGroupReconciler) getEffectiveLabels(specLabels []string) []string { defaultLabels := []string{ "ubuntu-latest:docker://node:16-bullseye", "ubuntu-22.04:docker://node:16-bullseye", "ubuntu-20.04:docker://node:16-bullseye", "ubuntu-18.04:docker://node:16-buster", } effectiveLabels := make([]string, len(specLabels)) copy(effectiveLabels, specLabels) for _, defaultLabel := range defaultLabels { // Check if this default label key is already overridden in specLabels // defaultLabel format is "key:schema" parts := strings.SplitN(defaultLabel, ":", 2) key := parts[0] found := false for _, specLabel := range specLabels { // Spec label can be "key" or "key:schema" if specLabel == key || strings.HasPrefix(specLabel, key+":") { found = true break } } if !found { effectiveLabels = append(effectiveLabels, defaultLabel) } } return effectiveLabels } // constructJobForRunnerGroup creates a Job object for the RunnerGroup func (r *RunnerGroupReconciler) constructJobForRunnerGroup(runnerGroup *giteav1alpha1.RunnerGroup, registrationToken string, labels []string) (*batchv1.Job, error) { // Generate random suffix for name name := fmt.Sprintf("%s-%s", runnerGroup.Name, randString(8)) // Construct Env Vars envVars := []corev1.EnvVar{ {Name: "GITEA_INSTANCE_URL", Value: runnerGroup.Spec.GiteaURL}, {Name: "GITEA_RUNNER_REGISTRATION_TOKEN", Value: registrationToken}, {Name: "GITEA_RUNNER_EPHEMERAL", Value: "true"}, {Name: "GITEA_RUNNER_NAME", Value: name}, {Name: "DOCKER_HOST", Value: "tcp://localhost:2376"}, {Name: "DOCKER_CERT_PATH", Value: "/certs/client"}, {Name: "DOCKER_TLS_VERIFY", Value: "1"}, } if len(labels) > 0 { labelsStr := strings.Join(labels, ",") envVars = append(envVars, corev1.EnvVar{Name: "GITEA_RUNNER_LABELS", Value: labelsStr}) } // Construct Job job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: runnerGroup.Namespace, Labels: map[string]string{ "app": runnerGroup.Name, "gitea.bpg.pw/runnergroup-name": runnerGroup.Name, "gitea.bpg.pw/managed-by": "gitea-runner-operator", }, }, Spec: batchv1.JobSpec{ TTLSecondsAfterFinished: ptr.To(int32(600)), Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyOnFailure, SecurityContext: &corev1.PodSecurityContext{ FSGroup: ptr.To(int64(1000)), }, Containers: []corev1.Container{ { Name: "runner", Image: "gitea/act_runner:nightly-dind-rootless", ImagePullPolicy: corev1.PullAlways, SecurityContext: &corev1.SecurityContext{ Privileged: ptr.To(true), }, Env: envVars, VolumeMounts: []corev1.VolumeMount{ {Name: "runner-data", MountPath: "/data"}, }, }, }, Volumes: []corev1.Volume{ { Name: "runner-data", VolumeSource: corev1.VolumeSource{ EmptyDir: &corev1.EmptyDirVolumeSource{}, }, }, }, }, }, }, } // Set Controller Reference if err := ctrl.SetControllerReference(runnerGroup, job, r.Scheme); err != nil { return nil, err } return job, nil } // randString generates a random string of the given length func randString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) b := make([]byte, length) for i := range b { b[i] = charset[seededRand.Intn(len(charset))] } return string(b) } // min returns the minimum of two integers func min(a, b int) int { if a < b { return a } return b } // SetupWithManager sets up the controller with the Manager. func (r *RunnerGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&giteav1alpha1.RunnerGroup{}). Owns(&batchv1.Job{}). Named("runnergroup"). Complete(r) }