Auto Remediation Pipelines with LLM Agents and Argo Events
May 7, 2025 · 8 min read · by Muhammad Amal

TL;DR — Use Argo Events as the typed bus between detectors and remediations, let an LLM pick from a small action catalog, and make every workflow idempotent with a hard timeout and a Slack approval gate for anything that hurts.

Auto-remediation is one of those ideas that sounds dangerous until you’ve built it once, and then it sounds dangerous in a more specific way. The dangerous version is an agent with kubectl access and a chat box. The useful version is a strictly typed pipeline that turns alerts into workflows, lets a language model pick the right workflow from a small menu, and logs everything to a tamper-evident store.

In this tutorial we’re building the useful version on Kubernetes 1.32 using Argo Events 1.9 as the event router and Argo Workflows 3.6 as the executor. The LLM is claude-3.7-sonnet, called from a tiny Python service that sits between the EventBus and the Sensor. Every action the model can propose is a CRD-defined WorkflowTemplate with a published blast radius and runbook.

The end state is a pipeline where a Prometheus alert produces an EventBus message, a triage service enriches it and asks the model to pick an action, and a Sensor matches the action to a Workflow. Nothing in the critical path is unbounded. Everything is auditable. The LLM never touches the cluster directly.

1. Architecture Overview

Five components, three of them off-the-shelf.

Prometheus Alertmanager
        |
        v
    Webhook --> Argo Events EventSource (webhook)
                        |
                        v
                   EventBus (NATS)
                        |
            +-----------+-----------+
            v                       v
    Triage Service              Sensor (matcher)
    (Python + LLM)                   |
            |                        v
            +-----> EventBus --> WorkflowTemplate
                                     |
                                     v
                              Argo Workflow runs
                                     |
                                     v
                              Slack + Audit log

The triage service is the one piece you write. The rest is configuration. Keep the triage service stateless and short. If it’s longer than 300 lines of Python you’re doing too much.

2. Setting Up Argo Events on Kubernetes 1.32

Install Argo Events 1.9 and a NATS EventBus in a dedicated namespace.

kubectl create namespace argo-events
kubectl apply -n argo-events \
  -f https://raw.githubusercontent.com/argoproj/argo-events/v1.9.2/manifests/install.yaml
kubectl apply -n argo-events \
  -f https://raw.githubusercontent.com/argoproj/argo-events/v1.9.2/examples/eventbus/native.yaml

Verify the EventBus is healthy.

kubectl -n argo-events get eventbus default -o jsonpath='{.status.phase}'
# expect: Running

Now define an EventSource that accepts Alertmanager webhooks.

# event-source.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: alertmanager
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  webhook:
    alerts:
      port: "12000"
      endpoint: /alerts
      method: POST

And point Alertmanager at it.

# alertmanager.yaml
receivers:
- name: argo-events
  webhook_configs:
  - url: http://alertmanager-eventsource-svc.argo-events:12000/alerts
    send_resolved: true
route:
  receiver: argo-events
  group_by: ['alertname', 'service']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h

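The repeat_interval of 4h matters. It matches Alertmanager’s default, but pin it explicitly: if someone shortens it to match group_interval, Alertmanager will re-send the same firing alert every 5 minutes, your remediation pipeline will retrigger each time, and you’ll be very sad.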
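Because send_resolved is true, resolved notifications arrive on the same webhook as firing ones, and repeat notifications for one alert group share the same groupKey. A minimal sketch of a normalizer the triage service could run first, assuming the standard Alertmanager webhook fields (status, groupKey, commonLabels, alerts). The normalize function and the rule for deriving incident_id are hypothetical, not part of Argo Events or Alertmanager.

```python
import hashlib
from typing import Optional


def normalize(payload: dict) -> Optional[dict]:
    """Turn an Alertmanager webhook body into a compact triage event.

    Returns None for resolved notifications, which need no remediation.
    """
    if payload.get("status") != "firing":
        return None
    # Hypothetical convention: hash groupKey so that repeat notifications
    # for the same alert group map to the same incident_id.
    group_key = payload.get("groupKey", "")
    incident_id = hashlib.sha256(group_key.encode()).hexdigest()[:16]
    labels = payload.get("commonLabels", {})
    alerts = payload.get("alerts", [])
    return {
        "incident_id": incident_id,
        "alertname": labels.get("alertname", "unknown"),
        "service": labels.get("service", "unknown"),
        "firing": sum(1 for a in alerts if a.get("status") == "firing"),
    }
```

A stable incident_id is what later lets the idempotency key and the audit log line up across repeat notifications.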

3. The Action Catalog

Before any code, write down what the LLM is allowed to do. This is a YAML file, checked into git, reviewed by humans.

# catalog/actions.yaml
version: 1
actions:
  - name: restart_pods
    template: restart-pods-tpl
    blast_radius: pods
    requires_approval: false
    params:
      - namespace
      - selector
    runbook: https://runbooks.internal/restart-pods
    timeout: 5m

  - name: scale_up
    template: scale-deployment-tpl
    blast_radius: deployment
    requires_approval: false
    params:
      - namespace
      - deployment
      - replicas
    constraints:
      max_replicas: 30
    runbook: https://runbooks.internal/scale-up
    timeout: 10m

  - name: rollback_release
    template: rollback-release-tpl
    blast_radius: deployment
    requires_approval: true
    params:
      - namespace
      - release
    runbook: https://runbooks.internal/rollback
    timeout: 15m

  - name: clear_redis_keys
    template: clear-redis-keys-tpl
    blast_radius: cache
    requires_approval: false
    params:
      - cluster
      - pattern
    constraints:
      pattern_prefix_required: true
    runbook: https://runbooks.internal/clear-redis
    timeout: 5m

Three rules: every action has a runbook URL, every destructive action requires approval, every action has a timeout. There are no exceptions and no escape hatches.
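Rules like these are easy to enforce in CI rather than by review alone. A minimal sketch of a catalog linter, assuming the catalog has already been parsed into a dict. The catalog has no "destructive" flag, so the DESTRUCTIVE set below is a hand-maintained assumption, and lint_catalog is a hypothetical helper.

```python
# Assumption: which actions count as destructive is decided by humans and
# kept here by hand; the catalog itself does not carry that flag.
DESTRUCTIVE = {"rollback_release"}


def lint_catalog(catalog: dict) -> list:
    """Return a list of rule violations; an empty list means the catalog passes."""
    errors = []
    for action in catalog.get("actions", []):
        name = action.get("name", "<unnamed>")
        if not str(action.get("runbook", "")).startswith("https://"):
            errors.append(f"{name}: missing runbook URL")
        if not action.get("timeout"):
            errors.append(f"{name}: missing timeout")
        if name in DESTRUCTIVE and action.get("requires_approval") is not True:
            errors.append(f"{name}: destructive action must require approval")
    return errors
```

Failing the build on a non-empty result keeps the "no exceptions" promise honest when someone adds a fifth action in a hurry.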

4. Writing the WorkflowTemplates

Each catalog entry maps to a WorkflowTemplate. Here’s the one for restart_pods.

# workflows/restart-pods-tpl.yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: restart-pods-tpl
  namespace: argo-events
spec:
  serviceAccountName: remediation-runner
  entrypoint: main
  activeDeadlineSeconds: 300
  arguments:
    parameters:
      - name: namespace
      - name: selector
      - name: incident_id
  templates:
  - name: main
    steps:
    - - name: snapshot
        template: snapshot
    - - name: restart
        template: restart
    - - name: verify
        template: verify
  - name: snapshot
    script:
      image: bitnami/kubectl:1.32
      command: [sh]
      source: |
        kubectl -n {{workflow.parameters.namespace}} get pods \
          -l {{workflow.parameters.selector}} -o json \
          > /tmp/snapshot.json
        echo "Snapshotted $(jq '.items | length' /tmp/snapshot.json) pods"
  - name: restart
    script:
      image: bitnami/kubectl:1.32
      command: [sh]
      source: |
        kubectl -n {{workflow.parameters.namespace}} delete pods \
          -l {{workflow.parameters.selector}} --grace-period=30
  - name: verify
    script:
      image: bitnami/kubectl:1.32
      command: [sh]
      source: |
        # 18 tries x 10s = 3 min, leaving headroom under activeDeadlineSeconds: 300
        for i in $(seq 1 18); do
          ready=$(kubectl -n {{workflow.parameters.namespace}} get pods \
            -l {{workflow.parameters.selector}} \
            -o json | jq '[.items[] | select(.status.phase=="Running")] | length')
          if [ "$ready" -gt 0 ]; then
            echo "OK: $ready pods running"
            exit 0
          fi
          sleep 10
        done
        echo "FAIL: no pods running after 3 min"
        exit 1

Three steps: snapshot the state, do the thing, verify the result. The verify step is what separates a remediation from a hope.

5. The Triage Service

The triage service is a Python service that subscribes to the EventBus, builds context, calls the LLM, and emits a typed event back onto the bus.

# triage/main.py
import asyncio
import json
import os

import nats
import yaml  # PyYAML; the catalog from section 3 is YAML, not JSON
from anthropic import AsyncAnthropic
from pydantic import BaseModel, ValidationError

with open("catalog/actions.yaml") as f:
    CATALOG = yaml.safe_load(f)
ALLOWED = {a["name"] for a in CATALOG["actions"]}

class Action(BaseModel):
    name: str
    params: dict[str, str]
    rationale: str

class Decision(BaseModel):
    incident_id: str
    actions: list[Action]
    confidence: float
    needs_human: bool

SYSTEM = f"""You are a remediation planner. Pick zero or more actions from this catalog:
{json.dumps(CATALOG, indent=2)}
Return JSON matching the Decision schema. Never invent action names. Set
needs_human=true if you're below 0.7 confidence or the situation is novel."""

claude = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

async def plan(event: dict) -> Decision:
    msg = await claude.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=1024,
        system=SYSTEM,
        messages=[{"role": "user", "content": json.dumps(event)}],
    )
    decision = Decision.model_validate_json(msg.content[0].text)
    for a in decision.actions:
        if a.name not in ALLOWED:
            raise ValueError(f"unknown action: {a.name}")
    return decision

async def main():
    nc = await nats.connect("nats://eventbus-default-stan-svc.argo-events:4222")
    sub = await nc.subscribe("alerts.>")
    async for msg in sub.messages:
        event = json.loads(msg.data)
        try:
            decision = await plan(event)
        except (ValidationError, ValueError) as e:
            await nc.publish("remediation.escalate", json.dumps({
                # Raw Alertmanager payloads may not carry an incident_id.
                "incident_id": event.get("incident_id", "unknown"),
                "reason": str(e),
            }).encode())
            continue
        if decision.needs_human or decision.confidence < 0.7:
            await nc.publish("remediation.approve", decision.model_dump_json().encode())
        else:
            await nc.publish("remediation.execute", decision.model_dump_json().encode())

if __name__ == "__main__":
    asyncio.run(main())

Notice what’s not in this code: no kubectl, no direct API calls, no string interpolation into shell commands. The triage service is a planner, full stop.
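The planner only checks action names, while the workflow templates interpolate parameters into shell source, so the parameters deserve the same suspicion. A minimal sketch of a parameter check against the catalog, assuming string parameters as in the Action model. The check_params helper, the selector regex (equality-only label selectors), and the inlined CATALOG excerpt are illustrative assumptions.

```python
import re

# Assumption: accept only equality-based label selectors such as
# "app=frontend" or "app=frontend,tier=web". Anything else is rejected.
_PAIR = r"[A-Za-z0-9][A-Za-z0-9_./-]*=[A-Za-z0-9][A-Za-z0-9_.-]*"
SELECTOR_RE = re.compile(rf"{_PAIR}(,{_PAIR})*")

# Trimmed copy of two catalog entries from section 3, inlined for the example.
CATALOG = {
    "actions": [
        {"name": "restart_pods", "params": ["namespace", "selector"]},
        {
            "name": "scale_up",
            "params": ["namespace", "deployment", "replicas"],
            "constraints": {"max_replicas": 30},
        },
    ]
}


def check_params(action_name: str, params: dict, catalog: dict) -> list:
    """Return a list of problems with params; empty means they are acceptable."""
    spec = next((a for a in catalog["actions"] if a["name"] == action_name), None)
    if spec is None:
        return [f"unknown action: {action_name}"]
    errors = []
    expected = set(spec.get("params", []))
    if set(params) != expected:
        errors.append(f"expected params {sorted(expected)}, got {sorted(params)}")
    selector = params.get("selector")
    if selector is not None and not SELECTOR_RE.fullmatch(selector):
        errors.append(f"selector not allowed: {selector!r}")
    max_replicas = spec.get("constraints", {}).get("max_replicas")
    if max_replicas is not None and "replicas" in params:
        replicas = params["replicas"]
        if not re.fullmatch(r"[0-9]+", replicas) or int(replicas) > max_replicas:
            errors.append(f"replicas must be an integer <= {max_replicas}")
    return errors
```

Any non-empty result would be raised as a ValueError inside plan(), which the existing except clause already routes to remediation.escalate.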

6. Sensors That Match Decisions to Workflows

The Sensor is where decisions become workflow runs. One Sensor per execution path.

# sensor-execute.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: remediation-execute
  namespace: argo-events
spec:
  dependencies:
    - name: decision
      eventSourceName: eventbus
      eventName: remediation.execute
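  triggers:
    - template:
        name: dispatch
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: remediation-
              spec:
                workflowTemplateRef:
                  name: placeholder  # overwritten by the first parameter below
                arguments:
                  parameters:
                    - name: incident_id
                      value: ""
                    - name: namespace
                      value: ""
                    - name: selector
                      value: ""
          # Values are injected via trigger parameters (dataKey paths into the
          # event), not inline templates. This assumes each action in the event
          # carries the catalog's template name alongside its name and params.
          parameters:
            - src:
                dependencyName: decision
                dataKey: body.actions.0.template
              dest: spec.workflowTemplateRef.name
            - src:
                dependencyName: decision
                dataKey: body.incident_id
              dest: spec.arguments.parameters.0.value
            - src:
                dependencyName: decision
                dataKey: body.actions.0.params.namespace
              dest: spec.arguments.parameters.1.value
            - src:
                dependencyName: decision
                dataKey: body.actions.0.params.selector
              dest: spec.arguments.parameters.2.value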

The Sensor for remediation.approve is identical except it posts to Slack first and only submits the workflow on button click. I won’t show the Slack integration here, but it’s a standard webhook trigger with a 30-minute approval TTL.
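Two details are worth spelling out: the Sensor reads a template name and only the first action from each event, while the Decision model carries action names and a list. One way to bridge that gap is to have the triage service fan a decision out into one event per action, with the template resolved from the catalog. A minimal sketch, where dispatch_events is a hypothetical helper and the event shape is an assumption chosen to match what the Sensor reads.

```python
import json


def dispatch_events(decision: dict, catalog: dict) -> list:
    """Split a decision into one encoded event per action, adding the template name."""
    templates = {a["name"]: a["template"] for a in catalog["actions"]}
    events = []
    for action in decision["actions"]:
        # Names were already checked against the catalog by the planner,
        # so a KeyError here would indicate a bug, not bad model output.
        enriched = dict(action, template=templates[action["name"]])
        event = {"incident_id": decision["incident_id"], "actions": [enriched]}
        events.append(json.dumps(event).encode())
    return events
```

Each returned payload could then be published to remediation.execute in place of the single model_dump_json() call.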

7. Idempotency and the Audit Log

Every workflow writes to an immutable audit log. We use an append-only Postgres table with a hash chain.

CREATE TABLE remediation_audit (
  id BIGSERIAL PRIMARY KEY,
  incident_id TEXT NOT NULL,
  action TEXT NOT NULL,
  params JSONB NOT NULL,
  result TEXT NOT NULL,
  prev_hash BYTEA,
  this_hash BYTEA NOT NULL,
  created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX ON remediation_audit (incident_id);

The workflow’s final step posts the record. The this_hash is SHA256(prev_hash || row_json). If anyone tampers with a row, the chain breaks at audit time.
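The hash chain is small enough to sketch in full. The version below assumes row_json is a canonical JSON encoding (sorted keys, no whitespace) of the incident_id, action, params, and result columns, and that the first row's NULL prev_hash is treated as empty bytes. Both are assumptions, since the schema leaves them open, and the helper names are hypothetical.

```python
import hashlib
import json

# Assumption: these are the columns covered by the hash.
FIELDS = ("incident_id", "action", "params", "result")


def row_hash(prev_hash: bytes, record: dict) -> bytes:
    """SHA256(prev_hash || row_json) over a canonical JSON encoding."""
    row_json = json.dumps(record, sort_keys=True, separators=(",", ":")).encode()
    return hashlib.sha256(prev_hash + row_json).digest()


def append(chain: list, record: dict) -> None:
    """Append a record, linking it to the previous entry's hash."""
    record = {k: record[k] for k in FIELDS}
    prev = chain[-1]["this_hash"] if chain else b""
    chain.append({**record, "prev_hash": prev, "this_hash": row_hash(prev, record)})


def first_broken(chain: list) -> int:
    """Return the index of the first entry that fails verification, or -1."""
    prev = b""
    for i, entry in enumerate(chain):
        record = {k: entry[k] for k in FIELDS}
        if entry["prev_hash"] != prev or entry["this_hash"] != row_hash(prev, record):
            return i
        prev = entry["this_hash"]
    return -1
```

An audit job would load the rows ordered by id and alert if first_broken returns anything other than -1.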

For idempotency, every workflow checks Redis for a key before starting.

# in the workflow entrypoint
KEY="remediation:${INCIDENT_ID}:${ACTION_NAME}"
if redis-cli SET "$KEY" "running" NX EX 1800 | grep -q OK; then
  echo "proceeding"
else
  echo "already in progress, exiting"
  exit 0
fi

SET NX EX 1800 is the whole pattern. 30-minute TTL covers most remediation windows and self-clears if a workflow dies.
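The same guard translates directly to Python if a workflow step is written as a script. A minimal sketch, assuming a client that exposes a redis-py style set(name, value, nx=..., ex=...), which returns a truthy value when the key was written and None when NX blocked it. The acquire helper is hypothetical, and InMemoryClient is only a stand-in for trying the logic without a Redis server.

```python
def acquire(client, incident_id: str, action: str, ttl_seconds: int = 1800) -> bool:
    """Return True if this caller won the right to run the remediation."""
    key = f"remediation:{incident_id}:{action}"
    return bool(client.set(key, "running", nx=True, ex=ttl_seconds))


class InMemoryClient:
    """Stand-in for a Redis client; it honours nx but ignores expiry."""

    def __init__(self):
        self._data = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self._data:
            return None
        self._data[name] = value
        return True


client = InMemoryClient()
first = acquire(client, "inc-1", "restart_pods")   # key is new, so this wins
second = acquire(client, "inc-1", "restart_pods")  # same key, so this is blocked
```

The key includes the action name, so a restart and a scale-up for the same incident do not block each other.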

8. Common Pitfalls

Four mistakes worth avoiding.

  1. Letting the model emit free-form parameters. Always validate against a pydantic schema with constraints. A selector of app=frontend is fine, a selector of --all-namespaces=true is not.
  2. Forgetting to suppress remediations during a deploy. If your CI marks a deployment as in-progress, your Sensor should refuse to act on alerts from that service for 10 minutes. Otherwise you’ll roll back your own deploy.
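  3. No bounded concurrency. A workflow’s spec.parallelism only caps the pods running inside that one workflow. To cap how many workflows run at once, use a synchronization semaphore on the WorkflowTemplate or the controller’s parallelism and namespaceParallelism settings. A noisy alert storm shouldn’t spawn 200 simultaneous rollbacks.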
  4. Treating the LLM’s confidence as calibrated. It isn’t. Use it as a soft signal alongside hard rules like “rollbacks always need approval”.
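The last point is easy to encode: let the catalog's requires_approval flag override whatever confidence the model reports. A minimal sketch of such a routing function, using the subject names from the triage service. The route helper is hypothetical and the 0.7 threshold simply mirrors the one used earlier.

```python
def route(decision: dict, catalog: dict, threshold: float = 0.7) -> str:
    """Pick the subject to publish on, applying hard rules before soft signals."""
    needs_approval = {
        a["name"] for a in catalog["actions"] if a.get("requires_approval")
    }
    # Hard rule: any action flagged in the catalog goes to a human,
    # regardless of how confident the model claims to be.
    if any(a["name"] in needs_approval for a in decision["actions"]):
        return "remediation.approve"
    # Soft signal: the model's own confidence and needs_human flag.
    if decision["needs_human"] or decision["confidence"] < threshold:
        return "remediation.approve"
    return "remediation.execute"
```

With this ordering, a 0.99-confidence rollback still waits for a button click, which is the behaviour the catalog promises.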

9. Troubleshooting

Three failures you’ll hit in week one.

9.1 Sensor not firing

Check kubectl -n argo-events logs deploy/sensor-controller-manager and the Sensor’s status conditions. Nine times out of ten the dependency name in the trigger template doesn’t match the dependency name in the dependencies list. Argo Events doesn’t shout when this is wrong.

9.2 Workflow stuck in Pending

The most common cause is a missing RBAC binding for remediation-runner. The ServiceAccount needs the delete verb on pods and the patch verb on deployments at minimum, plus get and list on pods for the snapshot and verify steps. Don’t grant cluster-admin. Pin the verbs.

9.3 LLM returns invalid JSON

Happens about 1 in 200 calls even with structured prompts. Wrap the parse in a try/except and emit a remediation.escalate event on failure. Don’t retry the model call automatically: the second response is usually no better than the first, and you’ve doubled your cost.

10. Wrapping Up

The trick to safe auto-remediation isn’t smarter models, it’s smaller surface area. A small action catalog, a small triage service, a small set of workflows. The LLM is just a router from “alert” to “action name”. Argo Events and Argo Workflows do the rest with the same primitives your team already operates.

If you’re building this from scratch, start with one action — usually restart_pods — and let it run for two weeks before adding a second. You’ll find your gaps faster that way. Pair this with the higher-level architecture in AIOps in May 2025, what actually works in production, and review the Argo Events documentation for the bits I glossed over.