feat: version checker script
This commit is contained in:
410
check_versions.py
Normal file
410
check_versions.py
Normal file
@@ -0,0 +1,410 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
k8s-version-tracker: Weekly Kubernetes image version checker and Mattermost notifier.
|
||||
|
||||
Queries all running pods in the cluster, checks upstream registries for newer versions,
|
||||
optionally mirrors updated images to the local Zot registry, and sends a Mattermost
|
||||
notification with the weekly report.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import requests
|
||||
from kubernetes import client, config
|
||||
from packaging.version import Version, InvalidVersion
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Configuration
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
LOCAL_REGISTRIES = {
|
||||
"registry.storedbox.net",
|
||||
"192.168.87.31:5000",
|
||||
"192.168.87.31",
|
||||
"192.168.86.200",
|
||||
}
|
||||
|
||||
# Tags that float (can't track by semver — compare digest instead)
|
||||
FLOATING_TAGS = {
|
||||
"latest", "main", "master", "release", "stable", "edge",
|
||||
"nightly", "dev", "snapshot", "lts",
|
||||
}
|
||||
|
||||
# Tags that encode non-comparable info (skip version check)
|
||||
SKIP_TAG_PATTERNS = [
|
||||
r"^sha256:", # digest refs
|
||||
r"-jdk\d+", # e.g. lts-jdk17
|
||||
r"^pg\d+-", # e.g. pg14-v0.2.0
|
||||
r"^stable-\d+$", # jitsi build numbers — handled separately
|
||||
r"fbde4d5e", # devtron commit hashes
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageInfo:
|
||||
original: str # raw image string from pod spec
|
||||
registry: str
|
||||
namespace: str # e.g. 'prom', 'library', 'grafana'
|
||||
repo: str # e.g. 'prometheus'
|
||||
tag: str
|
||||
pods: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def display_name(self) -> str:
|
||||
if self.registry == "docker.io":
|
||||
if self.namespace == "library":
|
||||
return f"{self.repo}:{self.tag}"
|
||||
return f"{self.namespace}/{self.repo}:{self.tag}"
|
||||
return f"{self.registry}/{self.namespace}/{self.repo}:{self.tag}"
|
||||
|
||||
@property
|
||||
def is_local(self) -> bool:
|
||||
return self.registry in LOCAL_REGISTRIES
|
||||
|
||||
@property
|
||||
def is_floating(self) -> bool:
|
||||
return self.tag.lower() in FLOATING_TAGS
|
||||
|
||||
@property
|
||||
def skip_check(self) -> bool:
|
||||
return any(re.search(p, self.tag) for p in SKIP_TAG_PATTERNS)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Image reference parsing
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_image_ref(image: str) -> ImageInfo:
|
||||
"""Parse a full image reference into structured components."""
|
||||
ref = image.split("@")[0] # strip digest
|
||||
tag = "latest"
|
||||
if ":" in ref.split("/")[-1]:
|
||||
ref, tag = ref.rsplit(":", 1)
|
||||
|
||||
parts = ref.split("/")
|
||||
|
||||
if len(parts) == 1:
|
||||
return ImageInfo(image, "docker.io", "library", parts[0], tag)
|
||||
|
||||
first = parts[0]
|
||||
is_registry = "." in first or ":" in first or first == "localhost"
|
||||
|
||||
if not is_registry:
|
||||
# e.g. prom/prometheus, n8nio/n8n
|
||||
return ImageInfo(image, "docker.io", parts[0], "/".join(parts[1:]), tag)
|
||||
|
||||
registry = first
|
||||
remainder = parts[1:]
|
||||
if not remainder:
|
||||
return ImageInfo(image, registry, "", "", tag)
|
||||
|
||||
# registry/namespace/repo or registry/repo
|
||||
repo = remainder[-1]
|
||||
namespace = "/".join(remainder[:-1]) if len(remainder) > 1 else ""
|
||||
return ImageInfo(image, registry, namespace, repo, tag)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Registry version checkers
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _semver_tags(tags: list[str], current_tag: str) -> Optional[str]:
|
||||
"""From a list of tags, return the latest stable semver tag comparable to current."""
|
||||
norm = current_tag.lstrip("v")
|
||||
has_v_prefix = current_tag.startswith("v")
|
||||
|
||||
# Detect suffix pattern (e.g. '-alpine', '-amd64')
|
||||
suffix_match = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", norm)
|
||||
suffix = suffix_match.group(1) if suffix_match else ""
|
||||
if suffix:
|
||||
norm = norm[: -len(suffix)]
|
||||
|
||||
stable = []
|
||||
for t in tags:
|
||||
t_norm = t.lstrip("v")
|
||||
t_suffix = ""
|
||||
s = re.search(r"(-[a-zA-Z][a-zA-Z0-9._-]*)$", t_norm)
|
||||
if s:
|
||||
t_suffix = s.group(1)
|
||||
t_norm = t_norm[: -len(t_suffix)]
|
||||
|
||||
if t_suffix != suffix:
|
||||
continue
|
||||
if re.search(r"(alpha|beta|rc|pre|dev|nightly|snapshot)", t, re.I):
|
||||
continue
|
||||
try:
|
||||
Version(t_norm)
|
||||
stable.append((Version(t_norm), t))
|
||||
except InvalidVersion:
|
||||
continue
|
||||
|
||||
if not stable:
|
||||
return None
|
||||
|
||||
_, latest_tag = max(stable, key=lambda x: x[0])
|
||||
return latest_tag
|
||||
|
||||
|
||||
def check_dockerhub(namespace: str, repo: str, tag: str) -> Optional[str]:
|
||||
"""Return latest stable tag from Docker Hub."""
|
||||
ns = namespace if namespace and namespace != "library" else "library"
|
||||
url = f"https://hub.docker.com/v2/repositories/{ns}/{repo}/tags?page_size=50&ordering=last_updated"
|
||||
try:
|
||||
r = requests.get(url, timeout=15)
|
||||
r.raise_for_status()
|
||||
tags = [t["name"] for t in r.json().get("results", [])]
|
||||
return _semver_tags(tags, tag)
|
||||
except Exception as e:
|
||||
logger.warning("DockerHub %s/%s: %s", ns, repo, e)
|
||||
return None
|
||||
|
||||
|
||||
def check_ghcr(namespace: str, repo: str, tag: str) -> Optional[str]:
|
||||
"""Return latest stable tag from GitHub Container Registry."""
|
||||
token_b64 = "e30K" # anonymous token for public images
|
||||
headers = {"Authorization": f"Bearer {token_b64}"}
|
||||
name = f"{namespace}/{repo}" if namespace else repo
|
||||
url = f"https://ghcr.io/v2/{name}/tags/list"
|
||||
try:
|
||||
r = requests.get(url, headers=headers, timeout=15)
|
||||
if r.status_code == 401:
|
||||
# Try unauthenticated
|
||||
r = requests.get(url, timeout=15)
|
||||
r.raise_for_status()
|
||||
tags = r.json().get("tags", [])
|
||||
return _semver_tags(tags, tag)
|
||||
except Exception as e:
|
||||
logger.warning("GHCR %s/%s: %s", namespace, repo, e)
|
||||
return None
|
||||
|
||||
|
||||
def check_quay(namespace: str, repo: str, tag: str) -> Optional[str]:
|
||||
"""Return latest stable tag from quay.io."""
|
||||
name = f"{namespace}/{repo}" if namespace else repo
|
||||
url = f"https://quay.io/api/v1/repository/{name}/tag/?limit=50&onlyActiveTags=true"
|
||||
try:
|
||||
r = requests.get(url, timeout=15)
|
||||
r.raise_for_status()
|
||||
tags = [t["name"] for t in r.json().get("tags", [])]
|
||||
return _semver_tags(tags, tag)
|
||||
except Exception as e:
|
||||
logger.warning("quay.io %s/%s: %s", namespace, repo, e)
|
||||
return None
|
||||
|
||||
|
||||
def check_linuxserver(repo: str, tag: str) -> Optional[str]:
|
||||
"""Check lscr.io/linuxserver images (backed by GHCR)."""
|
||||
return check_ghcr("linuxserver", repo, tag)
|
||||
|
||||
|
||||
def get_latest_version(img: ImageInfo) -> Optional[str]:
|
||||
"""Dispatch version check to the correct registry handler."""
|
||||
reg = img.registry.lower()
|
||||
|
||||
if "docker.io" in reg:
|
||||
return check_dockerhub(img.namespace, img.repo, img.tag)
|
||||
if "ghcr.io" in reg:
|
||||
return check_ghcr(img.namespace, img.repo, img.tag)
|
||||
if "quay.io" in reg:
|
||||
return check_quay(img.namespace, img.repo, img.tag)
|
||||
if "lscr.io" in reg:
|
||||
return check_linuxserver(img.repo, img.tag)
|
||||
|
||||
logger.debug("No handler for registry %s, skipping", img.registry)
|
||||
return None
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Cluster image inventory
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_cluster_images() -> dict[str, ImageInfo]:
|
||||
"""Return {image_ref: ImageInfo} for all running pods in the cluster."""
|
||||
try:
|
||||
config.load_incluster_config()
|
||||
except config.ConfigException:
|
||||
config.load_kube_config()
|
||||
|
||||
v1 = client.CoreV1Api()
|
||||
pods = v1.list_pod_for_all_namespaces(watch=False)
|
||||
|
||||
images: dict[str, ImageInfo] = {}
|
||||
for pod in pods.items:
|
||||
ns = pod.metadata.namespace
|
||||
pod_name = pod.metadata.name
|
||||
for container in (pod.spec.containers or []):
|
||||
ref = container.image
|
||||
if not ref:
|
||||
continue
|
||||
loc = f"{ns}/{pod_name}"
|
||||
if ref in images:
|
||||
images[ref].pods.append(loc)
|
||||
else:
|
||||
info = parse_image_ref(ref)
|
||||
info.pods.append(loc)
|
||||
images[ref] = info
|
||||
|
||||
return images
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Image mirroring to local Zot registry
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def mirror_to_local(image: str, local_registry: str) -> bool:
|
||||
"""
|
||||
Copy an image from upstream to the local Zot registry using skopeo.
|
||||
Returns True on success.
|
||||
"""
|
||||
img = parse_image_ref(image)
|
||||
dest = f"{local_registry}/{img.namespace}/{img.repo}:{img.tag}".lstrip("/")
|
||||
dest = dest.replace("//", "/")
|
||||
cmd = [
|
||||
"skopeo", "copy",
|
||||
"--dest-tls-verify=false",
|
||||
f"docker://{image}",
|
||||
f"docker://{dest}",
|
||||
]
|
||||
logger.info("Mirroring %s → %s", image, dest)
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||||
if result.returncode != 0:
|
||||
logger.error("Mirror failed for %s: %s", image, result.stderr)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Mattermost notification
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def build_report(results: list[dict]) -> str:
|
||||
"""Build a Mattermost markdown report from version check results."""
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
needs_update = [r for r in results if r.get("status") == "outdated"]
|
||||
floating = [r for r in results if r.get("status") == "floating"]
|
||||
up_to_date = [r for r in results if r.get("status") == "current"]
|
||||
skipped = [r for r in results if r.get("status") == "skipped"]
|
||||
|
||||
lines = [f"## :kubernetes: K8s Weekly Version Report — {now}\n"]
|
||||
|
||||
if needs_update:
|
||||
lines.append(f"### :red_circle: Updates Available ({len(needs_update)})\n")
|
||||
lines.append("| Namespace | Image | Running | Latest |")
|
||||
lines.append("|---|---|---|---|")
|
||||
for r in sorted(needs_update, key=lambda x: x["image"]):
|
||||
pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]}))
|
||||
lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` | `{r['latest']}` |")
|
||||
lines.append("")
|
||||
|
||||
if floating:
|
||||
lines.append(f"### :warning: Floating Tags — digest-tracked ({len(floating)})\n")
|
||||
lines.append("| Namespace | Image | Tag |")
|
||||
lines.append("|---|---|---|")
|
||||
for r in sorted(floating, key=lambda x: x["image"]):
|
||||
pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]}))
|
||||
lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |")
|
||||
lines.append("")
|
||||
|
||||
if up_to_date:
|
||||
lines.append(f"### :white_check_mark: Up to Date ({len(up_to_date)})\n")
|
||||
lines.append("| Namespace | Image | Version |")
|
||||
lines.append("|---|---|---|")
|
||||
for r in sorted(up_to_date, key=lambda x: x["image"]):
|
||||
pods_ns = ", ".join(sorted({p.split("/")[0] for p in r["pods"]}))
|
||||
lines.append(f"| {pods_ns} | `{r['image_short']}` | `{r['current']}` |")
|
||||
lines.append("")
|
||||
|
||||
if skipped:
|
||||
lines.append(
|
||||
f"*{len(skipped)} images skipped (local registry, pinned commits, or complex tags)*\n"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def send_mattermost(report: str, webhook_url: str) -> None:
|
||||
payload = {"text": report, "username": "k8s-version-tracker", "icon_emoji": ":kubernetes:"}
|
||||
r = requests.post(webhook_url, json=payload, timeout=15)
|
||||
r.raise_for_status()
|
||||
logger.info("Mattermost notification sent")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Main
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> None:
|
||||
webhook_url = os.environ["MATTERMOST_WEBHOOK_URL"]
|
||||
local_registry = os.environ.get("LOCAL_REGISTRY", "registry.storedbox.net")
|
||||
do_mirror = os.environ.get("MIRROR_IMAGES", "false").lower() == "true"
|
||||
|
||||
logger.info("Gathering cluster image inventory …")
|
||||
all_images = get_cluster_images()
|
||||
logger.info("Found %d unique image references", len(all_images))
|
||||
|
||||
results = []
|
||||
mirrored = []
|
||||
|
||||
for ref, img in all_images.items():
|
||||
entry = {
|
||||
"image": ref,
|
||||
"image_short": img.display_name,
|
||||
"current": img.tag,
|
||||
"latest": None,
|
||||
"pods": img.pods,
|
||||
"status": "skipped",
|
||||
}
|
||||
|
||||
if img.is_local:
|
||||
entry["status"] = "skipped"
|
||||
elif img.skip_check:
|
||||
entry["status"] = "skipped"
|
||||
elif img.is_floating:
|
||||
entry["status"] = "floating"
|
||||
else:
|
||||
latest = get_latest_version(img)
|
||||
if latest is None:
|
||||
entry["status"] = "skipped"
|
||||
else:
|
||||
entry["latest"] = latest
|
||||
try:
|
||||
current_v = Version(img.tag.lstrip("v"))
|
||||
latest_v = Version(latest.lstrip("v"))
|
||||
entry["status"] = "outdated" if latest_v > current_v else "current"
|
||||
except InvalidVersion:
|
||||
entry["status"] = "current" if img.tag == latest else "outdated"
|
||||
|
||||
if do_mirror and entry["status"] == "outdated" and entry["latest"]:
|
||||
new_ref = ref.rsplit(":", 1)[0] + ":" + entry["latest"]
|
||||
if mirror_to_local(new_ref, local_registry):
|
||||
mirrored.append(new_ref)
|
||||
|
||||
results.append(entry)
|
||||
|
||||
report = build_report(results)
|
||||
|
||||
if mirrored:
|
||||
report += f"\n\n*Mirrored {len(mirrored)} updated images to `{local_registry}`*"
|
||||
|
||||
logger.info("Sending Mattermost notification …")
|
||||
send_mattermost(report, webhook_url)
|
||||
logger.info("Done. %d outdated, %d floating, %d current, %d skipped",
|
||||
sum(1 for r in results if r["status"] == "outdated"),
|
||||
sum(1 for r in results if r["status"] == "floating"),
|
||||
sum(1 for r in results if r["status"] == "current"),
|
||||
sum(1 for r in results if r["status"] == "skipped"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user