
Kubernetes API Server Request Flow — Authentication to Persistence

DodaTech Updated 2026-06-24 12 min read

In this tutorial, you'll learn about Kubernetes API Server Request Flow. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.

The Kubernetes API server request flow traces how every kubectl command or API call is authenticated, authorized, validated, admitted, and persisted, from client request to etcd storage and back.

What You'll Learn

You'll master the complete API request lifecycle: authentication mechanisms (X.509 client certificates, bearer tokens, OIDC), RBAC authorization evaluation, admission controllers (mutating and validating), object validation, etcd persistence, and watch-based notifications.

Why This Problem Matters

Understanding the API server flow is essential for debugging permission errors, configuring admission webhooks, troubleshooting slow API responses, and securing the cluster. Every kubectl command passes through this pipeline, so knowing each stage tells you where a failure originated.

Real-World Use

Durga Antivirus Pro's security team uses admission controllers to enforce pod security policies. Every pod creation request passes through a validating webhook that checks for privileged containers. Understanding the flow helped them configure the webhook correctly.

Complete Request Flow

flowchart TB
  Client[kubectl / API Client] -->|1. HTTP Request| Auth[Authentication]

  subgraph Authentication
    x509[X.509 Client Cert]
    Bearer["Bearer Token<br/>SA Token / OIDC"]
    Basic["Basic Auth<br/>(removed in v1.19)"]
    Proxy[Proxy Auth]
  end
  Auth --> Authentication
  Authentication -->|2. User + Groups + Extra| Authz[Authorization]

  subgraph Authorization
    ABAC[ABAC]
    RBAC[RBAC]
    Node[Node Authorizer]
    Webhook[Webhook]
  end
  Authz --> Authorization
  Authorization -->|3. Request Object| Admission[Mutating Admission]
  Admission -->|4. Mutated Object| Validation[Object Validation]
  Validation -->|5. Validated Object| ValAdmission[Validating Admission]

  subgraph AdmissionControllers
    MC["Mutating Webhooks<br/>e.g., Istio sidecar injector"]
    VC["Validating Admission<br/>e.g., PodSecurity, policy webhooks"]
  end
  Admission --> MC
  ValAdmission --> VC
  VC -->|6. Approved| Storage[Storage]
  Storage -->|7. Persist| ETCD[(etcd)]
  ETCD -->|8. Response| Storage
  Storage -->|9. HTTP 201/200| Client
  Storage -->|10. Notify Watchers| Watch[Watch Listeners]

Authentication Step

import ssl
from typing import Optional

import jwt  # PyJWT
from cryptography import x509
from cryptography.x509.oid import NameOID


def anonymous_user() -> dict:
    # Identity the API server assigns when no credentials are presented
    return {"username": "system:anonymous", "groups": ["system:unauthenticated"]}


class Authenticator:
    """Simplified model of the API server's authenticator chain."""

    def __init__(self, signing_key: str = "fake-secret"):
        # Demo HMAC key. A real API server verifies service account tokens
        # against the public keys given by --service-account-key-file.
        self.signing_key = signing_key
        self.valid_tokens = {}  # static tokens: token -> user info

    def authenticate_request(self, headers: dict,
                             conn: Optional[ssl.SSLObject] = None) -> dict:
        auth_header = headers.get("Authorization", "")

        # Bearer token (service account JWT, OIDC ID token, or static token)
        if auth_header.startswith("Bearer "):
            return self._authenticate_token(auth_header[len("Bearer "):])

        # Client certificate presented during the TLS handshake
        if conn is not None:
            return self._authenticate_cert(conn)

        # No usable credentials. Basic auth was removed in Kubernetes 1.19,
        # so a "Basic" header does not establish an identity either.
        return anonymous_user()

    def _authenticate_token(self, token: str) -> dict:
        if token in self.valid_tokens:
            return self.valid_tokens[token]
        try:
            # Verify the signature instead of trusting the claims blindly
            payload = jwt.decode(token, self.signing_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            # Simplification: the real API server rejects an invalid token
            # with 401 instead of falling back to anonymous.
            return anonymous_user()
        return {
            "username": payload.get("sub", "unknown"),
            "groups": payload.get("groups", []),
            "uid": payload.get("uid", ""),
        }

    def _authenticate_cert(self, conn: ssl.SSLObject) -> dict:
        der_data = conn.getpeercert(binary_form=True)
        if der_data is None:
            return anonymous_user()
        try:
            cert = x509.load_der_x509_certificate(der_data)
        except ValueError:
            return anonymous_user()
        # Common Name -> username, each Organization -> group
        cns = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        orgs = cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)
        if not cns:
            return anonymous_user()
        return {"username": cns[0].value, "groups": [a.value for a in orgs]}


authenticator = Authenticator()
# Service account token (simplified: real SA tokens are signed with an
# asymmetric key, and the API server derives the groups from the namespace)
sa_token = jwt.encode({
    "sub": "system:serviceaccount:production:deploy-bot",
    "groups": ["system:serviceaccounts", "system:serviceaccounts:production"],
}, "fake-secret", algorithm="HS256")

user = authenticator.authenticate_request(
    {"Authorization": f"Bearer {sa_token}"}, None
)
print(f"Token auth: {user['username']}, groups={user['groups']}")

Expected output:

Token auth: system:serviceaccount:production:deploy-bot, groups=['system:serviceaccounts', 'system:serviceaccounts:production']

Authorization (RBAC Evaluation)

from typing import Optional


class RBACAuthorizer:
    """Simplified RBAC: roles hold rules, bindings attach subjects to roles."""

    def __init__(self):
        self.roles = {}      # "namespace/name" -> list of rules
        self.bindings = []

    def add_role(self, name: str, namespace: str, rules: list):
        # namespace="" models a ClusterRole
        self.roles[f"{namespace}/{name}"] = rules

    def add_binding(self, subject: str, role: str,
                    namespace: str, kind: str = "User"):
        # namespace="" models a ClusterRoleBinding
        self.bindings.append({
            "subject": subject,
            "role": role,
            "namespace": namespace,
            "kind": kind
        })

    @staticmethod
    def _subject_matches(binding: dict, username: str, groups: list) -> bool:
        if binding["kind"] == "Group":
            return binding["subject"] in groups
        # User and ServiceAccount subjects match the full username; a service
        # account authenticates as system:serviceaccount:<namespace>:<name>
        return binding["subject"] == username

    def authorize(self, user_info: dict, verb: str,
                  resource: str, namespace: Optional[str] = None) -> bool:
        username = user_info["username"]
        groups = user_info.get("groups", [])

        for binding in self.bindings:
            # A namespaced binding grants access only inside its namespace;
            # a cluster-wide binding (namespace "") applies everywhere
            if binding["namespace"] and binding["namespace"] != namespace:
                continue
            if not self._subject_matches(binding, username, groups):
                continue

            role_key = f"{binding['namespace']}/{binding['role']}"
            for rule in self.roles.get(role_key, []):
                # RBAC is purely additive: any matching rule allows the request
                if (verb in rule.get("verbs", [])
                        and resource in rule.get("resources", [])):
                    return True
        return False  # no matching rule: default deny

    def check_access(self, username: str, verb: str,
                     resource: str, namespace: Optional[str] = None) -> str:
        user_info = {"username": username, "groups": []}
        if self.authorize(user_info, verb, resource, namespace):
            return "ALLOW"
        return "DENY"


rbac = RBACAuthorizer()
rbac.add_role("deploy-role", "production", [
    {"verbs": ["get", "list", "create", "update"],
     "resources": ["deployments", "services"]},
    {"verbs": ["get", "list"],
     "resources": ["pods"]}
])
SA_USER = "system:serviceaccount:production:deploy-bot"
rbac.add_binding(SA_USER, "deploy-role", "production", "ServiceAccount")

checks = [
    ("list", "deployments", "production"),
    ("delete", "deployments", "production"),
    ("list", "nodes", "production"),
    ("list", "pods", "production"),
    ("list", "deployments", "kube-system"),
]

for verb, resource, ns in checks:
    result = rbac.check_access(SA_USER, verb, resource, ns)
    print(f"deploy-bot {verb:>6} {resource:>11} in {ns:>11} -> {result}")

Expected output:

deploy-bot   list deployments in  production -> ALLOW
deploy-bot delete deployments in  production -> DENY
deploy-bot   list       nodes in  production -> DENY
deploy-bot   list        pods in  production -> ALLOW
deploy-bot   list deployments in kube-system -> DENY
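The checks above hand the authorizer a verb directly. In the real API server, the verb is derived from the HTTP method and path: a GET on a collection is list, a GET with ?watch=true is watch, and a DELETE on a collection is deletecollection. Below is a minimal sketch, assuming only core-group paths of the form /api/v1/namespaces/<ns>/<resource>[/<name>] (named groups under /apis/<group>/<version>/ are not handled). The helper name request_attributes is hypothetical.

```python
from typing import Optional
from urllib.parse import urlparse, parse_qs


def request_attributes(method: str, url: str) -> dict:
    """Derive verb, resource, namespace, and name from an HTTP request.

    Hypothetical helper: handles only core-group paths such as
    /api/v1/namespaces/<ns>/<resource>[/<name>] and /api/v1/<resource>[/<name>].
    """
    parsed = urlparse(url)
    parts = parsed.path.strip("/").split("/")[2:]  # drop "api", "v1"
    namespace: Optional[str] = None
    if len(parts) >= 2 and parts[0] == "namespaces":
        namespace, parts = parts[1], parts[2:]
    resource = parts[0] if parts else ""
    name = parts[1] if len(parts) > 1 else None
    watch = parse_qs(parsed.query).get("watch", ["false"])[0] in ("true", "1")

    if method == "GET":
        verb = "watch" if watch else ("get" if name else "list")
    elif method == "DELETE":
        verb = "delete" if name else "deletecollection"
    else:
        verb = {"POST": "create", "PUT": "update", "PATCH": "patch"}[method]
    return {"verb": verb, "resource": resource,
            "namespace": namespace, "name": name}


print(request_attributes("GET", "/api/v1/namespaces/production/pods"))
# {'verb': 'list', 'resource': 'pods', 'namespace': 'production', 'name': None}
print(request_attributes("GET", "/api/v1/namespaces/production/pods?watch=true"))
# {'verb': 'watch', 'resource': 'pods', 'namespace': 'production', 'name': None}
print(request_attributes("DELETE", "/api/v1/namespaces/production/pods/web-1"))
# {'verb': 'delete', 'resource': 'pods', 'namespace': 'production', 'name': 'web-1'}
```

These attributes, together with the user and groups from authentication, are what an RBAC rule is matched against.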

Admission Controllers

import copy


class AdmissionController:
    """Runs mutating webhooks, then validating webhooks (in-process sketch)."""

    def __init__(self):
        self.mutating_webhooks = []
        self.validating_webhooks = []

    def add_mutating_webhook(self, name: str, fn):
        self.mutating_webhooks.append((name, fn))

    def add_validating_webhook(self, name: str, fn):
        self.validating_webhooks.append((name, fn))

    def process(self, obj: dict, operation: str) -> tuple:
        # Work on a copy so the caller's object is never modified
        obj_copy = copy.deepcopy(obj)

        # Phase 1: mutating admission; each webhook returns a list of patches
        for name, fn in self.mutating_webhooks:
            patches = fn(obj_copy, operation)
            if patches:
                print(f"  [Mutate: {name}] Applied {len(patches)} patches")
                for p in patches:
                    self._apply_patch(obj_copy, p)

        # (The real API server validates the object schema at this point.)

        # Phase 2: validating admission; the first denial rejects the request
        for name, fn in self.validating_webhooks:
            allowed, message = fn(obj_copy, operation)
            if not allowed:
                return obj_copy, False, f"Denied by {name}: {message}"

        return obj_copy, True, ""

    def _apply_patch(self, obj: dict, patch: dict):
        # Simplified add/replace at a slash-separated path; real webhooks
        # return RFC 6902 JSON Patch operations
        path = patch["path"].strip("/").split("/")
        current = obj
        for part in path[:-1]:
            current = current.setdefault(part, {})
        current[path[-1]] = patch["value"]

def add_default_labels(obj, op):
    if op == "CREATE":
        labels = obj.setdefault("metadata", {}).setdefault("labels", {})
        patches = []
        if "env" not in labels:
            patches.append({"path": "/metadata/labels/env", "value": "production"})
        if "managed-by" not in labels:
            patches.append({"path": "/metadata/labels/managed-by", "value": "dodatech"})
        return patches
    return []

def validate_no_privileged(obj, op):
    if op == "CREATE":
        for container in obj.get("spec", {}).get("containers", []):
            sc = container.get("securityContext", {})
            if sc.get("privileged"):
                return False, "Privileged containers not allowed"
    return True, ""

admission = AdmissionController()
admission.add_mutating_webhook("label-injector", add_default_labels)
admission.add_validating_webhook("no-privileged", validate_no_privileged)

pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "test-pod"},
    "spec": {
        "containers": [{"name": "web", "image": "nginx"}]
    }
}

result, allowed, msg = admission.process(pod, "CREATE")
print(f"Allowed: {allowed}")
print(f"Labels: {result.get('metadata', {}).get('labels', {})}")

# Test with privileged container
pod_bad = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "bad-pod"},
    "spec": {
        "containers": [{
            "name": "web",
            "image": "nginx",
            "securityContext": {"privileged": True}
        }]
    }
}

result, allowed, msg = admission.process(pod_bad, "CREATE")
print(f"\nPrivileged pod allowed: {allowed}")
if not allowed:
    print(f"Reason: {msg}")

Expected output:

  [Mutate: label-injector] Applied 2 patches
Allowed: True
Labels: {'env': 'production', 'managed-by': 'dodatech'}
  [Mutate: label-injector] Applied 2 patches

Privileged pod allowed: False
Reason: Denied by no-privileged: Privileged containers not allowed
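The simulator calls webhook functions in-process. A real admission webhook is an HTTPS endpoint. It receives an AdmissionReview object and must answer with one whose response echoes request.uid. A mutating webhook returns its changes as a base64-encoded JSON Patch. Below is a minimal sketch of building that response with the standard library; the function name build_admission_response is hypothetical, and the HTTP server and TLS setup are omitted.

```python
import base64
import json


def build_admission_response(review: dict, patches: list = None,
                             allowed: bool = True, message: str = "") -> dict:
    """Build an admission.k8s.io/v1 AdmissionReview response (sketch)."""
    # The response must carry the same uid as the incoming request
    response = {"uid": review["request"]["uid"], "allowed": allowed}
    if not allowed:
        response["status"] = {"code": 403, "message": message}
    if patches:
        # RFC 6902 JSON Patch operations, base64-encoded
        response["patchType"] = "JSONPatch"
        response["patch"] = base64.b64encode(
            json.dumps(patches).encode()).decode()
    return {"apiVersion": "admission.k8s.io/v1",
            "kind": "AdmissionReview",
            "response": response}


review = {"request": {"uid": "example-uid-123", "operation": "CREATE",
                      "object": {"metadata": {"name": "test-pod"}}}}
# The labels map does not exist yet, so add it as a whole
patches = [{"op": "add", "path": "/metadata/labels",
            "value": {"env": "production", "managed-by": "dodatech"}}]
reply = build_admission_response(review, patches)
print(reply["response"]["patchType"])  # JSONPatch
print(json.loads(base64.b64decode(reply["response"]["patch"])))
```

Using a real JSON Patch op such as "add" (rather than the simulator's bare path/value pairs) is what lets the API server apply the webhook's changes.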

etcd Persistence

import copy
import time


class ETCDSimulator:
    """In-memory stand-in for etcd: a key/value store with a global revision."""

    def __init__(self):
        self.store = {}
        self.watchers = []
        self.revision = 0  # bumped on every successful write

    def create(self, key: str, obj: dict) -> dict:
        if key in self.store:
            return None  # the API server reports 409 AlreadyExists
        self.revision += 1
        self.store[key] = {
            "data": obj,
            "revision": self.revision,
            "created": time.time()
        }
        self._notify_watchers("CREATE", key, obj)
        return self.store[key]

    def update(self, key: str, obj: dict) -> dict:
        if key not in self.store:
            return None
        self.revision += 1  # only successful writes advance the revision
        self.store[key]["data"] = obj
        self.store[key]["revision"] = self.revision
        self._notify_watchers("UPDATE", key, obj)
        return self.store[key]

    def delete(self, key: str) -> bool:
        if key not in self.store:
            return False
        deleted = self.store.pop(key)
        self.revision += 1
        self._notify_watchers("DELETE", key, deleted["data"])
        return True

    def get(self, key: str) -> dict:
        entry = self.store.get(key)
        if entry is None:
            return None
        # Return a copy and expose the revision as metadata.resourceVersion
        obj = copy.deepcopy(entry["data"])
        obj.setdefault("metadata", {})["resourceVersion"] = str(entry["revision"])
        return obj

    def watch(self, callback):
        self.watchers.append(callback)

    def _notify_watchers(self, event: str, key: str, obj: dict):
        for callback in self.watchers:
            callback({"type": event, "object": obj, "key": key})

etcd = ETCDSimulator()
result = etcd.create(
    "/registry/pods/production/web-abc123",
    {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": "web-abc123",
            "namespace": "production",
            "uid": "uid-123"
        },
        "spec": {"containers": [{"name": "web", "image": "nginx"}]}
    }
)
print(f"Created at revision {result['revision']}")

result = etcd.get("/registry/pods/production/web-abc123")
print(f"Retrieved pod: {result['metadata']['name']}, "
      f"resourceVersion: {result['metadata']['resourceVersion']}")

etcd.delete("/registry/pods/production/web-abc123")
print(f"Deleted, store size: {len(etcd.store)}")

Expected output:

Created at revision 1
Retrieved pod: web-abc123, resourceVersion: 1
Deleted, store size: 0

Watching for Changes

class WatchClient:
    """Collects simulator events for keys under a prefix."""

    def __init__(self, etcd: ETCDSimulator):
        self.etcd = etcd
        self.events = []

    def start_watching(self, prefix: str):
        def callback(event):
            # The simulator notifies every watcher, so filter by key prefix
            if event["key"].startswith(prefix):
                self.events.append(event)
                print(f"  [WATCH] {event['type']}: {event['key']}")

        self.etcd.watch(callback)


etcd = ETCDSimulator()
watcher = WatchClient(etcd)
# Trailing slash so keys like /registry/podsecuritypolicies/... don't match
watcher.start_watching("/registry/pods/")

etcd.create("/registry/pods/production/web-1",
            {"kind": "Pod", "metadata": {"name": "web-1"}})
etcd.create("/registry/pods/staging/web-2",
            {"kind": "Pod", "metadata": {"name": "web-2"}})
etcd.create("/registry/services/api",
            {"kind": "Service", "metadata": {"name": "api"}})
etcd.update("/registry/pods/production/web-1",
            {"kind": "Pod",
             "metadata": {"name": "web-1", "labels": {"updated": "true"}}})
etcd.delete("/registry/pods/staging/web-2")

Expected output:

  [WATCH] CREATE: /registry/pods/production/web-1
  [WATCH] CREATE: /registry/pods/staging/web-2
  [WATCH] UPDATE: /registry/pods/production/web-1
  [WATCH] DELETE: /registry/pods/staging/web-2
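The simulator labels its events CREATE, UPDATE, and DELETE. The API server's watch stream uses the types ADDED, MODIFIED, and DELETED, plus BOOKMARK and ERROR. It sends them as a stream of JSON objects, each with a type and an object field. Below is a sketch that renders the simulator's event dicts in that shape; the mapping table and the function name to_watch_event are hypothetical.

```python
import json

# Map the simulator's event names onto Kubernetes watch event types
EVENT_TYPES = {"CREATE": "ADDED", "UPDATE": "MODIFIED", "DELETE": "DELETED"}


def to_watch_event(sim_event: dict) -> str:
    """Render one simulator event as one entry of a watch response stream."""
    return json.dumps({"type": EVENT_TYPES[sim_event["type"]],
                       "object": sim_event["object"]})


sim_events = [
    {"type": "CREATE", "key": "/registry/pods/production/web-1",
     "object": {"kind": "Pod", "metadata": {"name": "web-1"}}},
    {"type": "DELETE", "key": "/registry/pods/production/web-1",
     "object": {"kind": "Pod", "metadata": {"name": "web-1"}}},
]
for e in sim_events:
    print(to_watch_event(e))
# {"type": "ADDED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}
# {"type": "DELETED", "object": {"kind": "Pod", "metadata": {"name": "web-1"}}}
```

Note that the etcd key is not part of the wire event. Clients identify objects by their metadata (namespace, name, uid) instead.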

Common Mistakes

1. Confusing Authentication and Authorization Errors

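A 401 Unauthorized response means authentication failed: the token or certificate was invalid, expired, or not accepted by any authenticator. A 403 Forbidden response means authentication succeeded but the authorizer denied the request. The fixes differ: a 401 points at credentials, and a 403 points at RBAC rules and bindings.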
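One way to keep the two failure modes apart is to note where each one is decided. The authentication result is checked first, so a rejected credential becomes 401 before authorization is ever consulted. Only an identified caller who lacks permission gets a 403. Below is a minimal sketch; the function status_for and its inputs are hypothetical.

```python
from typing import Callable, Optional


def status_for(user: Optional[dict],
               is_allowed: Callable[[dict], bool]) -> int:
    """Map authentication/authorization outcomes to HTTP status codes.

    user is None when the presented credentials were rejected.
    """
    if user is None:
        return 401  # Unauthorized: identity could not be established
    if not is_allowed(user):
        return 403  # Forbidden: identity known, permission missing
    return 200


def deny_all(user: dict) -> bool:
    return False


print(status_for(None, deny_all))                         # 401
print(status_for({"username": "deploy-bot"}, deny_all))   # 403
print(status_for({"username": "admin"}, lambda u: True))  # 200
```

For a 403, kubectl auth can-i <verb> <resource> -n <namespace> --as <user> asks the authorizer the same question directly.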

2. Bypassing Admission Webhooks

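Admission webhooks only see requests that match their rules, and only for write operations (create, update, delete, and connect). Reads such as get, list, and watch never pass through admission. Some changes avoid webhooks altogether. Static pods are started by the kubelet from manifest files on the node rather than through an API request, and webhooks are never called for ValidatingWebhookConfiguration or MutatingWebhookConfiguration objects. When a webhook cannot be reached, failurePolicy decides the outcome. In admissionregistration.k8s.io/v1 it defaults to Fail; the removed v1beta1 API defaulted to Ignore, which fails open. Set failurePolicy: Fail explicitly on security webhooks so the intent is visible and survives copying from older manifests.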

3. Admission Webhook Response Timeout

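The timeoutSeconds field defaults to 10 seconds in admissionregistration.k8s.io/v1 (it was 30 in v1beta1) and must be between 1 and 30 seconds. A slow webhook adds its latency to every matching request, and when it times out, failurePolicy decides whether the request is rejected or allowed. Keep webhook processing well under a second, and move slow work, such as calls to external systems, out of the admission path.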
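Both settings live on each webhook entry of a ValidatingWebhookConfiguration or MutatingWebhookConfiguration. The sketch below builds such an object as a Python dict, which is the shape you would serialize to YAML or pass to a client library, and it checks the timeout bounds. The webhook name, service, namespace, and path are placeholders, and the caBundle that clientConfig normally carries is omitted.

```python
import json


def webhook_config(name: str, service: str, namespace: str,
                   timeout_seconds: int = 5) -> dict:
    """Sketch of a ValidatingWebhookConfiguration for pod policy checks."""
    if not 1 <= timeout_seconds <= 30:
        raise ValueError("timeoutSeconds must be between 1 and 30")
    return {
        "apiVersion": "admissionregistration.k8s.io/v1",
        "kind": "ValidatingWebhookConfiguration",
        "metadata": {"name": name},
        "webhooks": [{
            "name": f"{name}.example.com",  # must be a fully qualified name
            "clientConfig": {"service": {"name": service,
                                         "namespace": namespace,
                                         "path": "/validate"}},
            "rules": [{"apiGroups": [""], "apiVersions": ["v1"],
                       "operations": ["CREATE", "UPDATE"],
                       "resources": ["pods"]}],
            "failurePolicy": "Fail",          # reject if the webhook is down
            "timeoutSeconds": timeout_seconds,
            "sideEffects": "None",
            "admissionReviewVersions": ["v1"],
        }],
    }


print(json.dumps(webhook_config("no-privileged", "policy-svc", "security"),
                 indent=2))
```

Choosing a timeout well below the 10-second default caps how much latency a struggling webhook can add before failurePolicy takes over.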

4. etcd Request Timeouts

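By default the API server times out requests after 60 seconds (--request-timeout), and time spent waiting on etcd counts against that budget. A slow etcd cluster therefore shows up directly as API latency and timeouts. Monitor etcd disk sync latency (etcd_disk_wal_fsync_duration_seconds, etcd_disk_backend_commit_duration_seconds) and follower lag.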

5. Not Understanding the Order of Admission

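The API server always runs mutating admission before validating admission, and you cannot change that order. The real pitfall is enforcing policy inside a mutating webhook. A later mutating webhook can still change the object after your check, and you should not rely on the order in which mutating webhooks run. Put policy checks in validating webhooks, which see the final object. Set reinvocationPolicy: IfNeeded on mutating webhooks that must observe changes made by other webhooks.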

6. Large Object Sizes in etcd

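etcd rejects requests larger than 1.5 MiB by default (--max-request-bytes), and the API server caps the data in a single ConfigMap or Secret at 1 MiB. Large ConfigMaps, Secrets, and CRD instances hit these limits first. Keep large data in external storage and reference it from Kubernetes.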
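A rough pre-flight check can catch oversized objects before they reach the API server. This sketch sums the byte length of the data values against the 1 MiB ConfigMap/Secret limit and compares the JSON encoding of the whole object against etcd's 1.5 MiB default. Both numbers are approximations, since the API server does the authoritative check and stores objects in etcd as protobuf. The helper name size_report is hypothetical.

```python
import json

MIB = 1024 * 1024
CONFIGMAP_DATA_LIMIT = 1 * MIB       # ConfigMap/Secret data limit
ETCD_REQUEST_LIMIT = int(1.5 * MIB)  # etcd's default max request size


def size_report(obj: dict) -> dict:
    """Approximate size checks; the real validation happens server-side."""
    data = obj.get("data", {})
    data_bytes = sum(len(v.encode()) for v in data.values())
    object_bytes = len(json.dumps(obj).encode())
    return {
        "data_bytes": data_bytes,
        "data_ok": data_bytes <= CONFIGMAP_DATA_LIMIT,
        "object_ok": object_bytes <= ETCD_REQUEST_LIMIT,
    }


cm = {"apiVersion": "v1", "kind": "ConfigMap",
      "metadata": {"name": "big"},
      "data": {"blob.txt": "x" * (2 * MIB)}}
print(size_report(cm))
# {'data_bytes': 2097152, 'data_ok': False, 'object_ok': False}
```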

7. Ignoring Rate Limiting

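The API server bounds concurrent requests (--max-requests-inflight and --max-mutating-requests-inflight, refined by API Priority and Fairness) and answers overload with 429 Too Many Requests. Clients throttle themselves too: client-go defaults to 5 QPS with a burst of 10. Burst traffic from controllers or CI/CD pipelines can hit either limit, so monitor API server request metrics and tune client-side rate limits.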
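Client-side rate limiting is usually a token bucket. Tokens refill at a steady QPS, and up to burst requests can go out at once. Below is a minimal single-threaded sketch modeled on the QPS/Burst idea; the class name TokenBucket is hypothetical, and this is not client-go's implementation. A fake clock keeps the example deterministic.

```python
import time


class TokenBucket:
    """Token bucket: refills at `qps` tokens/second, holds at most `burst`."""

    def __init__(self, qps: float, burst: int, clock=time.monotonic):
        self.qps = qps
        self.burst = burst
        self.tokens = float(burst)  # start full so an initial burst is allowed
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill for the elapsed time, capped at the bucket size
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.qps)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should wait or back off before retrying


fake_now = [0.0]
bucket = TokenBucket(qps=5, burst=10, clock=lambda: fake_now[0])
print(sum(bucket.try_acquire() for _ in range(15)))  # 10: burst spent, 5 throttled
fake_now[0] += 1.0  # one second later, 5 tokens have refilled
print(sum(bucket.try_acquire() for _ in range(15)))  # 5
```

A blocking client would sleep until the next token is due instead of returning False. The return-value form just makes the throttling visible.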

Practice Questions

1. What happens at each step of the API request flow?

  1. Authentication: identify the user/service account
  2. Authorization: check RBAC permissions
  3. Mutating admission: modify the object (add defaults, inject sidecars)
  4. Object validation: ensure the object matches the schema
  5. Validating admission: enforce policies
  6. Storage: persist to etcd with a new resourceVersion
  7. Response: return the object to the client
  8. Notification: inform watchers (controllers)

2. What is the difference between mutating and validating admission webhooks?

Mutating webhooks can modify the object before it is stored (e.g., inject a sidecar, add labels). Validating webhooks can only accept or reject the request. All mutating admission runs first, then object schema validation, then validating admission.

3. How does the API server handle concurrent writes to the same object?

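The API server uses optimistic concurrency based on resourceVersion, backed by an etcd transaction that compares the stored revision. When an update carries a resourceVersion, it succeeds only if that version still matches the stored object. If another client wrote first, the versions differ and the request fails with 409 Conflict. The client must then re-read the object and retry.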
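The check behind that 409 is a compare-and-swap on the stored version. Below is a minimal sketch, assuming a plain in-memory store in place of etcd; VersionedStore and ConflictError are hypothetical names.

```python
class ConflictError(Exception):
    """Stands in for HTTP 409 Conflict."""


class VersionedStore:
    def __init__(self):
        self.revision = 0
        self.objects = {}  # key -> (resourceVersion, object)

    def create(self, key: str, obj: dict) -> str:
        self.revision += 1
        self.objects[key] = (str(self.revision), obj)
        return str(self.revision)

    def update(self, key: str, obj: dict, resource_version: str) -> str:
        current_rv, _ = self.objects[key]
        # Compare-and-swap: only write if the caller saw the latest version
        if resource_version != current_rv:
            raise ConflictError(f"stale resourceVersion {resource_version}, "
                                f"current is {current_rv}")
        self.revision += 1
        self.objects[key] = (str(self.revision), obj)
        return str(self.revision)


store = VersionedStore()
rv = store.create("deployments/production/web", {"replicas": 1})  # both read "1"
store.update("deployments/production/web", {"replicas": 2}, rv)   # A wins: "2"
try:
    store.update("deployments/production/web", {"replicas": 3}, rv)  # B sends "1"
except ConflictError as err:
    print(f"409 Conflict: {err}")
    # 409 Conflict: stale resourceVersion 1, current is 2
```

Client B's write is rejected rather than silently overwriting A's change. B recovers by re-reading, reapplying its change, and retrying.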

4. What happens when an admission webhook is unreachable?

If failurePolicy: Ignore, the request proceeds without the webhook. If failurePolicy: Fail (the default in admissionregistration.k8s.io/v1), the request is rejected. Set failurePolicy: Fail explicitly on security enforcement webhooks.

5. Challenge: Design a custom API pipeline for a specific use case.

A team needs to enforce these rules on every pod creation:

  1. Automatically add a sidecar proxy (mutating webhook)
  2. Reject pods with the label env: development in the production namespace
  3. Validate that all images come from an approved registry
  4. Log every pod creation to an external audit system

Design the admission controller configuration, including webhook ordering, failure policies, and timeout settings.

Mini Project: Full Request Pipeline Simulator

class APIServerPipeline:
    """Chains the components above: authn -> authz -> admission -> storage."""

    def __init__(self):
        self.authenticator = Authenticator()
        self.authorizer = RBACAuthorizer()
        self.admission = AdmissionController()
        self.etcd = ETCDSimulator()

    def handle_request(self, method: str, path: str,
                       headers: dict, body: dict = None,
                       cert=None) -> dict:
        # Step 1: Authenticate
        user = self.authenticator.authenticate_request(headers, cert)
        print(f"[1] Auth: {user['username']}")

        # Step 2: Authorize (simplified: /api/v1/namespaces/<ns>/pods only)
        parts = path.strip("/").split("/")
        namespace = "default"
        if len(parts) > 3 and parts[2] == "namespaces":
            namespace = parts[3]
        resource = "pods" if parts[-1] == "pods" else "unknown"
        verb = "create" if method == "POST" else "get"
        if not self.authorizer.authorize(user, verb, resource, namespace):
            return {"status": 403, "error": "Forbidden"}
        print(f"[2] Authz: {verb} {resource} in {namespace} allowed")

        # Steps 3-5: mutating admission, (schema validation), validating admission
        obj, allowed, msg = self.admission.process(body or {}, "CREATE")
        if not allowed:
            return {"status": 403, "error": msg}
        print("[3-5] Admission: passed")

        # Steps 6-7: persist and respond
        key = f"/registry/{resource}/{namespace}/{obj['metadata']['name']}"
        if self.etcd.create(key, obj) is None:
            return {"status": 409, "error": "AlreadyExists"}
        print(f"[6-7] Stored at {key}")
        return {"status": 201, "body": obj}


pipeline = APIServerPipeline()
pipeline.authorizer.add_role("developer", "default", [
    {"verbs": ["create", "list"], "resources": ["pods"]}
])
pipeline.authorizer.add_binding("dev-user", "developer", "default", "User")
pipeline.admission.add_mutating_webhook("label-injector", add_default_labels)

# Token signed with the demo key the Authenticator uses
dev_token = jwt.encode({"sub": "dev-user", "groups": ["developers"]},
                       "fake-secret", algorithm="HS256")

result = pipeline.handle_request(
    "POST", "/api/v1/namespaces/default/pods",
    {"Authorization": f"Bearer {dev_token}"},
    {"apiVersion": "v1", "kind": "Pod",
     "metadata": {"name": "demo-pod"},
     "spec": {"containers": [{"name": "web", "image": "nginx"}]}}
)
print(f"\nFinal response: {result['status']}")

Expected output:

[1] Auth: dev-user
[2] Authz: create pods in default allowed
  [Mutate: label-injector] Applied 2 patches
[3-5] Admission: passed
[6-7] Stored at /registry/pods/default/demo-pod

Final response: 201

FAQ

What is the difference between kubectl apply and kubectl create?

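kubectl create sends a POST that creates a new object and fails if the object already exists. kubectl apply (client-side, the default) creates the object with a POST if it does not exist. Otherwise it sends a PATCH computed by a three-way merge of three inputs: the last-applied configuration (stored in the kubectl.kubernetes.io/last-applied-configuration annotation), the live object, and your manifest. With --server-side, the API server performs the merge itself and tracks field ownership in managedFields.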
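The three-way merge can be illustrated on flat key/value maps. A field you remove from your manifest is deleted only if kubectl applied it last time, so fields set by other actors survive. Below is a simplified sketch, assuming flat dicts; real strategic merge patches also handle nested maps and lists. The function three_way_patch is hypothetical, and it uses None to mean "delete this field", as JSON merge patch does with null.

```python
def three_way_patch(last_applied: dict, live: dict, desired: dict) -> dict:
    """Compute a merge-patch-style dict for flat maps (None means delete)."""
    patch = {}
    # Fields in the manifest that differ from the live object get set
    for key, value in desired.items():
        if live.get(key) != value:
            patch[key] = value
    # Fields applied last time but dropped from the manifest get deleted;
    # fields that only other actors set are left alone
    for key in last_applied:
        if key not in desired and key in live:
            patch[key] = None
    return patch


last_applied = {"replicas": "3", "image": "nginx:1.25", "debug": "true"}
live = {"replicas": "5", "image": "nginx:1.25", "debug": "true",
        "autoscaled": "yes"}  # replicas and autoscaled were changed by others
desired = {"replicas": "3", "image": "nginx:1.27"}
print(three_way_patch(last_applied, live, desired))
# {'replicas': '3', 'image': 'nginx:1.27', 'debug': None}
```

The replicas line shows a classic gotcha: re-applying a manifest that pins replicas overwrites a value an autoscaler set on the live object.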

How does the API server handle watch connections?

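The API server keeps a long-lived HTTP connection open for each watch and streams an event (ADDED, MODIFIED, DELETED) whenever a matching object changes. It serves watches from an in-memory watch cache that is fed by etcd's own watch mechanism. Clients track the last resourceVersion they received and resume from it after reconnecting. If that version has already been compacted away, the server responds with 410 Gone and the client must re-list.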
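A client-side watch loop remembers the last resourceVersion it saw and resumes from there after a disconnect. If the server has already compacted that history, it answers 410 Gone (in practice this may arrive as an ERROR event in the stream), and the client must re-list. Below is a sketch against an in-memory event log that stands in for the API server. EventLog, GoneError, and sync are illustrative names, not client-go APIs.

```python
class GoneError(Exception):
    """Stands in for HTTP 410 Gone (requested resourceVersion too old)."""


class EventLog:
    def __init__(self):
        self.events = []  # (resourceVersion, type, name)
        self.oldest = 1   # oldest resourceVersion still retained

    def append(self, event_type: str, name: str) -> int:
        rv = len(self.events) + 1
        self.events.append((rv, event_type, name))
        return rv

    def compact(self, up_to: int):
        self.oldest = up_to + 1

    def watch_from(self, resource_version: int) -> list:
        # Events after resource_version must still be retained
        if resource_version + 1 < self.oldest:
            raise GoneError(f"resourceVersion {resource_version} is too old")
        return [e for e in self.events if e[0] > resource_version]


def sync(log: EventLog, last_rv: int) -> int:
    """Resume a watch from last_rv; fall back to a fresh list on 410."""
    try:
        events = log.watch_from(last_rv)
    except GoneError:
        print("410 Gone: re-listing")
        return log.events[-1][0] if log.events else 0  # list's current rv
    for rv, event_type, name in events:
        print(f"{event_type} {name} (rv={rv})")
        last_rv = rv
    return last_rv


log = EventLog()
log.append("ADDED", "web-1")
last = sync(log, 0)          # ADDED web-1 (rv=1)
log.append("MODIFIED", "web-1")
log.append("ADDED", "web-2")
last = sync(log, last)       # resumes after rv=1
log.append("DELETED", "web-2")
log.compact(up_to=4)
last = sync(log, last)       # rv=3 was compacted: 410 Gone, re-list
```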

What happens when etcd is down?

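The API server cannot read or write objects while etcd is unavailable, so most API calls fail, typically with timeouts or 5xx errors. Containers that are already running keep running, because the kubelet and container runtime do not need etcd to keep them alive. Anything that requires a state change stops working, though: no new pods are scheduled, controllers cannot reconcile, and pods on failed nodes are not replaced.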

What's Next

What is Kubernetes?
Kubernetes Metrics Server Guide
ConfigMaps & Secrets

Congratulations on completing this API server flow guide! Here's where to go from here:

  • Practice daily — Use kubectl --v=8 to see API requests and responses
  • Build a project — Write and deploy a validating admission webhook
  • Explore related topics — API aggregation, custom API servers, OIDC authentication, webhook TLS configuration
  • Join the community — Share your API server troubleshooting stories and get feedback

Remember: every expert was once a beginner. Keep requesting!

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro