commit 3e48c911c81d7ff5cbdf55ad703f3f954cd055a2 Author: ai_approver Date: Sun May 3 01:25:50 2026 +0000 feat: version checker script diff --git a/check_versions.py b/check_versions.py new file mode 100644 index 0000000..477b39c --- /dev/null +++ b/check_versions.py @@ -0,0 +1,410 @@ +#!/usr/bin/env python3 +""" +k8s-version-tracker: Weekly Kubernetes image version checker and Mattermost notifier. + +Queries all running pods in the cluster, checks upstream registries for newer versions, +optionally mirrors updated images to the local Zot registry, and sends a Mattermost +notification with the weekly report. +""" + +import os +import re +import json +import logging +import subprocess +from dataclasses import dataclass, field +from typing import Optional +from datetime import datetime, timezone + +import requests +from kubernetes import client, config +from packaging.version import Version, InvalidVersion + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +# ───────────────────────────────────────────────────────────────────────────── +# Configuration +# ───────────────────────────────────────────────────────────────────────────── + +LOCAL_REGISTRIES = { + "registry.storedbox.net", + "192.168.87.31:5000", + "192.168.87.31", + "192.168.86.200", +} + +# Tags that float (can't track by semver — compare digest instead) +FLOATING_TAGS = { + "latest", "main", "master", "release", "stable", "edge", + "nightly", "dev", "snapshot", "lts", +} + +# Tags that encode non-comparable info (skip version check) +SKIP_TAG_PATTERNS = [ + r"^sha256:", # digest refs + r"-jdk\d+", # e.g. lts-jdk17 + r"^pg\d+-", # e.g. pg14-v0.2.0 + r"^stable-\d+$", # jitsi build numbers — handled separately + r"fbde4d5e", # devtron commit hashes +] + + +@dataclass +class ImageInfo: + original: str # raw image string from pod spec + registry: str + namespace: str # e.g. 'prom', 'library', 'grafana' + repo: str # e.g. 'prometheus' + tag: str + pods: list[str] = field(default_factory=list) + + @property + def display_name(self) -> str: + if self.registry == "docker.io": + if self.namespace == "library": + return f"{self.repo}:{self.tag}" + return f"{self.namespace}/{self.repo}:{self.tag}" + return f"{self.registry}/{self.namespace}/{self.repo}:{self.tag}" + + @property + def is_local(self) -> bool: + return self.registry in LOCAL_REGISTRIES + + @property + def is_floating(self) -> bool: + return self.tag.lower() in FLOATING_TAGS + + @property + def skip_check(self) -> bool: + return any(re.search(p, self.tag) for p in SKIP_TAG_PATTERNS) + + +# ───────────────────────────────────────────────────────────────────────────── +# Image reference parsing +# ───────────────────────────────────────────────────────────────────────────── + +def parse_image_ref(image: str) -> ImageInfo: + """Parse a full image reference into structured components.""" + ref = image.split("@")[0] # strip digest + tag = "latest" + if ":" in ref.split("/")[-1]: + ref, tag = ref.rsplit(":", 1) + + parts = ref.split("/") + + if len(parts) == 1: + return ImageInfo(image, "docker.io", "library", parts[0], tag) + + first = parts[0] + is_registry = "." in first or ":" in first or first == "localhost" + + if not is_registry: + # e.g. prom/prometheus, n8nio/n8n + return ImageInfo(image, "docker.io", parts[0], "/".join(parts[1:]), tag) + + registry = first + remainder = parts[1:] + if not remainder: + return ImageInfo(image, registry, "", "", tag) + + # registry/namespace/repo or registry/repo + repo = remainder[-1] + namespace = "/".join(remainder[:-1]) if len(remainder) > 1 else "" + return ImageInfo(image, registry, namespace, repo, tag) + + +# ───────────────────────────────────────────────────────────────────────────── +# Registry version checkers +# ───────────────────────────────────────────────────────────────────────────── + +def _semver_tags(tags: list[str], current_tag: str) -> Optional[str]: + """From a list of tags, return the latest stable semver tag comparable to current.""" + norm = current_tag.lstrip("v") + has_v_prefix = current_tag.startswith("v") + + # Detect suffix pattern (e.g. '-alpine', '-amd64') + suffix_match = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", norm) + suffix = suffix_match.group(1) if suffix_match else "" + if suffix: + norm = norm[: -len(suffix)] + + stable = [] + for t in tags: + t_norm = t.lstrip("v") + t_suffix = "" + s = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", t_norm) + if s: + t_suffix = s.group(1) + t_norm = t_norm[: -len(t_suffix)] + + if t_suffix != suffix: + continue + if re.search(r"(alpha|beta|rc|pre|dev|nightly|snapshot)", t, re.I): + continue + try: + Version(t_norm) + stable.append((Version(t_norm), t)) + except InvalidVersion: + continue + + if not stable: + return None + + _, latest_tag = max(stable, key=lambda x: x[0]) + return latest_tag + + +def check_dockerhub(namespace: str, repo: str, tag: str) -> Optional[str]: + """Return latest stable tag from Docker Hub.""" + ns = namespace if namespace and namespace != "library" else "library" + url = f"https://hub.docker.com/v2/repositories/{ns}/{repo}/tags?page_size=50&ordering=last_updated" + try: + r = requests.get(url, timeout=15) + r.raise_for_status() + tags = [t["name"] for t in r.json().get("results", [])] + return _semver_tags(tags, tag) + except Exception as e: + logger.warning("DockerHub %s/%s: %s", ns, repo, e) + return None + + +def check_ghcr(namespace: str, repo: str, tag: str) -> Optional[str]: + """Return latest stable tag from GitHub Container Registry.""" + token_b64 = "e30K" # anonymous token for public images + headers = {"Authorization": f"Bearer {token_b64}"} + name = f"{namespace}/{repo}" if namespace else repo + url = f"https://ghcr.io/v2/{name}/tags/list" + try: + r = requests.get(url, headers=headers, timeout=15) + if r.status_code == 401: + # Try unauthenticated + r = requests.get(url, timeout=15) + r.raise_for_status() + tags = r.json().get("tags", []) + return _semver_tags(tags, tag) + except Exception as e: + logger.warning("GHCR %s/%s: %s", namespace, repo, e) + return None + + +def check_quay(namespace: str, repo: str, tag: str) -> Optional[str]: + """Return latest stable tag from quay.io.""" + name = f"{namespace}/{repo}" if namespace else repo + url = f"https://quay.io/api/v1/repository/{name}/tag/?limit=50&onlyActiveTags=true" + try: + r = requests.get(url, timeout=15) + r.raise_for_status() + tags = [t["name"] for t in r.json().get("tags", [])] + return _semver_tags(tags, tag) + except Exception as e: + logger.warning("quay.io %s/%s: %s", namespace, repo, e) + return None + + +def check_linuxserver(repo: str, tag: str) -> Optional[str]: + """Check lscr.io/linuxserver images (backed by GHCR).""" + return check_ghcr("linuxserver", repo, tag) + + +def get_latest_version(img: ImageInfo) -> Optional[str]: + """Dispatch version check to the correct registry handler.""" + reg = img.registry.lower() + + if "docker.io" in reg: + return check_dockerhub(img.namespace, img.repo, img.tag) + if "ghcr.io" in reg: + return check_ghcr(img.namespace, img.repo, img.tag) + if "quay.io" in reg: + return check_quay(img.namespace, img.repo, img.tag) + if "lscr.io" in reg: + return check_linuxserver(img.repo, img.tag) + + logger.debug("No handler for registry %s, skipping", img.registry) + return None + + +# ───────────────────────────────────────────────────────────────────────────── +# Cluster image inventory +# ───────────────────────────────────────────────────────────────────────────── + +def get_cluster_images() -> dict[str, ImageInfo]: + """Return {image_ref: ImageInfo} for all running pods in the cluster.""" + try: + config.load_incluster_config() + except config.ConfigException: + config.load_kube_config() + + v1 = client.CoreV1Api() + pods = v1.list_pod_for_all_namespaces(watch=False) + + images: dict[str, ImageInfo] = {} + for pod in pods.items: + ns = pod.metadata.namespace + pod_name = pod.metadata.name + for container in (pod.spec.containers or []): + ref = container.image + if not ref: + continue + loc = f"{ns}/{pod_name}" + if ref in images: + images[ref].pods.append(loc) + else: + info = parse_image_ref(ref) + info.pods.append(loc) + images[ref] = info + + return images + + +# ───────────────────────────────────────────────────────────────────────────── +# Image mirroring to local Zot registry +# ───────────────────────────────────────────────────────────────────────────── + +def mirror_to_local(image: str, local_registry: str) -> bool: + """ + Copy an image from upstream to the local Zot registry using skopeo. + Returns True on success. + """ + img = parse_image_ref(image) + dest = f"{local_registry}/{img.namespace}/{img.repo}:{img.tag}".lstrip("/") + dest = dest.replace("//", "/") + cmd = [ + "skopeo", "copy", + "--dest-tls-verify=false", + f"docker://{image}", + f"docker://{dest}", + ] + logger.info("Mirroring %s → %s", image, dest) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + if result.returncode != 0: + logger.error("Mirror failed for %s: %s", image, result.stderr) + return False + return True + + +# ───────────────────────────────────────────────────────────────────────────── +# Mattermost notification +# ───────────────────────────────────────────────────────────────────────────── + +def build_report(results: list[dict]) -> str: + """Build a Mattermost markdown report from version check results.""" + now = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + needs_update = [r for r in results if r.get("status") == "outdated"] + floating = [r for r in results if r.get("status") == "floating"] + up_to_date = [r for r in results if r.get("status") == "current"] + skipped = [r for r in results if r.get("status") == "skipped"] + + lines = [f"## :kubernetes: K8s Weekly Version Report — {now}\n"] + + if needs_update: + lines.append(f"### :red_circle: Updates Available ({len(needs_update)})\n") + lines.append("| Namespace | Image | Running | Latest |") + lines.append("|---|---|---|---|") + for r in sorted(needs_update, key=lambda x: x["image"]): + pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) + lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` | `{r['latest']}` |") + lines.append("") + + if floating: + lines.append(f"### :warning: Floating Tags — digest-tracked ({len(floating)})\n") + lines.append("| Namespace | Image | Tag |") + lines.append("|---|---|---|") + for r in sorted(floating, key=lambda x: x["image"]): + pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) + lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |") + lines.append("") + + if up_to_date: + lines.append(f"### :white_check_mark: Up to Date ({len(up_to_date)})\n") + lines.append("| Namespace | Image | Version |") + lines.append("|---|---|---|") + for r in sorted(up_to_date, key=lambda x: x["image"]): + pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) + lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |") + lines.append("") + + if skipped: + lines.append( + f"*{len(skipped)} images skipped (local registry, pinned commits, or complex tags)*\n" + ) + + return "\n".join(lines) + + +def send_mattermost(report: str, webhook_url: str) -> None: + payload = {"text": report, "username": "k8s-version-tracker", "icon_emoji": ":kubernetes:"} + r = requests.post(webhook_url, json=payload, timeout=15) + r.raise_for_status() + logger.info("Mattermost notification sent") + + +# ───────────────────────────────────────────────────────────────────────────── +# Main +# ───────────────────────────────────────────────────────────────────────────── + +def main() -> None: + webhook_url = os.environ["MATTERMOST_WEBHOOK_URL"] + local_registry = os.environ.get("LOCAL_REGISTRY", "registry.storedbox.net") + do_mirror = os.environ.get("MIRROR_IMAGES", "false").lower() == "true" + + logger.info("Gathering cluster image inventory …") + all_images = get_cluster_images() + logger.info("Found %d unique image references", len(all_images)) + + results = [] + mirrored = [] + + for ref, img in all_images.items(): + entry = { + "image": ref, + "image_short": img.display_name, + "current": img.tag, + "latest": None, + "pods": img.pods, + "status": "skipped", + } + + if img.is_local: + entry["status"] = "skipped" + elif img.skip_check: + entry["status"] = "skipped" + elif img.is_floating: + entry["status"] = "floating" + else: + latest = get_latest_version(img) + if latest is None: + entry["status"] = "skipped" + else: + entry["latest"] = latest + try: + current_v = Version(img.tag.lstrip("v")) + latest_v = Version(latest.lstrip("v")) + entry["status"] = "outdated" if latest_v > current_v else "current" + except InvalidVersion: + entry["status"] = "current" if img.tag == latest else "outdated" + + if do_mirror and entry["status"] == "outdated" and entry["latest"]: + new_ref = ref.rsplit(":", 1)[0] + ":" + entry["latest"] + if mirror_to_local(new_ref, local_registry): + mirrored.append(new_ref) + + results.append(entry) + + report = build_report(results) + + if mirrored: + report += f"\n\n*Mirrored {len(mirrored)} updated images to `{local_registry}`*" + + logger.info("Sending Mattermost notification …") + send_mattermost(report, webhook_url) + logger.info("Done. %d outdated, %d floating, %d current, %d skipped", + sum(1 for r in results if r["status"] == "outdated"), + sum(1 for r in results if r["status"] == "floating"), + sum(1 for r in results if r["status"] == "current"), + sum(1 for r in results if r["status"] == "skipped")) + + +if __name__ == "__main__": + main()