Guardrails - add toxic/abusive content filter guardrails

This commit is contained in:
Krish Dholakia 2026-02-11 18:08:16 -08:00 committed by GitHub
parent 5736fd32d9
commit af3acdda18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 4330 additions and 7 deletions

View File

@ -6,6 +6,7 @@ to detect and block/mask sensitive content.
"""
import asyncio
import json
import os
import re
from datetime import datetime
@ -150,6 +151,7 @@ class ContentFilterGuardrail(CustomGuardrail):
categories: List of category configurations with enabled/action/severity settings
severity_threshold: Minimum severity to block ("high", "medium", "low")
"""
super().__init__(
guardrail_name=guardrail_name,
supported_event_hooks=[
@ -179,6 +181,12 @@ class ContentFilterGuardrail(CustomGuardrail):
# Load categories if provided
if categories:
self._load_categories(categories)
else:
verbose_proxy_logger.warning(
"ContentFilterGuardrail has no content categories configured. "
"Toxic/abuse and other category-based keyword filtering will not run. "
"Add categories (e.g. harm_toxic_abuse) in the guardrail config to enable them."
)
# Normalize inputs: convert dicts to Pydantic models for consistent handling
normalized_patterns: List[ContentFilterPattern] = []
@ -276,9 +284,15 @@ class ContentFilterGuardrail(CustomGuardrail):
if custom_file:
category_file_path = custom_file
else:
category_file_path = os.path.join(
categories_dir, f"{category_name}.yaml"
)
# Try .yaml first, then .json (e.g. harm_toxic_abuse.json)
yaml_path = os.path.join(categories_dir, f"{category_name}.yaml")
json_path = os.path.join(categories_dir, f"{category_name}.json")
if os.path.exists(yaml_path):
category_file_path = yaml_path
elif os.path.exists(json_path):
category_file_path = json_path
else:
category_file_path = yaml_path # will trigger "not found" below
if not os.path.exists(category_file_path):
verbose_proxy_logger.warning(
@ -319,17 +333,23 @@ class ContentFilterGuardrail(CustomGuardrail):
def _load_category_file(self, file_path: str) -> CategoryConfig:
"""
Load a category definition from a YAML file.
Load a category definition from a YAML or JSON file.
YAML format: category_name, description, default_action, keywords (list of
{keyword, severity}), exceptions.
JSON format: list of {id, match, tags, severity}; match is pipe-separated
phrases; severity 1-4 mapped to low/medium/high. Used for harm_toxic_abuse.
Args:
file_path: Path to category YAML file
file_path: Path to category YAML or JSON file
Returns:
CategoryConfig object
"""
if file_path.lower().endswith(".json"):
return self._load_category_file_json(file_path)
with open(file_path, "r") as f:
data = yaml.safe_load(f)
return CategoryConfig(
category_name=data.get("category_name", "unknown"),
description=data.get("description", ""),
@ -338,6 +358,44 @@ class ContentFilterGuardrail(CustomGuardrail):
exceptions=data.get("exceptions", []),
)
def _load_category_file_json(self, file_path: str) -> CategoryConfig:
    """
    Load a category from the harm_toxic_abuse-style JSON format.

    The file holds a list of entries; a single top-level value that is not
    a list is treated as a one-element list. Each entry has: id, match
    (pipe-separated phrases), tags, severity (1-4).

    Severity mapping: 4,3 -> high; 2 -> medium; 1 -> low. A missing,
    non-integer, or out-of-range severity falls back to medium.

    Phrases are stripped, lower-cased and de-duplicated across the whole
    file. The first occurrence of a phrase wins, so its severity is kept.
    Entries that are not objects, or whose "match" is not a string, are
    skipped instead of aborting the load.

    Args:
        file_path: Path to the category JSON file. The category name is
            derived from the file name without its extension
            (e.g. harm_toxic_abuse.json -> harm_toxic_abuse).

    Returns:
        CategoryConfig object with a BLOCK default action and no exceptions.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file is not valid UTF-8 or not valid JSON
            (json.JSONDecodeError and UnicodeDecodeError are subclasses).
    """
    # JSON text is UTF-8 by specification; reading with the platform default
    # encoding can fail or mis-decode non-ASCII keywords on some systems.
    with open(file_path, "r", encoding="utf-8") as f:
        entries = json.load(f)

    if not isinstance(entries, list):
        entries = [entries]

    # Derive category name from filename (e.g. harm_toxic_abuse.json -> harm_toxic_abuse)
    category_name = os.path.splitext(os.path.basename(file_path))[0]
    severity_map = {4: "high", 3: "high", 2: "medium", 1: "low"}

    keywords: List[Dict[str, str]] = []
    seen = set()
    for item in entries:
        if not isinstance(item, dict):
            continue

        match_str = item.get("match") or ""
        if not isinstance(match_str, str):
            # A malformed entry (e.g. list or number) has no .split(); skip
            # it so one bad row does not prevent the whole category loading.
            continue

        raw_severity = item.get("severity", 2)
        severity = severity_map.get(
            raw_severity if isinstance(raw_severity, int) else 2, "medium"
        )

        for phrase in match_str.split("|"):
            phrase = phrase.strip().lower()
            if not phrase or phrase in seen:
                continue
            seen.add(phrase)
            keywords.append({"keyword": phrase, "severity": severity})

    return CategoryConfig(
        category_name=category_name,
        description="Detects harmful, toxic, or abusive language and content",
        default_action=ContentFilterAction("BLOCK"),
        keywords=keywords,
        exceptions=[],
    )
def _should_apply_severity(self, severity: str, threshold: str) -> bool:
"""
Check if a given severity meets the threshold.

View File

@ -139,6 +139,9 @@ def get_available_content_categories() -> List[Dict[str, str]]:
"""
Return available content categories for UI display.
Includes categories defined in .yaml/.yml files and in .json files
(e.g. harm_toxic_abuse.json).
Returns:
List of dictionaries containing category name, display_name, and description
"""
@ -177,6 +180,28 @@ def get_available_content_categories() -> List[Dict[str, str]]:
except Exception:
# Skip files that can't be loaded
continue
elif filename.endswith(".json"):
# JSON category files (e.g. harm_toxic_abuse.json) - no YAML header, use filename
category_name = os.path.splitext(filename)[0]
try:
if category_name == "harm_toxic_abuse":
display_name = "Harmful Toxic Abuse"
description = (
"Detects harmful, toxic, or abusive language and content"
)
else:
display_name = category_name.replace("_", " ").title()
description = f"Content category: {display_name}"
available_categories.append(
{
"name": category_name,
"display_name": display_name,
"description": description,
"default_action": "BLOCK",
}
)
except Exception:
continue
# Sort by name for consistent ordering
available_categories.sort(key=lambda x: x["name"])

View File

@ -920,3 +920,69 @@ class TestContentFilterGuardrail:
assert (
"matched_text" not in detection
), "Sensitive content should not be logged"
@pytest.mark.asyncio
async def test_harm_toxic_abuse_blocks_abusive_input(self):
    """
    The harm_toxic_abuse category must reject an abusive request.

    The sample message mixes censored profanity, misspellings, and harmful
    phrases; the guardrail is expected to raise a 403 HTTPException whose
    detail identifies the harm_toxic_abuse category.
    """
    category_settings = {
        "category": "harm_toxic_abuse",
        "enabled": True,
        "action": "BLOCK",
        "severity_threshold": "medium",
    }
    content_filter = ContentFilterGuardrail(
        guardrail_name="test-toxic-abuse",
        categories=[category_settings],
    )

    abusive_message = (
        "You stupid f**ing piece of sht AI, why are you so useless? "
        "Go kill yourself you worthless bot."
    )
    request_inputs = {"texts": [abusive_message]}

    with pytest.raises(HTTPException) as blocked:
        await content_filter.apply_guardrail(
            inputs=request_inputs,
            request_data={},
            input_type="request",
        )

    error = blocked.value
    assert error.status_code == 403

    # The detail payload may be a structured dict or a plain message.
    payload = error.detail
    if not isinstance(payload, dict):
        assert "harm_toxic_abuse" in str(payload)
    else:
        assert payload.get("category") == "harm_toxic_abuse"
@pytest.mark.asyncio
async def test_harm_toxic_abuse_blocks_sht_ai(self):
    """The harm_toxic_abuse category must reject the short input 'sht AI'."""
    category_settings = {
        "category": "harm_toxic_abuse",
        "enabled": True,
        "action": "BLOCK",
        "severity_threshold": "medium",
    }
    content_filter = ContentFilterGuardrail(
        guardrail_name="test-toxic-abuse-sht",
        categories=[category_settings],
    )
    request_inputs = {"texts": ["sht AI"]}

    with pytest.raises(HTTPException) as blocked:
        await content_filter.apply_guardrail(
            inputs=request_inputs,
            request_data={},
            input_type="request",
        )

    error = blocked.value
    assert error.status_code == 403

    # The detail payload may be a structured dict or a plain message.
    payload = error.detail
    if not isinstance(payload, dict):
        assert "harm_toxic_abuse" in str(payload)
    else:
        assert payload.get("category") == "harm_toxic_abuse"

View File

@ -14,7 +14,7 @@
"moduleResolution": "bundler",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",
"jsx": "react-jsx",
"incremental": true,
"plugins": [
{