Files
k8s-version-tracker/check_versions.py

411 lines
16 KiB
Python

#!/usr/bin/env python3
"""
k8s-version-tracker: Weekly Kubernetes image version checker and Mattermost notifier.
Queries all running pods in the cluster, checks upstream registries for newer versions,
optionally mirrors updated images to the local Zot registry, and sends a Mattermost
notification with the weekly report.
"""
import os
import re
import json
import logging
import subprocess
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timezone
import requests
from kubernetes import client, config
from packaging.version import Version, InvalidVersion
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Configuration
# ─────────────────────────────────────────────────────────────────────────────
# Registry hosts considered local; ImageInfo.is_local tests membership here.
LOCAL_REGISTRIES = {
    "registry.storedbox.net",
    "192.168.87.31:5000",
    "192.168.87.31",
    "192.168.86.200",
}

# Mutable tag names that cannot be ordered as versions; ImageInfo.is_floating
# compares the lower-cased tag against this set.
FLOATING_TAGS = {
    "latest",
    "main",
    "master",
    "release",
    "stable",
    "edge",
    "nightly",
    "dev",
    "snapshot",
    "lts",
}

# Regular expressions applied to the tag with re.search; a match on any of
# them means the tag carries non-comparable information and is not checked.
SKIP_TAG_PATTERNS = [
    r"^sha256:",      # digest refs
    r"-jdk\d+",       # e.g. lts-jdk17
    r"^pg\d+-",       # e.g. pg14-v0.2.0
    r"^stable-\d+$",  # jitsi build numbers
    r"fbde4d5e",      # devtron commit hashes
]
@dataclass
class ImageInfo:
    """A container image reference split into its components.

    ``pods`` collects the ``"<namespace>/<pod>"`` locations that use the image;
    each instance gets its own list.
    """

    original: str  # raw image string from pod spec
    registry: str
    namespace: str  # e.g. 'prom', 'library', 'grafana'; may be empty
    repo: str  # e.g. 'prometheus'
    tag: str
    pods: list[str] = field(default_factory=list)

    @property
    def display_name(self) -> str:
        """Short human-readable ``path:tag`` form of the image.

        Docker Hub images omit the registry, and official (``library``) images
        also omit the namespace. Empty components are left out entirely so an
        image without a namespace never renders with a doubled or leading
        slash (e.g. ``registry.k8s.io/pause:3.9``).
        """
        if self.registry == "docker.io":
            components = [] if self.namespace == "library" else [self.namespace]
        else:
            components = [self.registry, self.namespace]
        components.append(self.repo)
        path = "/".join(part for part in components if part)
        return f"{path}:{self.tag}"

    @property
    def is_local(self) -> bool:
        """True when the registry is one of LOCAL_REGISTRIES."""
        return self.registry in LOCAL_REGISTRIES

    @property
    def is_floating(self) -> bool:
        """True when the tag (case-insensitively) is one of FLOATING_TAGS."""
        return self.tag.lower() in FLOATING_TAGS

    @property
    def skip_check(self) -> bool:
        """True when the tag matches any pattern in SKIP_TAG_PATTERNS."""
        return any(re.search(p, self.tag) for p in SKIP_TAG_PATTERNS)
