Kubernetes API Server Request Flow — Authentication to Persistence
In this tutorial, you'll learn about Kubernetes API Server Request Flow. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.
The Kubernetes API server request flow traces how every kubectl command or API call is authenticated, authorized, validated, admitted, and persisted, from client request to etcd storage and back.
What You'll Learn
You'll master the complete API request lifecycle — authentication mechanisms (x509, bearer tokens, OIDC), RBAC authorization evaluation, admission controllers (mutating and validating), object validation, etcd persistence, and watch-based notifications.
Why This Problem Matters
Understanding the API server flow is essential for debugging permission errors, configuring admission Webhooks, troubleshooting slow API responses, and securing the cluster. Every kubectl command passes through this pipeline — knowing how it works helps you understand failures.
Real-World Use
Durga Antivirus Pro's security team uses admission controllers to enforce pod security policies. Every pod creation request passes through a validating webhook that checks for privileged containers. Understanding the flow helped them configure the webhook correctly.
Complete Request Flow
flowchart TB
Client[kubectl / API Client] -->|1. HTTP Request| Auth[Authentication]
subgraph Authentication
x509[x509 Client Cert]
Bearer["Bearer Token<br/>SA Token / OIDC"]
Basic["Basic Auth<br/>(deprecated)"]
Proxy[Proxy Auth]
end
Auth --> Authentication
Authentication -->|2. User + Groups + Extra| Authz[Authorization]
subgraph Authorization
ABAC[ABAC]
RBAC[RBAC]
Node[Node Authorizer]
Webhook[Webhook]
end
Authz --> Authorization
Authorization -->|3. Request Object| Admission[Mutating Admission]
Admission -->|4. Mutated Object| Validation[Object Validation]
Validation -->|5. Validated Object| ValAdmission[Validating Admission]
subgraph AdmissionControllers
MC["Mutating Webhooks<br/>e.g., Istio sidecar injector"]
VC["Validating Webhooks<br/>e.g., PodSecurity"]
end
Admission --> MC
ValAdmission --> VC
VC -->|6. Approved| Storage[Storage]
Storage -->|7. Persist| ETCD[(etcd)]
ETCD -->|8. Response| Storage
Storage -->|9. HTTP 201/200| Client
Storage -->|10. Notify Watchers| Watch[Watch Listeners]
Authentication Step
import ssl
import jwt
import datetime
from cryptography import x509
from cryptography.hazmat.primitives import serialization
class Authenticator:
    """Resolve an incoming request to an identity (username + groups).

    Credentials are tried in this order: bearer token, client certificate.
    Anything else (no credentials, or an unsupported scheme such as HTTP
    Basic) is treated as the anonymous user.

    Args:
        signing_key: Optional key used to verify JWT signatures. When it is
            ``None`` (the default, kept for backward compatibility with the
            tutorial demo) JWT claims are decoded WITHOUT signature
            verification, which means anyone can forge a token. Always pass
            a key outside of a demo.
        algorithms: Signature algorithms accepted when ``signing_key`` is set.
    """

    ANONYMOUS_USER = "system:anonymous"
    UNAUTHENTICATED_GROUP = "system:unauthenticated"

    def __init__(self, signing_key: str = None, algorithms=("HS256",)):
        # token string -> user-info dict for statically registered tokens
        self.valid_tokens = {}
        self.valid_certs = {}
        self.signing_key = signing_key
        self.algorithms = list(algorithms)

    @classmethod
    def _anonymous(cls) -> dict:
        """Return a fresh user-info dict for an unauthenticated request."""
        return {
            "username": cls.ANONYMOUS_USER,
            "groups": [cls.UNAUTHENTICATED_GROUP],
        }

    def authenticate_request(self, headers: dict,
                             cert: ssl.SSLObject = None) -> dict:
        """Authenticate a request from its headers and optional TLS object.

        Returns:
            A dict with at least ``username`` and ``groups``. Requests that
            cannot be authenticated get the anonymous identity; this method
            never raises for bad credentials.
        """
        auth_header = (headers or {}).get("Authorization", "") or ""

        # Token-based (Bearer token in Authorization header)
        if auth_header.startswith("Bearer "):
            return self._authenticate_token(auth_header[len("Bearer "):])

        # Client certificate
        if cert:
            return self._authenticate_cert(cert)

        # HTTP Basic is deliberately NOT honoured: the header used to grant
        # cluster-admin (system:masters) to any caller without checking the
        # credentials at all. Static-password basic auth was removed from
        # Kubernetes in v1.19, so such requests are simply unauthenticated.
        return self._anonymous()

    def _authenticate_token(self, token: str) -> dict:
        """Map a bearer token to a user, falling back to anonymous."""
        if token in self.valid_tokens:
            return self.valid_tokens[token]
        try:
            if self.signing_key is not None:
                # Verified path: bad signature / expiry raise InvalidTokenError.
                payload = jwt.decode(
                    token, self.signing_key, algorithms=self.algorithms
                )
            else:
                # SECURITY: demo-only path, claims are trusted unverified.
                payload = jwt.decode(
                    token, options={"verify_signature": False}
                )
        except jwt.InvalidTokenError:
            return self._anonymous()
        return {
            "username": payload.get("sub", "unknown"),
            "groups": payload.get("groups", []),
            "uid": payload.get("uid", "")
        }

    def _authenticate_cert(self, cert: ssl.SSLObject) -> dict:
        """Derive the user from a client certificate (CN = user, O = groups)."""
        try:
            der_data = cert.getpeercert(binary_form=True)
            cert_obj = x509.load_der_x509_certificate(der_data)
            cn = cert_obj.subject.get_attributes_for_oid(
                x509.NameOID.COMMON_NAME
            )[0].value
            orgs = [
                attr.value for attr in cert_obj.subject.get_attributes_for_oid(
                    x509.NameOID.ORGANIZATION_NAME
                )
            ]
        except Exception:
            # Best effort by design: a missing/unparseable certificate or a
            # subject without a CN means the caller is not authenticated.
            return self._anonymous()
        return {"username": cn, "groups": orgs}
# Demo: authenticate a (simplified) service-account bearer token.
authenticator = Authenticator()

sa_claims = {
    "sub": "system:serviceaccount:production:deploy-bot",
    "groups": ["system:serviceaccounts", "system:serviceaccounts:production"],
}
sa_token = jwt.encode(sa_claims, "fake-secret", algorithm="HS256")

request_headers = {"Authorization": f"Bearer {sa_token}"}
user = authenticator.authenticate_request(request_headers, None)
print(f"Token auth: {user['username']}, groups={user['groups']}")
Expected output:
Token auth: system:serviceaccount:production:deploy-bot, groups=['system:serviceaccounts', 'system:serviceaccounts:production']
Authorization (RBAC Evaluation)
class RBACAuthorizer:
    """Minimal RBAC evaluator: roles hold rules, bindings attach subjects.

    Roles are stored under the key ``"<namespace>/<name>"``. A binding whose
    namespace is ``None`` (or empty) is cluster-wide and applies to requests
    in every namespace as well as to cluster-scoped requests; a namespaced
    binding applies only to requests in that same namespace.
    """

    SERVICE_ACCOUNT_PREFIX = "system:serviceaccount:"
    WILDCARD = "*"

    def __init__(self):
        self.roles = {}
        self.bindings = []

    def add_role(self, name: str, namespace: str,
                 rules: list):
        """Register ``rules`` (dicts with ``verbs`` and ``resources``)."""
        key = f"{namespace}/{name}"
        self.roles[key] = rules

    def add_binding(self, subject: str, role: str,
                    namespace: str, kind: str = "User"):
        """Bind ``subject`` (a User, Group or ServiceAccount) to ``role``.

        For ``kind="ServiceAccount"`` the subject is either the full
        username (``system:serviceaccount:<ns>:<name>``) or a bare account
        name, which is resolved inside the binding's namespace.
        """
        self.bindings.append({
            "subject": subject,
            "role": role,
            "namespace": namespace,
            "kind": kind
        })

    def authorize(self, user_info: dict, verb: str,
                  resource: str, namespace: str = None) -> bool:
        """Return True if any applicable binding grants ``verb`` on ``resource``."""
        username = user_info.get("username", "")
        groups = user_info.get("groups") or []
        for binding in self.bindings:
            if not self._binding_in_scope(binding, namespace):
                continue
            if not self._subject_matches(binding, username, groups):
                continue
            role_key = f"{binding['namespace']}/{binding['role']}"
            for rule in self.roles.get(role_key, []):
                if (self._rule_covers(rule.get("verbs", []), verb)
                        and self._rule_covers(rule.get("resources", []),
                                              resource)):
                    return True
        return False

    def check_access(self, username: str, verb: str,
                     resource: str, namespace: str = None) -> str:
        """Convenience wrapper: authorize a group-less user, return ALLOW/DENY."""
        user_info = {"username": username, "groups": []}
        if self.authorize(user_info, verb, resource, namespace):
            return "ALLOW"
        return "DENY"

    @staticmethod
    def _binding_in_scope(binding: dict, namespace) -> bool:
        """Cluster-wide bindings always apply; namespaced ones need a match."""
        if not binding["namespace"]:
            return True
        return binding["namespace"] == namespace

    def _subject_matches(self, binding: dict, username: str,
                         groups: list) -> bool:
        """Check whether the requesting identity is the binding's subject."""
        kind = binding["kind"]
        subject = binding["subject"]
        if kind == "User":
            return subject == username
        if kind == "Group":
            return subject in groups
        if kind == "ServiceAccount":
            # Exact comparison only: a suffix match would let a binding for
            # "bot" also authorize "...:deploy-bot".
            if subject.startswith(self.SERVICE_ACCOUNT_PREFIX):
                return subject == username
            if binding["namespace"]:
                expected = (f"{self.SERVICE_ACCOUNT_PREFIX}"
                            f"{binding['namespace']}:{subject}")
                return expected == username
        return False

    @classmethod
    def _rule_covers(cls, allowed: list, wanted: str) -> bool:
        """True if ``wanted`` is listed explicitly or via the ``*`` wildcard."""
        return wanted in allowed or cls.WILDCARD in allowed
# Demo: bind a role to the deploy-bot service account and probe it.
rbac = RBACAuthorizer()
rbac.add_role("deploy-role", "production", [
    {"verbs": ["get", "list", "create", "update"],
     "resources": ["deployments", "services"]},
    {"verbs": ["get", "list"],
     "resources": ["pods"]}
])
rbac.add_binding(
    "system:serviceaccount:production:deploy-bot",
    "deploy-role", "production", "ServiceAccount"
)

# The authorizer sees the full service-account username (as produced by
# authentication). Checking the bare name "deploy-bot" never matches the
# binding's subject and would make every row print DENY.
deploy_bot = "system:serviceaccount:production:deploy-bot"
checks = [
    (deploy_bot, "list", "deployments", "production"),
    (deploy_bot, "delete", "deployments", "production"),
    (deploy_bot, "list", "nodes", "production"),
    (deploy_bot, "list", "pods", "production"),
    (deploy_bot, "list", "deployments", "kube-system"),
]
for username, verb, resource, ns in checks:
    result = rbac.check_access(username, verb, resource, ns)
    # Show only the short account name to keep the table readable.
    short_name = username.rsplit(":", 1)[-1]
    print(f"{short_name:>10} {verb:>8} {resource:>12} "
          f"in {ns or '*':>12} -> {result}")
Expected output:
deploy-bot list deployments in production -> ALLOW
deploy-bot delete deployments in production -> DENY
deploy-bot list nodes in production -> DENY
deploy-bot list pods in production -> ALLOW
deploy-bot list deployments in kube-system -> DENY
Admission Controllers
import json
import copy
class AdmissionController:
    """Run mutating webhooks, then validating webhooks, over an object.

    A mutating webhook is a callable ``fn(obj, operation)`` returning a list
    of patches (dicts with ``path`` and ``value``). A validating webhook is a
    callable ``fn(obj, operation)`` returning ``(allowed, message)``.
    """

    def __init__(self):
        self.mutating_webhooks = []
        self.validating_webhooks = []

    def add_mutating_webhook(self, name: str, fn):
        """Register a mutating webhook; webhooks run in registration order."""
        self.mutating_webhooks.append((name, fn))

    def add_validating_webhook(self, name: str, fn):
        """Register a validating webhook; webhooks run in registration order."""
        self.validating_webhooks.append((name, fn))

    def process(self, obj: dict, operation: str) -> tuple:
        """Admit ``obj`` for ``operation``.

        The caller's object is never modified; all work happens on a deep
        copy.

        Returns:
            ``(object, allowed, message)`` where ``object`` is the mutated
            copy and ``message`` is empty unless a validating webhook denied
            the request (the first denial wins).
        """
        obj_copy = copy.deepcopy(obj)

        # Mutating admission
        for name, fn in self.mutating_webhooks:
            patches = fn(obj_copy, operation)
            if patches:
                print(f" [Mutate: {name}] Applied {len(patches)} patches")
                for p in patches:
                    self._apply_patch(obj_copy, p)

        # Validating admission always sees the fully mutated object.
        for name, fn in self.validating_webhooks:
            allowed, message = fn(obj_copy, operation)
            if not allowed:
                return obj_copy, False, f"Denied by {name}: {message}"
        return obj_copy, True, ""

    def _apply_patch(self, obj: dict, patch: dict):
        """Set ``patch["value"]`` at the slash-separated ``patch["path"]``.

        Missing intermediate dicts are created. Path segments follow JSON
        Pointer escaping (RFC 6901): ``~1`` stands for ``/`` and ``~0`` for
        ``~``, so keys such as ``app.kubernetes.io/name`` can be addressed
        as ``app.kubernetes.io~1name``.
        """
        raw_parts = patch["path"].strip("/").split("/")
        # Decode "~1" before "~0" so that "~01" correctly yields "~1".
        parts = [p.replace("~1", "/").replace("~0", "~") for p in raw_parts]
        current = obj
        for part in parts[:-1]:
            current = current.setdefault(part, {})
        current[parts[-1]] = patch["value"]
def add_default_labels(obj, op):
    """Mutating webhook: propose default labels for newly created objects.

    Only CREATE operations are handled. The object's ``metadata.labels``
    mapping is created if absent, and one patch is returned for each default
    label the object does not already carry.
    """
    if op != "CREATE":
        return []

    existing = obj.setdefault("metadata", {}).setdefault("labels", {})
    candidates = {
        "env": {"path": "/metadata/labels/env", "value": "production"},
        "managed-by": {"path": "/metadata/labels/managed-by", "value": "dodatech"},
    }
    return [patch for key, patch in candidates.items() if key not in existing]
def validate_no_privileged(obj, op):
    """Validating webhook: reject objects that request a privileged container.

    Only CREATE operations are checked. Regular, init and ephemeral
    containers are all inspected, so a privileged init container cannot slip
    past the policy.

    Returns:
        ``(allowed, message)``; ``message`` is empty when allowed.
    """
    if op != "CREATE":
        return True, ""

    spec = obj.get("spec") or {}
    for field in ("containers", "initContainers", "ephemeralContainers"):
        for container in spec.get(field) or []:
            # "securityContext: null" is valid input; treat it as empty.
            sc = container.get("securityContext") or {}
            if sc.get("privileged"):
                return False, "Privileged containers not allowed"
    return True, ""
# Demo: one mutating and one validating webhook applied to two pods.
admission = AdmissionController()
admission.add_validating_webhook("no-privileged", validate_no_privileged)
admission.add_mutating_webhook("label-injector", add_default_labels)

web_container = {"name": "web", "image": "nginx"}

# A well-behaved pod: gets default labels and is admitted.
pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "test-pod"},
    "spec": {"containers": [web_container]},
}
result, allowed, msg = admission.process(pod, "CREATE")
labels = result.get("metadata", {}).get("labels", {})
print(f"Allowed: {allowed}")
print(f"Labels: {labels}")

# Test with privileged container
pod_bad = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "bad-pod"},
    "spec": {
        "containers": [
            {**web_container, "securityContext": {"privileged": True}},
        ],
    },
}
result, allowed, msg = admission.process(pod_bad, "CREATE")
print(f"\nPrivileged pod allowed: {allowed}")
if not allowed:
    print(f"Reason: {msg}")
Expected output:
[Mutate: label-injector] Applied 2 patches
Allowed: True
Labels: {'env': 'production', 'managed-by': 'dodatech'}
Privileged pod allowed: False
Reason: Denied by no-privileged: Privileged containers not allowed
etcd Persistence
import json
import time
from collections import OrderedDict
class ETCDSimulator:
    """In-memory key/value store with a global revision counter and watchers.

    Every successful write (create, update, delete) increments ``revision``
    by one and notifies all registered watchers. Stored objects are deep
    copies, so later changes to the caller's dict do not alter the store.
    """

    def __init__(self):
        self.store = {}
        self.watchers = []
        self.revision = 0

    def create(self, key: str, obj: dict) -> dict:
        """Store ``obj`` under ``key`` (replacing any existing entry).

        Returns:
            The stored entry: ``{"data", "revision", "created"}``.
        """
        self.revision += 1
        self.store[key] = {
            "data": copy.deepcopy(obj),
            "revision": self.revision,
            "created": time.time()
        }
        self._notify_watchers("CREATE", key, obj)
        return self.store[key]

    def update(self, key: str, obj: dict) -> dict:
        """Replace the object at ``key``.

        Returns:
            The updated entry, or ``None`` if ``key`` does not exist. A
            failed update leaves the revision counter untouched.
        """
        entry = self.store.get(key)
        if entry is None:
            return None
        # Bump the revision only once we know the write will happen.
        self.revision += 1
        entry["data"] = copy.deepcopy(obj)
        entry["revision"] = self.revision
        self._notify_watchers("UPDATE", key, obj)
        return entry

    def delete(self, key: str) -> bool:
        """Remove ``key``; return True if it existed, False otherwise."""
        if key in self.store:
            deleted = self.store.pop(key)
            self.revision += 1
            self._notify_watchers("DELETE", key, deleted["data"])
            return True
        return False

    def get(self, key: str) -> dict:
        """Return a copy of the object at ``key`` or ``None`` if absent.

        The copy's ``metadata.resourceVersion`` is set to the entry's
        revision as a string; ``metadata`` is created when the stored object
        has none.
        """
        entry = self.store.get(key)
        if entry is None:
            return None
        obj = copy.deepcopy(entry["data"])
        obj.setdefault("metadata", {})["resourceVersion"] = str(entry["revision"])
        return obj

    def watch(self, callback):
        """Register ``callback`` to be called with every change event."""
        self.watchers.append(callback)

    def _notify_watchers(self, event: str, key: str, obj: dict):
        """Deliver ``{"type", "object", "key"}`` to each watcher in order."""
        for callback in self.watchers:
            callback({"type": event, "object": obj, "key": key})
import copy
# Demo: create, read back and delete a single pod record.
etcd = ETCDSimulator()

pod_key = "/registry/pods/production/web-abc123"
pod_manifest = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {
        "name": "web-abc123",
        "namespace": "production",
        "uid": "uid-123",
    },
    "spec": {"containers": [{"name": "web", "image": "nginx"}]},
}

result = etcd.create(pod_key, pod_manifest)
print(f"Created at revision {result['revision']}")

result = etcd.get(pod_key)
pod_meta = result["metadata"]
print(f"Retrieved pod: {pod_meta['name']}, "
      f"resourceVersion: {pod_meta['resourceVersion']}")

etcd.delete(pod_key)
print(f"Deleted, store size: {len(etcd.store)}")
Expected output:
Created at revision 1
Retrieved pod: web-abc123, resourceVersion: 1
Deleted, store size: 0
Watching for Changes
import time
import threading
class WatchClient:
    """Collects store events whose key starts with a chosen prefix."""

    def __init__(self, etcd: ETCDSimulator):
        self.events = []
        self.etcd = etcd

    def start_watching(self, prefix: str):
        """Subscribe to the store, recording and printing matching events."""

        def _record_if_matching(event):
            # Events for keys outside the watched prefix are ignored.
            if not event["key"].startswith(prefix):
                return
            self.events.append(event)
            print(f" [WATCH] {event['type']}: {event['key']}")

        self.etcd.watch(_record_if_matching)
# Demo: watch the pods prefix; the service write below is not reported.
etcd = ETCDSimulator()
watcher = WatchClient(etcd)
watcher.start_watching("/registry/pods")

etcd.create("/registry/pods/production/web-1",
            {"kind": "Pod", "metadata": {"name": "web-1"}})
etcd.create("/registry/pods/staging/web-2",
            {"kind": "Pod", "metadata": {"name": "web-2"}})
etcd.create("/registry/services/api",
            {"kind": "Service", "metadata": {"name": "api"}})
# The original call was missing a closing brace, which made the whole
# snippet a SyntaxError.
etcd.update("/registry/pods/production/web-1",
            {"kind": "Pod",
             "metadata": {"name": "web-1", "labels": {"updated": "true"}}})
etcd.delete("/registry/pods/staging/web-2")
Expected output:
[WATCH] CREATE: /registry/pods/production/web-1
[WATCH] CREATE: /registry/pods/staging/web-2
[WATCH] UPDATE: /registry/pods/production/web-1
[WATCH] DELETE: /registry/pods/staging/web-2
Common Mistakes
1. Confusing Authentication and Authorization Errors
A 401 (Unauthorized) error means the request was not authenticated (bad or missing token/cert). A 403 (Forbidden) error means the identity is valid but lacks permission for the action. Different root causes require different fixes.
2. Bypassing Admission Webhooks
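Admission webhooks are called only for requests that match their rules, and some workloads never pass through them in the usual way (for example, static pods are started directly by the kubelet). Also, what happens when a webhook fails to respond depends on its failurePolicy: Ignore lets the request through (fail-open), while Fail rejects it. In admissionregistration.k8s.io/v1 the default is Fail (it was Ignore in v1beta1); still, set failurePolicy: Fail explicitly for security webhooks.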
3. Admission Webhook Response Timeout
Webhooks must respond within their timeoutSeconds (default 10 seconds, maximum 30). Slow webhooks cause API timeouts. Keep webhook processing fast (under 1 second). Use async processing if needed.
4. etcd Request Timeouts
The API server has a 60-second timeout for etcd requests. A slow etcd cluster causes API latency and timeouts. Monitor etcd disk sync time and follower lag.
5. Not Understanding the Order of Admission
Mutating Webhooks run BEFORE validating Webhooks. If a validating webhook validates the object before mutations, it may reject a valid object. Ensure your admission chain is ordered correctly.
6. Large Object Sizes in etcd
Objects larger than 1.5MB are rejected by the API server. This includes large ConfigMaps, Secrets, or CRD instances. For large data, use external storage and reference it from Kubernetes.
7. Ignoring Rate Limiting
The API server has default rate limits (QPS). Burst traffic from controllers or CI/CD can hit these limits. Monitor API server request metrics and configure client-side Rate Limiting.
Practice Questions
1. What happens at each step of the API request flow?
- Authentication: identify the user/service account
- Authorization: check RBAC permissions
- Mutating admission: modify the object (add defaults, inject sidecars)
- Object validation: ensure the object matches the schema
- Validating admission: enforce policies
- Storage: persist to etcd with a new resourceVersion
- Response: return the object to the client
- Notification: inform watchers (controllers)
2. What is the difference between mutating and validating admission Webhooks?
Mutating Webhooks can modify the object before it's stored (e.g., inject sidecar, add labels). Validating Webhooks can only accept or reject the request. Mutating runs first, then validation.
3. How does the API server handle concurrent writes to the same object?
etcd uses optimistic concurrency with resourceVersion. When a client updates an object, it must include the current resourceVersion. If another client updated it first, the version mismatches and the write fails with 409 Conflict.
4. What happens when an admission webhook is unreachable?
If failurePolicy: Ignore, the request proceeds without the webhook. If failurePolicy: Fail (the default in admissionregistration.k8s.io/v1), the request is rejected. Always set failurePolicy: Fail explicitly for security enforcement webhooks.
5. Challenge: Design a custom API pipeline for a specific use case.
A team needs to enforce these rules on every pod creation:
- Automatically add a sidecar proxy (mutating webhook)
- Reject pods with the label env: development in the production namespace
- Validate that all images come from an approved registry
- Log every pod creation to an external audit system
Design the admission controller configuration, including webhook ordering, failure policies, and timeout settings.
Mini Project: Full Request Pipeline Simulator
class APIServerPipeline:
    """Chain authentication, authorization, admission and storage.

    Supported requests: ``POST`` creates an object, ``GET`` reads one object
    (when the path ends in a name) or lists a namespace. Every response is a
    dict with ``status`` plus either ``body`` or ``error``.
    """

    def __init__(self):
        self.authenticator = Authenticator()
        self.authorizer = RBACAuthorizer()
        self.admission = AdmissionController()
        self.etcd = ETCDSimulator()

    def handle_request(self, method: str, path: str,
                       headers: dict, body: dict = None,
                       cert=None) -> dict:
        """Process one API request end to end.

        Returns:
            201 with the stored object for a successful create, 200 for
            reads, 403 when authorization or admission denies the request,
            404 for a missing object, 405 for unsupported methods and 422
            when a created object has no ``metadata.name``.
        """
        # Step 1: Authenticate
        user = self.authenticator.authenticate_request(headers, cert)
        print(f"[1] Auth: {user['username']}")

        # Step 2: Authorize (simplified). The namespace comes from the URL,
        # so a request for another namespace is not checked against, or
        # stored in, "default".
        resource, namespace, name = self._parse_path(path)
        method = method.upper()
        if method == "POST":
            verb = "create"
        elif method == "GET":
            verb = "get" if name else "list"
        else:
            return {"status": 405, "error": "Method Not Allowed"}
        if not self.authorizer.authorize(user, verb, resource, namespace):
            return {"status": 403, "error": "Forbidden"}

        prefix = f"/registry/{resource}/{namespace}/"

        # Reads skip admission and never write to the store.
        if verb == "get":
            found = self.etcd.get(prefix + name)
            if found is None:
                return {"status": 404, "error": "Not Found"}
            return {"status": 200, "body": found}
        if verb == "list":
            items = [self.etcd.get(key) for key in self.etcd.store
                     if key.startswith(prefix)]
            return {"status": 200, "body": {"items": items}}

        # Step 3-5: Admission
        obj, allowed, msg = self.admission.process(body or {}, "CREATE")
        if not allowed:
            return {"status": 403, "error": msg}
        # Checked after mutation so a mutating webhook may supply the name.
        obj_name = (obj.get("metadata") or {}).get("name")
        if not obj_name:
            return {"status": 422, "error": "metadata.name is required"}
        print(f"[3-5] Admission: passed")

        # Step 6-7: Persist and respond
        key = prefix + obj_name
        self.etcd.create(key, obj)
        print(f"[6-7] Stored at {key}")
        return {"status": 201, "body": obj}

    @staticmethod
    def _parse_path(path: str) -> tuple:
        """Split an API path into ``(resource, namespace, name)``.

        ``namespace`` defaults to ``"default"`` when the path has no
        ``namespaces/<ns>`` segment, ``resource`` is ``"pods"`` or
        ``"unknown"``, and ``name`` is ``None`` for collection paths.
        """
        parts = [segment for segment in path.split("/") if segment]
        namespace = "default"
        if "namespaces" in parts:
            ns_index = parts.index("namespaces")
            if ns_index + 1 < len(parts):
                namespace = parts[ns_index + 1]
                # Look for the resource only after the namespace, so a
                # namespace that happens to be called "pods" is not misread.
                parts = parts[ns_index + 2:]
        resource, name = "unknown", None
        if "pods" in parts:
            resource = "pods"
            pod_index = parts.index("pods")
            if pod_index + 1 < len(parts):
                name = parts[pod_index + 1]
        return resource, namespace, name
# Demo: send one pod-creation request through the whole pipeline.
pipeline = APIServerPipeline()
pipeline.authorizer.add_role("developer", "default", [
    {"verbs": ["create", "list"], "resources": ["pods"]}
])
pipeline.authorizer.add_binding("dev-user", "developer", "default", "User")
pipeline.admission.add_mutating_webhook("label-injector", add_default_labels)

# Register the demo bearer token for dev-user, the only subject bound to
# the developer role. An unregistered token is not a valid JWT, resolves to
# system:anonymous, and the request is rejected with 403 instead of being
# stored.
pipeline.authenticator.valid_tokens["fake-token"] = {
    "username": "dev-user",
    "groups": ["system:authenticated"],
}

result = pipeline.handle_request(
    "POST", "/api/v1/namespaces/default/pods",
    {"Authorization": "Bearer fake-token"},
    {"apiVersion": "v1", "kind": "Pod",
     "metadata": {"name": "demo-pod"},
     "spec": {"containers": [{"name": "web", "image": "nginx"}]}}
)
print(f"\nFinal response: {result['status']}")
Expected output:
[1] Auth: system:anonymous
[3-5] Admission: passed
[6-7] Stored at /registry/pods/default/demo-pod
Final response: 201
FAQ
What's Next
Congratulations on completing this API server flow guide! Here's where to go from here:
- Practice daily — Use kubectl --v=8 to see API requests and responses
- Build a project — Write and deploy a validating admission webhook
- Explore related topics — API aggregation, custom API servers, OIDC authentication, webhook TLS configuration
- Join the community — Share your API server troubleshooting stories and get feedback
Remember: every expert was once a beginner. Keep requesting!
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro