spot_img

How a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and Produces Production-Grade Incident Reviews End-to-End

Date:

- Advertisement -spot_img
- Advertisement -spot_img


@tool
def sql_investigate(query: str) -> dict:
try:
df = con.execute(query).df()
head = df.head(30)
return {
“rows”: int(len(df)),
“columns”: list(df.columns),
“preview”: head.to_dict(orient=”records”)
}
except Exception as e:
return {“error”: str(e)}

@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
ws = pd.to_datetime(window_start_iso)
we = pd.to_datetime(window_end_iso)
df = logs_df[(logs_df[“ts”] >= ws) & (logs_df[“ts”] <= we)].copy()
if df.empty:
return {“rows”: 0, “top_error_kinds”: [], “top_services”: [], “top_endpoints”: []}
df[“error_kind_norm”] = df[“error_kind”].fillna(“”).replace(“”, “NONE”)
err = df[df[“level”].isin([“WARN”,”ERROR”])].copy()
top_err = err[“error_kind_norm”].value_counts().head(int(top_k)).to_dict()
top_svc = err[“service”].value_counts().head(int(top_k)).to_dict()
top_ep = err[“endpoint”].value_counts().head(int(top_k)).to_dict()
by_region = err.groupby(“region”).size().sort_values(ascending=False).head(int(top_k)).to_dict()
p95_latency = float(np.percentile(df[“latency_ms”].values, 95))
return {
“rows”: int(len(df)),
“warn_error_rows”: int(len(err)),
“p95_latency_ms”: p95_latency,
“top_error_kinds”: top_err,
“top_services”: top_svc,
“top_endpoints”: top_ep,
“error_by_region”: by_region
}

@tool
def propose_mitigations(hypothesis: str) -> dict:
h = hypothesis.lower()
mitigations = []
if “conn” in h or “pool” in h or “db” in h:
mitigations += [
{“action”: “Increase DB connection pool size (bounded) and add backpressure at db-proxy”, “owner”: “Platform”, “eta_days”: 3},
{“action”: “Add circuit breaker + adaptive timeouts between api-gateway and db-proxy”, “owner”: “Backend”, “eta_days”: 5},
{“action”: “Tune query hotspots; add indexes for top offending endpoints”, “owner”: “Data/DBA”, “eta_days”: 7},
]
if “timeout” in h or “upstream” in h:
mitigations += [
{“action”: “Implement hedged requests for idempotent calls (carefully) and tighten retry budgets”, “owner”: “Backend”, “eta_days”: 6},
{“action”: “Add upstream SLO-aware load shedding at api-gateway”, “owner”: “Platform”, “eta_days”: 7},
]
if “cache” in h:
mitigations += [
{“action”: “Add request coalescing and negative caching to prevent cache-miss storms”, “owner”: “Backend”, “eta_days”: 6},
{“action”: “Prewarm cache for top endpoints during deploys”, “owner”: “SRE”, “eta_days”: 4},
]
if not mitigations:
mitigations += [
{“action”: “Add targeted dashboards and alerts for the suspected bottleneck metric”, “owner”: “SRE”, “eta_days”: 3},
{“action”: “Run controlled load test to reproduce and validate the hypothesis”, “owner”: “Perf Eng”, “eta_days”: 5},
]
mitigations = mitigations[:10]
return {“hypothesis”: hypothesis, “mitigations”: mitigations}