# ─────────────────────────────────────────────────────────────────────────────
# Image reference parsing
# ─────────────────────────────────────────────────────────────────────────────
def parse_image_ref(image: str) -> ImageInfo:
    """Parse a full image reference into structured components.

    Handles ``repo``, ``namespace/repo`` (both assumed to live on docker.io)
    and ``registry[:port]/[namespace/]repo`` forms, each optionally followed
    by ``:tag`` and/or ``@digest``.

    The tag defaults to ``"latest"`` when the reference has neither a tag nor
    a digest. A reference pinned only by digest gets the digest itself
    (``sha256:...``) as its tag, so it is recognised as pinned by the
    ``^sha256:`` skip pattern instead of being mistaken for a floating
    ``latest`` image.
    """
    ref, _, digest = image.partition("@")
    tag = "latest"
    # Only a colon in the final path segment separates a tag; a colon earlier
    # in the reference belongs to a registry port (host:5000/repo).
    if ":" in ref.rsplit("/", 1)[-1]:
        ref, tag = ref.rsplit(":", 1)
    elif digest:
        tag = digest

    parts = ref.split("/")
    if len(parts) == 1:
        return ImageInfo(image, "docker.io", "library", parts[0], tag)

    first = parts[0]
    # A leading segment names a registry only if it looks like a host.
    is_registry = "." in first or ":" in first or first == "localhost"
    if not is_registry:
        # e.g. prom/prometheus, n8nio/n8n
        return ImageInfo(image, "docker.io", first, "/".join(parts[1:]), tag)

    remainder = parts[1:]
    if not remainder:
        return ImageInfo(image, first, "", "", tag)

    # registry/namespace/repo or registry/repo; nested namespaces stay joined.
    namespace = "/".join(remainder[:-1])
    return ImageInfo(image, first, namespace, remainder[-1], tag)
