#!/usr/bin/env python3 """ k8s-version-tracker: Weekly Kubernetes image version checker and Mattermost notifier. Queries all running pods in the cluster, checks upstream registries for newer versions, optionally mirrors updated images to the local Zot registry, and sends a Mattermost notification with the weekly report. """ import os import re import json import logging import subprocess from dataclasses import dataclass, field from typing import Optional from datetime import datetime, timezone import requests from kubernetes import client, config from packaging.version import Version, InvalidVersion logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger(__name__) # ───────────────────────────────────────────────────────────────────────────── # Configuration # ───────────────────────────────────────────────────────────────────────────── LOCAL_REGISTRIES = { "registry.storedbox.net", "192.168.87.31:5000", "192.168.87.31", "192.168.86.200", } # Tags that float (can't track by semver — compare digest instead) FLOATING_TAGS = { "latest", "main", "master", "release", "stable", "edge", "nightly", "dev", "snapshot", "lts", } # Tags that encode non-comparable info (skip version check) SKIP_TAG_PATTERNS = [ r"^sha256:", # digest refs r"-jdk\d+", # e.g. lts-jdk17 r"^pg\d+-", # e.g. pg14-v0.2.0 r"^stable-\d+$", # jitsi build numbers — handled separately r"fbde4d5e", # devtron commit hashes ] @dataclass class ImageInfo: original: str # raw image string from pod spec registry: str namespace: str # e.g. 'prom', 'library', 'grafana' repo: str # e.g. 'prometheus' tag: str pods: list[str] = field(default_factory=list) @property def display_name(self) -> str: if self.registry == "docker.io": if self.namespace == "library": return f"{self.repo}:{self.tag}" return f"{self.namespace}/{self.repo}:{self.tag}" return f"{self.registry}/{self.namespace}/{self.repo}:{self.tag}" @property def is_local(self) -> bool: return self.registry in LOCAL_REGISTRIES @property def is_floating(self) -> bool: return self.tag.lower() in FLOATING_TAGS @property def skip_check(self) -> bool: return any(re.search(p, self.tag) for p in SKIP_TAG_PATTERNS) # ───────────────────────────────────────────────────────────────────────────── # Image reference parsing # ───────────────────────────────────────────────────────────────────────────── def parse_image_ref(image: str) -> ImageInfo: """Parse a full image reference into structured components.""" ref = image.split("@")[0] # strip digest tag = "latest" if ":" in ref.split("/")[-1]: ref, tag = ref.rsplit(":", 1) parts = ref.split("/") if len(parts) == 1: return ImageInfo(image, "docker.io", "library", parts[0], tag) first = parts[0] is_registry = "." in first or ":" in first or first == "localhost" if not is_registry: # e.g. prom/prometheus, n8nio/n8n return ImageInfo(image, "docker.io", parts[0], "/".join(parts[1:]), tag) registry = first remainder = parts[1:] if not remainder: return ImageInfo(image, registry, "", "", tag) # registry/namespace/repo or registry/repo repo = remainder[-1] namespace = "/".join(remainder[:-1]) if len(remainder) > 1 else "" return ImageInfo(image, registry, namespace, repo, tag) # ───────────────────────────────────────────────────────────────────────────── # Registry version checkers # ───────────────────────────────────────────────────────────────────────────── def _semver_tags(tags: list[str], current_tag: str) -> Optional[str]: """From a list of tags, return the latest stable semver tag comparable to current.""" norm = current_tag.lstrip("v") has_v_prefix = current_tag.startswith("v") # Detect suffix pattern (e.g. '-alpine', '-amd64') suffix_match = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", norm) suffix = suffix_match.group(1) if suffix_match else "" if suffix: norm = norm[: -len(suffix)] stable = [] for t in tags: t_norm = t.lstrip("v") t_suffix = "" s = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", t_norm) if s: t_suffix = s.group(1) t_norm = t_norm[: -len(t_suffix)] if t_suffix != suffix: continue if re.search(r"(alpha|beta|rc|pre|dev|nightly|snapshot)", t, re.I): continue try: Version(t_norm) stable.append((Version(t_norm), t)) except InvalidVersion: continue if not stable: return None _, latest_tag = max(stable, key=lambda x: x[0]) return latest_tag def check_dockerhub(namespace: str, repo: str, tag: str) -> Optional[str]: """Return latest stable tag from Docker Hub.""" ns = namespace if namespace and namespace != "library" else "library" url = f"https://hub.docker.com/v2/repositories/{ns}/{repo}/tags?page_size=50&ordering=last_updated" try: r = requests.get(url, timeout=15) r.raise_for_status() tags = [t["name"] for t in r.json().get("results", [])] return _semver_tags(tags, tag) except Exception as e: logger.warning("DockerHub %s/%s: %s", ns, repo, e) return None def check_ghcr(namespace: str, repo: str, tag: str) -> Optional[str]: """Return latest stable tag from GitHub Container Registry.""" token_b64 = "e30K" # anonymous token for public images headers = {"Authorization": f"Bearer {token_b64}"} name = f"{namespace}/{repo}" if namespace else repo url = f"https://ghcr.io/v2/{name}/tags/list" try: r = requests.get(url, headers=headers, timeout=15) if r.status_code == 401: # Try unauthenticated r = requests.get(url, timeout=15) r.raise_for_status() tags = r.json().get("tags", []) return _semver_tags(tags, tag) except Exception as e: logger.warning("GHCR %s/%s: %s", namespace, repo, e) return None def check_quay(namespace: str, repo: str, tag: str) -> Optional[str]: """Return latest stable tag from quay.io.""" name = f"{namespace}/{repo}" if namespace else repo url = f"https://quay.io/api/v1/repository/{name}/tag/?limit=50&onlyActiveTags=true" try: r = requests.get(url, timeout=15) r.raise_for_status() tags = [t["name"] for t in r.json().get("tags", [])] return _semver_tags(tags, tag) except Exception as e: logger.warning("quay.io %s/%s: %s", namespace, repo, e) return None def check_linuxserver(repo: str, tag: str) -> Optional[str]: """Check lscr.io/linuxserver images (backed by GHCR).""" return check_ghcr("linuxserver", repo, tag) def get_latest_version(img: ImageInfo) -> Optional[str]: """Dispatch version check to the correct registry handler.""" reg = img.registry.lower() if "docker.io" in reg: return check_dockerhub(img.namespace, img.repo, img.tag) if "ghcr.io" in reg: return check_ghcr(img.namespace, img.repo, img.tag) if "quay.io" in reg: return check_quay(img.namespace, img.repo, img.tag) if "lscr.io" in reg: return check_linuxserver(img.repo, img.tag) logger.debug("No handler for registry %s, skipping", img.registry) return None # ───────────────────────────────────────────────────────────────────────────── # Cluster image inventory # ───────────────────────────────────────────────────────────────────────────── def get_cluster_images() -> dict[str, ImageInfo]: """Return {image_ref: ImageInfo} for all running pods in the cluster.""" try: config.load_incluster_config() except config.ConfigException: config.load_kube_config() v1 = client.CoreV1Api() pods = v1.list_pod_for_all_namespaces(watch=False) images: dict[str, ImageInfo] = {} for pod in pods.items: ns = pod.metadata.namespace pod_name = pod.metadata.name for container in (pod.spec.containers or []): ref = container.image if not ref: continue loc = f"{ns}/{pod_name}" if ref in images: images[ref].pods.append(loc) else: info = parse_image_ref(ref) info.pods.append(loc) images[ref] = info return images # ───────────────────────────────────────────────────────────────────────────── # Image mirroring to local Zot registry # ───────────────────────────────────────────────────────────────────────────── def mirror_to_local(image: str, local_registry: str) -> bool: """ Copy an image from upstream to the local Zot registry using skopeo. Returns True on success. """ img = parse_image_ref(image) dest = f"{local_registry}/{img.namespace}/{img.repo}:{img.tag}".lstrip("/") dest = dest.replace("//", "/") cmd = [ "skopeo", "copy", "--dest-tls-verify=false", f"docker://{image}", f"docker://{dest}", ] logger.info("Mirroring %s → %s", image, dest) result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) if result.returncode != 0: logger.error("Mirror failed for %s: %s", image, result.stderr) return False return True # ───────────────────────────────────────────────────────────────────────────── # Mattermost notification # ───────────────────────────────────────────────────────────────────────────── def build_report(results: list[dict]) -> str: """Build a Mattermost markdown report from version check results.""" now = datetime.now(timezone.utc).strftime("%Y-%m-%d") needs_update = [r for r in results if r.get("status") == "outdated"] floating = [r for r in results if r.get("status") == "floating"] up_to_date = [r for r in results if r.get("status") == "current"] skipped = [r for r in results if r.get("status") == "skipped"] lines = [f"## :kubernetes: K8s Weekly Version Report — {now}\n"] if needs_update: lines.append(f"### :red_circle: Updates Available ({len(needs_update)})\n") lines.append("| Namespace | Image | Running | Latest |") lines.append("|---|---|---|---|") for r in sorted(needs_update, key=lambda x: x["image"]): pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` | `{r['latest']}` |") lines.append("") if floating: lines.append(f"### :warning: Floating Tags — digest-tracked ({len(floating)})\n") lines.append("| Namespace | Image | Tag |") lines.append("|---|---|---|") for r in sorted(floating, key=lambda x: x["image"]): pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |") lines.append("") if up_to_date: lines.append(f"### :white_check_mark: Up to Date ({len(up_to_date)})\n") lines.append("| Namespace | Image | Version |") lines.append("|---|---|---|") for r in sorted(up_to_date, key=lambda x: x["image"]): pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]})) lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |") lines.append("") if skipped: lines.append( f"*{len(skipped)} images skipped (local registry, pinned commits, or complex tags)*\n" ) return "\n".join(lines) def send_mattermost(report: str, webhook_url: str) -> None: payload = {"text": report, "username": "k8s-version-tracker", "icon_emoji": ":kubernetes:"} r = requests.post(webhook_url, json=payload, timeout=15) r.raise_for_status() logger.info("Mattermost notification sent") # ───────────────────────────────────────────────────────────────────────────── # Main # ───────────────────────────────────────────────────────────────────────────── def main() -> None: webhook_url = os.environ["MATTERMOST_WEBHOOK_URL"] local_registry = os.environ.get("LOCAL_REGISTRY", "registry.storedbox.net") do_mirror = os.environ.get("MIRROR_IMAGES", "false").lower() == "true" logger.info("Gathering cluster image inventory …") all_images = get_cluster_images() logger.info("Found %d unique image references", len(all_images)) results = [] for ref, img in all_images.items(): entry = { "image": ref, "image_short": img.display_name, "current": img.tag, "latest": None, "pods": img.pods, "status": "skipped", } if img.is_local: entry["status"] = "skipped" elif img.skip_check: entry["status"] = "skipped" elif img.is_floating: entry["status"] = "floating" else: latest = get_latest_version(img) if latest is None: entry["status"] = "skipped" else: entry["latest"] = latest try: current_v = Version(img.tag.lstrip("v")) latest_v = Version(latest.lstrip("v")) entry["status"] = "outdated" if latest_v > current_v else "current" except InvalidVersion: entry["status"] = "current" if img.tag == latest else "outdated" results.append(entry) # Send report immediately after version checks, before mirroring report = build_report(results) logger.info("Sending Mattermost notification …") send_mattermost(report, webhook_url) logger.info("Done. %d outdated, %d floating, %d current, %d skipped", sum(1 for r in results if r["status"] == "outdated"), sum(1 for r in results if r["status"] == "floating"), sum(1 for r in results if r["status"] == "current"), sum(1 for r in results if r["status"] == "skipped")) # Mirror outdated images after report is sent if do_mirror: mirrored = [] for entry in results: if entry["status"] == "outdated" and entry["latest"]: ref = entry["image"] new_ref = ref.rsplit(":", 1)[0] + ":" + entry["latest"] if mirror_to_local(new_ref, local_registry): mirrored.append(new_ref) if mirrored: logger.info("Mirrored %d updated images to %s", len(mirrored), local_registry) if __name__ == "__main__": main()