@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
try:
facts = json.loads(key_facts_json)
except Exception:
facts = {“note”: “key_facts_json was not valid JSON”}
try:
mits = json.loads(mitigations_json)
except Exception:
mits = {“note”: “mitigations_json was not valid JSON”}
doc = {
“title”: title,
“date_utc”: datetime.utcnow().strftime(“%Y-%m-%d”),
“incident_window_utc”: {“start”: window_start_iso, “end”: window_end_iso},
“customer_impact”: customer_impact,
“suspected_root_cause”: suspected_root_cause,
“detection”: {
“how_detected”: “Automated anomaly detection + error-rate spike triage”,
“gaps”: [“Add earlier saturation alerting”, “Improve symptom-to-cause correlation dashboards”]
},
“timeline”: [
{“t”: window_start_iso, “event”: “Symptoms begin (latency/error anomalies)”},
{“t”: “T+10m”, “event”: “On-call begins triage; identifies top services/endpoints”},
{“t”: “T+25m”, “event”: “Mitigation actions initiated (throttling/backpressure)”},
{“t”: window_end_iso, “event”: “Customer impact ends; metrics stabilize”},
],
“key_facts”: facts,
“corrective_actions”: mits.get(“mitigations”, mits),
“followups”: [
{“area”: “Reliability”, “task”: “Add saturation signals + budget-based retries”, “priority”: “P1”},
{“area”: “Observability”, “task”: “Add golden signals per service/endpoint”, “priority”: “P1”},
{“area”: “Performance”, “task”: “Reproduce with load test and validate fix”, “priority”: “P2”},
],
“appendix”: {“notes”: “Generated by a Haystack multi-agent workflow (non-RAG).”}
}
return {“postmortem_json”: doc}

llm = OpenAIChatGenerator(model=”gpt-4o-mini”)

- Advertisement -spot_img

state_schema = {
“metrics_csv_path”: {“type”: str},
“logs_csv_path”: {“type”: str},
“metrics_summary”: {“type”: dict},
“logs_summary”: {“type”: dict},
“incident_window”: {“type”: dict},
“investigation_notes”: {“type”: list, “handler”: merge_lists},
“hypothesis”: {“type”: str},
“key_facts”: {“type”: dict},
“mitigation_plan”: {“type”: dict},
“postmortem”: {“type”: dict},
}

profiler_prompt = “””You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
– Prefer calling tools over guessing.
– Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
– Hypothesis must be falsifiable and mention at least one specific service and mechanism.
“””

writer_prompt = “””You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
– Call tools only if needed.
– Keep ‘suspected_root_cause’ specific and not generic.
– Ensure corrective actions have owners and eta_days.
“””

coordinator_prompt = “””You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
– A short executive summary (max 10 lines)
– The postmortem JSON
– A compact runbook checklist (bulleted)
“””

profiler_agent = Agent(
chat_generator=llm,
tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
system_prompt=profiler_prompt,
exit_conditions=[“text”],
state_schema=state_schema
)

writer_agent = Agent(
chat_generator=llm,
tools=[draft_postmortem],
system_prompt=writer_prompt,
exit_conditions=[“text”],
state_schema=state_schema
)

profiler_tool = ComponentTool(
component=profiler_agent,
name=”profiler_specialist”,
description=”Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).”,
outputs_to_string={“source”: “last_message”}
)

writer_tool = ComponentTool(
component=writer_agent,
name=”postmortem_writer_specialist”,
description=”Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.”,
outputs_to_string={“source”: “last_message”}
)

coordinator_agent = Agent(
chat_generator=llm,
tools=[
load_inputs,
detect_incident_window,
sql_investigate,
log_pattern_scan,
propose_mitigations,
profiler_tool,
writer_tool,
draft_postmortem
],
system_prompt=coordinator_prompt,
exit_conditions=[“text”],
state_schema=state_schema
)



Source link

- Advertisement -spot_img

LEAVE A REPLY

Please enter your comment!
Please enter your name here

51 − 49 =
Powered by MathCaptcha

Share post:

Subscribe

spot_img

Popular

More like this
Related

Crypto Victory Ahead? This Senator’s Decision Clears Path For Market Structure Bill Approval

Trusted Editorial content, reviewed by leading industry experts...

Russia Advances Crypto Seizure Bill In New Regulatory Push

Trusted Editorial content, reviewed by leading industry experts...

Enterprise AI adoption shifts to agentic systems

According to Databricks, enterprise AI adoption is shifting...

Stocks making the biggest moves after hours: TXN, STX, QRVO

Check out the companies making headlines in after-hours...