# ─────────────────────────────────────────────────────────────────────────────
# Registry version checkers
# ─────────────────────────────────────────────────────────────────────────────
def _semver_tags(tags: list[str], current_tag: str) -> Optional[str]:
    """From a list of tags, return the latest stable tag comparable to current.

    A candidate is comparable when, after removing leading ``v`` characters,
    it carries exactly the same trailing variant suffix as ``current_tag``
    (e.g. ``-alpine``, ``-amd64``; none if the current tag has none) and the
    remaining text parses as a version. Tags containing a pre-release marker
    (alpha, beta, rc, pre, dev, nightly, snapshot) are ignored.

    When several tags share the highest version, one that matches the current
    tag's ``v``-prefix style is preferred; otherwise the first one listed wins.

    Returns None when no candidate qualifies.
    """
    suffix_re = re.compile(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$")
    prerelease_re = re.compile(r"(alpha|beta|rc|pre|dev|nightly|snapshot)", re.I)

    def split(tag: str) -> tuple[str, str]:
        """Split a tag into (version text, variant suffix)."""
        core = tag.lstrip("v")
        match = suffix_re.search(core)
        if match:
            return core[: match.start()], match.group(1)
        return core, ""

    _, wanted_suffix = split(current_tag)
    has_v_prefix = current_tag.startswith("v")

    candidates = []
    for t in tags or ():
        core, t_suffix = split(t)
        if t_suffix != wanted_suffix:
            continue
        if prerelease_re.search(t):
            continue
        try:
            version = Version(core)
        except InvalidVersion:
            continue
        candidates.append((version, t.startswith("v") == has_v_prefix, t))

    if not candidates:
        return None
    # Compare on (version, same-prefix-style); max() keeps the first of equals.
    return max(candidates, key=lambda c: c[:2])[2]
def check_dockerhub(namespace: str, repo: str, tag: str) -> Optional[str]:
    """Look up the newest comparable stable tag for an image on Docker Hub.

    An empty namespace is treated as ``library``. One page of 50 tags,
    ordered by last update, is fetched and handed to ``_semver_tags``.
    Any failure is logged as a warning and reported as None.
    """
    ns = "library" if not namespace or namespace == "library" else namespace
    url = f"https://hub.docker.com/v2/repositories/{ns}/{repo}/tags?page_size=50&ordering=last_updated"
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        payload = response.json()
        names = []
        for item in payload.get("results", []):
            names.append(item["name"])
        return _semver_tags(names, tag)
    except Exception as exc:
        logger.warning("DockerHub %s/%s: %s", ns, repo, exc)
        return None
def check_ghcr(namespace: str, repo: str, tag: str) -> Optional[str]:
    """Return latest stable tag from GitHub Container Registry.

    Reads the registry's ``tags/list`` endpoint and follows ``Link: rel="next"``
    pagination headers (bounded by ``max_pages``), so tags beyond the first
    page are considered as well. Any failure is logged as a warning and
    reported as None.
    """
    from urllib.parse import urljoin

    max_pages = 20  # safety bound against an endless pagination chain
    # Placeholder bearer token, described by the original author as an
    # anonymous token for public images; dropped again if the registry
    # answers 401.
    auth_headers: Optional[dict] = {"Authorization": "Bearer e30K"}
    name = f"{namespace}/{repo}" if namespace else repo
    url = f"https://ghcr.io/v2/{name}/tags/list"
    tags: list[str] = []
    try:
        for _ in range(max_pages):
            r = requests.get(url, headers=auth_headers, timeout=15)
            if r.status_code == 401 and auth_headers:
                # Try unauthenticated, and stay unauthenticated from here on.
                auth_headers = None
                r = requests.get(url, timeout=15)
            r.raise_for_status()
            # "tags" may be present but null for a repository without tags.
            tags.extend(r.json().get("tags") or [])
            next_url = r.links.get("next", {}).get("url")
            if not next_url:
                break
            # The Link target may be relative to the registry host.
            url = urljoin(url, next_url)
        return _semver_tags(tags, tag)
    except Exception as e:
        logger.warning("GHCR %s/%s: %s", namespace, repo, e)
        return None
def check_quay(namespace: str, repo: str, tag: str) -> Optional[str]:
    """Look up the newest comparable stable tag for an image on quay.io.

    Up to 50 active tags are requested from the repository tag API and handed
    to ``_semver_tags``. Any failure is logged as a warning and reported as
    None.
    """
    name = repo if not namespace else f"{namespace}/{repo}"
    url = f"https://quay.io/api/v1/repository/{name}/tag/?limit=50&onlyActiveTags=true"
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        payload = response.json()
        names = []
        for item in payload.get("tags", []):
            names.append(item["name"])
        return _semver_tags(names, tag)
    except Exception as exc:
        logger.warning("quay.io %s/%s: %s", namespace, repo, exc)
        return None
def check_linuxserver(repo: str, tag: str) -> Optional[str]:
    """Look up an lscr.io image via the GHCR checker under the ``linuxserver`` namespace."""
    latest = check_ghcr(namespace="linuxserver", repo=repo, tag=tag)
    return latest
def get_latest_version(img: ImageInfo) -> Optional[str]:
    """Route the version lookup to the checker for the image's registry.

    The lower-cased registry host is tested, in order, for each known registry
    name as a substring; the first hit decides the checker. Registries without
    a checker are logged at debug level and yield None.
    """
    host = img.registry.lower()
    # Ordered (marker, lookup) pairs; lookups are deferred so only the
    # selected checker performs a network call.
    routes = (
        ("docker.io", lambda: check_dockerhub(img.namespace, img.repo, img.tag)),
        ("ghcr.io", lambda: check_ghcr(img.namespace, img.repo, img.tag)),
        ("quay.io", lambda: check_quay(img.namespace, img.repo, img.tag)),
        ("lscr.io", lambda: check_linuxserver(img.repo, img.tag)),
    )
    for marker, lookup in routes:
        if marker in host:
            return lookup()
    logger.debug("No handler for registry %s, skipping", img.registry)
    return None
# ─────────────────────────────────────────────────────────────────────────────
# Cluster image inventory
# ─────────────────────────────────────────────────────────────────────────────
def get_cluster_images() -> dict[str, ImageInfo]:
    """Return {image_ref: ImageInfo} for the pods currently listed in the cluster.

    In-cluster configuration is tried first, falling back to the local
    kubeconfig. Pods from all namespaces are listed; no filtering on pod
    phase is applied. Both init containers and regular containers are
    inventoried, and each pod is recorded at most once per image.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    v1 = client.CoreV1Api()
    pods = v1.list_pod_for_all_namespaces(watch=False)

    images: dict[str, ImageInfo] = {}
    for pod in pods.items:
        loc = f"{pod.metadata.namespace}/{pod.metadata.name}"
        containers = list(pod.spec.init_containers or []) + list(pod.spec.containers or [])
        for container in containers:
            ref = container.image
            if not ref:
                continue
            info = images.get(ref)
            if info is None:
                info = images[ref] = parse_image_ref(ref)
            # Pods are processed one at a time, so a repeat of the same pod
            # (two containers sharing an image) is always the last entry.
            if not info.pods or info.pods[-1] != loc:
                info.pods.append(loc)
    return images
# ─────────────────────────────────────────────────────────────────────────────
# Image mirroring to local Zot registry
# ─────────────────────────────────────────────────────────────────────────────
def mirror_to_local(image: str, local_registry: str) -> bool:
    """
    Copy an image from upstream to the local Zot registry using skopeo.

    The destination keeps the image's namespace, repository and tag under
    ``local_registry``; empty components are omitted from the path.

    Returns True on success. Returns False (after logging the reason) when
    skopeo exits non-zero, cannot be started, or exceeds the 300 second
    timeout, so one failed copy never aborts the caller's run.
    """
    img = parse_image_ref(image)
    components = (local_registry, img.namespace, img.repo)
    path = "/".join(part.strip("/") for part in components if part.strip("/"))
    dest = f"{path}:{img.tag}"
    cmd = [
        "skopeo", "copy",
        # NOTE(review): TLS verification towards the destination is disabled;
        # presumably the local registry uses a self-signed or plain-HTTP
        # endpoint. Confirm this is intended.
        "--dest-tls-verify=false",
        f"docker://{image}",
        f"docker://{dest}",
    ]
    logger.info("Mirroring %s -> %s", image, dest)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        logger.error("Mirror failed for %s: skopeo timed out", image)
        return False
    except OSError as e:
        # e.g. the skopeo binary is missing or not executable
        logger.error("Mirror failed for %s: %s", image, e)
        return False
    if result.returncode != 0:
        logger.error("Mirror failed for %s: %s", image, result.stderr)
        return False
    return True
# ─────────────────────────────────────────────────────────────────────────────
# Mattermost notification
# ─────────────────────────────────────────────────────────────────────────────
def _report_section(
    heading: str,
    value_headers: list[str],
    value_keys: list[str],
    rows: list[dict],
) -> list[str]:
    """Render one report section: heading with count, table header, one row per image."""
    columns = ["Namespace", "Image", *value_headers]
    lines = [
        f"{heading} ({len(rows)})\n",
        "| " + " | ".join(columns) + " |",
        "|" + "---|" * len(columns),
    ]
    for r in sorted(rows, key=lambda x: x["image"]):
        # Pod locations are "<namespace>/<pod>"; list each namespace once.
        pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]}))
        cells = [pods_ns, f"`{r['image_short']}`"]
        cells.extend(f"`{r[key]}`" for key in value_keys)
        lines.append("| " + " | ".join(cells) + " |")
    lines.append("")
    return lines


def build_report(results: list[dict]) -> str:
    """Build a Mattermost markdown report from version check results.

    Each result is a dict with the keys ``image``, ``image_short``,
    ``current``, ``latest``, ``pods`` and ``status``. Results are grouped by
    ``status`` into three tables (outdated, floating, current), each sorted by
    ``image``; skipped results are only counted. Sections without entries are
    omitted. The title line carries today's UTC date.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    by_status: dict[str, list[dict]] = {
        "outdated": [],
        "floating": [],
        "current": [],
        "skipped": [],
    }
    for r in results:
        bucket = by_status.get(r.get("status"))
        if bucket is not None:
            bucket.append(r)

    lines = [f"## :kubernetes: K8s Weekly Version Report — {now}\n"]
    if by_status["outdated"]:
        lines += _report_section(
            "### :red_circle: Updates Available",
            ["Running", "Latest"],
            ["current", "latest"],
            by_status["outdated"],
        )
    if by_status["floating"]:
        lines += _report_section(
            "### :warning: Floating Tags — digest-tracked",
            ["Tag"],
            ["current"],
            by_status["floating"],
        )
    if by_status["current"]:
        lines += _report_section(
            "### :white_check_mark: Up to Date",
            ["Version"],
            ["current"],
            by_status["current"],
        )
    skipped = by_status["skipped"]
    if skipped:
        lines.append(
            f"*{len(skipped)} images skipped (local registry, pinned commits, or complex tags)*\n"
        )
    return "\n".join(lines)
def send_mattermost(report: str, webhook_url: str) -> None:
    """Post the report text to a Mattermost incoming webhook.

    Raises whatever ``requests`` raises for connection problems, and an HTTP
    error for a non-success response status.
    """
    response = requests.post(
        webhook_url,
        json={
            "text": report,
            "username": "k8s-version-tracker",
            "icon_emoji": ":kubernetes:",
        },
        timeout=15,
    )
    response.raise_for_status()
    logger.info("Mattermost notification sent")
# ─────────────────────────────────────────────────────────────────────────────
# Main
# ─────────────────────────────────────────────────────────────────────────────
def _comparable_version(tag: str) -> Version:
    """Parse a tag as a version, ignoring a leading ``v`` and a trailing variant suffix.

    Raises InvalidVersion when the remaining text is not a valid version.
    """
    core = re.sub(r"-[a-zA-Z][a-zA-Z0-9._-]*$", "", tag.lstrip("v"))
    return Version(core)


def _classify_image(img: ImageInfo) -> tuple[str, Optional[str]]:
    """Return ``(status, latest_tag)`` for one image.

    Status is "skipped", "floating", "outdated" or "current"; ``latest_tag``
    is set only when an upstream candidate was found.
    """
    if img.is_local or img.skip_check:
        return "skipped", None
    if img.is_floating:
        return "floating", None

    latest = get_latest_version(img)
    if latest is None:
        return "skipped", None
    try:
        # Compare with variant suffixes removed so that e.g. 1.2.5-alpine is
        # not flagged as outdated against an older 1.2.4-alpine candidate.
        is_newer = _comparable_version(latest) > _comparable_version(img.tag)
    except InvalidVersion:
        # Not orderable: any difference counts as an available update.
        is_newer = img.tag != latest
    return ("outdated" if is_newer else "current"), latest


def main() -> None:
    """Inventory cluster images, check them upstream, optionally mirror, and notify.

    Environment:
        MATTERMOST_WEBHOOK_URL: required; a missing value raises KeyError
            before any work is done.
        LOCAL_REGISTRY: mirror destination, default "registry.storedbox.net".
        MIRROR_IMAGES: "true" (case-insensitive) enables mirroring of the
            newer tag of every outdated image.
    """
    from collections import Counter

    webhook_url = os.environ["MATTERMOST_WEBHOOK_URL"]
    local_registry = os.environ.get("LOCAL_REGISTRY", "registry.storedbox.net")
    do_mirror = os.environ.get("MIRROR_IMAGES", "false").lower() == "true"

    logger.info("Gathering cluster image inventory …")
    all_images = get_cluster_images()
    logger.info("Found %d unique image references", len(all_images))

    results = []
    mirrored = []
    for ref, img in all_images.items():
        status, latest = _classify_image(img)
        results.append({
            "image": ref,
            "image_short": img.display_name,
            "current": img.tag,
            "latest": latest,
            "pods": img.pods,
            "status": status,
        })
        if do_mirror and status == "outdated" and latest:
            # Drop any "@digest" before swapping the tag, otherwise the colon
            # inside "sha256:..." would be mistaken for the tag separator.
            new_ref = ref.split("@", 1)[0].rsplit(":", 1)[0] + ":" + latest
            if mirror_to_local(new_ref, local_registry):
                mirrored.append(new_ref)

    report = build_report(results)
    if mirrored:
        report += f"\n\n*Mirrored {len(mirrored)} updated images to `{local_registry}`*"

    logger.info("Sending Mattermost notification …")
    send_mattermost(report, webhook_url)

    counts = Counter(r["status"] for r in results)
    logger.info("Done. %d outdated, %d floating, %d current, %d skipped",
                counts["outdated"], counts["floating"],
                counts["current"], counts["skipped"])


if __name__ == "__main__":
    main()