Compare commits

...

5 Commits

578 changed files with 5489 additions and 24659 deletions

View File

@ -1,37 +0,0 @@
The BerriAI Enterprise license (the "Enterprise License")
Copyright (c) 2024 - present Berrie AI Inc.
With regard to the BerriAI Software:
This software and associated documentation files (the "Software") may only be
used in production, if you (and any entity that you represent) have agreed to,
and are in compliance with, the BerriAI Subscription Terms of Service, available
via [call](https://enterprise.litellm.ai/demo) or email (info@berri.ai) (the "Enterprise Terms"), or other
agreement governing the use of the Software, as agreed by you and BerriAI,
and otherwise have a valid BerriAI Enterprise license for the
correct number of user seats. Subject to the foregoing sentence, you are free to
modify this Software and publish patches to the Software. You agree that BerriAI
and/or its licensors (as applicable) retain all right, title and interest in and
to all such modifications and/or patches, and all such modifications and/or
patches may only be used, copied, modified, displayed, distributed, or otherwise
exploited with a valid BerriAI Enterprise license for the correct
number of user seats. Notwithstanding the foregoing, you may copy and modify
the Software for development and testing purposes, without requiring a
subscription. You agree that BerriAI and/or its licensors (as applicable) retain
all right, title and interest in and to all such modifications. You are not
granted any other rights beyond what is expressly stated herein. Subject to the
foregoing, it is forbidden to copy, merge, publish, distribute, sublicense,
and/or sell the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
For all third party components incorporated into the BerriAI Software, those
components are licensed under the original license provided by the owner of the
applicable component.

View File

@ -1,9 +0,0 @@
## LiteLLM Enterprise
Code in this folder is licensed under a commercial license. Please review the [LICENSE](./LICENSE.md) file within the /enterprise folder
**These features are covered under the LiteLLM Enterprise contract**
👉 **Using in an Enterprise / Need specific features ?** Meet with us [here](https://enterprise.litellm.ai/demo?month=2024-02)
See all Enterprise Features here 👉 [Docs](https://docs.litellm.ai/docs/proxy/enterprise)

View File

@ -1 +0,0 @@
from . import *

View File

@ -1,44 +0,0 @@
Resources:
LiteLLMServer:
Type: AWS::EC2::Instance
Properties:
AvailabilityZone: us-east-1a
ImageId: ami-0f403e3180720dd7e
InstanceType: t2.micro
LiteLLMServerAutoScalingGroup:
Type: AWS::AutoScaling::AutoScalingGroup
Properties:
AvailabilityZones:
- us-east-1a
LaunchConfigurationName: !Ref LiteLLMServerLaunchConfig
MinSize: 1
MaxSize: 3
DesiredCapacity: 1
HealthCheckGracePeriod: 300
LiteLLMServerLaunchConfig:
Type: AWS::AutoScaling::LaunchConfiguration
Properties:
ImageId: ami-0f403e3180720dd7e # Replace with your desired AMI ID
InstanceType: t2.micro
LiteLLMServerScalingPolicy:
Type: AWS::AutoScaling::ScalingPolicy
Properties:
AutoScalingGroupName: !Ref LiteLLMServerAutoScalingGroup
PolicyType: TargetTrackingScaling
TargetTrackingConfiguration:
PredefinedMetricSpecification:
PredefinedMetricType: ASGAverageCPUUtilization
TargetValue: 60.0
LiteLLMDB:
Type: AWS::RDS::DBInstance
Properties:
AllocatedStorage: 20
Engine: postgres
MasterUsername: litellmAdmin
MasterUserPassword: litellmPassword
DBInstanceClass: db.t3.micro
AvailabilityZone: us-east-1a

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -1,33 +0,0 @@
from typing import Dict, Literal, Type, Union
from litellm_enterprise.proxy.hooks.managed_files import _PROXY_LiteLLMManagedFiles
from litellm_enterprise.proxy.hooks.managed_vector_stores import (
_PROXY_LiteLLMManagedVectorStores,
)
from litellm.integrations.custom_logger import CustomLogger
ENTERPRISE_PROXY_HOOKS: Dict[str, Type[CustomLogger]] = {
"managed_files": _PROXY_LiteLLMManagedFiles,
"managed_vector_stores": _PROXY_LiteLLMManagedVectorStores,
}
def get_enterprise_proxy_hook(
hook_name: Union[
Literal[
"managed_files",
"managed_vector_stores",
"max_parallel_requests",
],
str,
],
):
"""
Factory method to get a enterprise hook instance by name
"""
if hook_name not in ENTERPRISE_PROXY_HOOKS:
raise ValueError(
f"Unknown hook: {hook_name}. Available hooks: {list(ENTERPRISE_PROXY_HOOKS.keys())}"
)
return ENTERPRISE_PROXY_HOOKS[hook_name]

View File

@ -1,204 +0,0 @@
# +-------------------------------------------------------------+
#
# Use AporiaAI for your LLM calls
#
# +-------------------------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
import os
import sys
from litellm.types.utils import CallTypesLiteral
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import json
import sys
from typing import Any, List, Literal, Optional
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.litellm_core_utils.logging_utils import (
convert_litellm_response_object_to_str,
)
from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.guardrails.guardrail_helpers import should_proceed_based_on_metadata
from litellm.types.guardrails import GuardrailEventHooks
GUARDRAIL_NAME = "aporia"
class AporiaGuardrail(CustomGuardrail):
def __init__(
self, api_key: Optional[str] = None, api_base: Optional[str] = None, **kwargs
):
self.async_handler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.GuardrailCallback
)
self.aporia_api_key = api_key or os.environ["APORIO_API_KEY"]
self.aporia_api_base = api_base or os.environ["APORIO_API_BASE"]
super().__init__(**kwargs)
#### CALL HOOKS - proxy only ####
def transform_messages(self, messages: List[dict]) -> List[dict]:
supported_openai_roles = ["system", "user", "assistant"]
default_role = "other" # for unsupported roles - e.g. tool
new_messages = []
for m in messages:
if m.get("role", "") in supported_openai_roles:
new_messages.append(m)
else:
new_messages.append(
{
"role": default_role,
**{key: value for key, value in m.items() if key != "role"},
}
)
return new_messages
async def prepare_aporia_request(
self, new_messages: List[dict], response_string: Optional[str] = None
) -> dict:
data: dict[str, Any] = {}
if new_messages is not None:
data["messages"] = new_messages
if response_string is not None:
data["response"] = response_string
# Set validation target
if new_messages and response_string:
data["validation_target"] = "both"
elif new_messages:
data["validation_target"] = "prompt"
elif response_string:
data["validation_target"] = "response"
verbose_proxy_logger.debug("Aporia AI request: %s", data)
return data
async def make_aporia_api_request(
self, new_messages: List[dict], response_string: Optional[str] = None
):
data = await self.prepare_aporia_request(
new_messages=new_messages, response_string=response_string
)
_json_data = json.dumps(data)
"""
export APORIO_API_KEY=<your key>
curl https://gr-prd-trial.aporia.com/some-id \
-X POST \
-H "X-APORIA-API-KEY: $APORIO_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"messages": [
{
"role": "user",
"content": "This is a test prompt"
}
],
}
'
"""
response = await self.async_handler.post(
url=self.aporia_api_base + "/validate",
data=_json_data,
headers={
"X-APORIA-API-KEY": self.aporia_api_key,
"Content-Type": "application/json",
},
)
verbose_proxy_logger.debug("Aporia AI response: %s", response.text)
if response.status_code == 200:
# check if the response was flagged
_json_response = response.json()
action: str = _json_response.get(
"action"
) # possible values are modify, passthrough, block, rephrase
if action == "block":
raise HTTPException(
status_code=400,
detail={
"error": "Violated guardrail policy",
"aporia_ai_response": _json_response,
},
)
async def async_post_call_success_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
response,
):
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
"""
Use this for the post call moderation with Guardrails
"""
event_type: GuardrailEventHooks = GuardrailEventHooks.post_call
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
return
response_str: Optional[str] = convert_litellm_response_object_to_str(response)
if response_str is not None:
await self.make_aporia_api_request(
response_string=response_str, new_messages=data.get("messages", [])
)
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
pass
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: CallTypesLiteral,
):
from litellm.proxy.common_utils.callback_utils import (
add_guardrail_to_applied_guardrails_header,
)
event_type: GuardrailEventHooks = GuardrailEventHooks.during_call
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
return
# old implementation - backwards compatibility
if (
await should_proceed_based_on_metadata(
data=data,
guardrail_name=GUARDRAIL_NAME,
)
is False
):
return
new_messages: Optional[List[dict]] = None
if "messages" in data and isinstance(data["messages"], list):
new_messages = self.transform_messages(messages=data["messages"])
if new_messages is not None:
await self.make_aporia_api_request(new_messages=new_messages)
add_guardrail_to_applied_guardrails_header(
request_data=data, guardrail_name=self.guardrail_name
)
else:
verbose_proxy_logger.warning(
"Aporia AI: not running guardrail. No messages in data"
)
pass

View File

@ -1,115 +0,0 @@
# +------------------------------+
#
# Banned Keywords
#
# +------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
## Reject a call / response if it contains certain keywords
from typing import Literal
import litellm
from litellm.caching.caching import DualCache
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.guardrails._content_utils import (
is_text_content_call_type,
iter_message_text,
)
from litellm.integrations.custom_logger import CustomLogger
from litellm._logging import verbose_proxy_logger
from fastapi import HTTPException
class _ENTERPRISE_BannedKeywords(CustomLogger):
# Class variables or attributes
def __init__(self):
banned_keywords_list = litellm.banned_keywords_list
if banned_keywords_list is None:
raise Exception(
"`banned_keywords_list` can either be a list or filepath. None set."
)
if isinstance(banned_keywords_list, list):
self.banned_keywords_list = banned_keywords_list
if isinstance(banned_keywords_list, str): # assume it's a filepath
try:
with open(banned_keywords_list, "r") as file:
data = file.read()
self.banned_keywords_list = data.split("\n")
except FileNotFoundError:
raise Exception(
f"File not found. banned_keywords_list={banned_keywords_list}"
)
except Exception as e:
raise Exception(
f"An error occurred: {str(e)}, banned_keywords_list={banned_keywords_list}"
)
def print_verbose(self, print_statement, level: Literal["INFO", "DEBUG"] = "DEBUG"):
if level == "INFO":
verbose_proxy_logger.info(print_statement)
elif level == "DEBUG":
verbose_proxy_logger.debug(print_statement)
if litellm.set_verbose is True:
print(print_statement) # noqa
def test_violation(self, test_str: str):
for word in self.banned_keywords_list:
if word in test_str.lower():
raise HTTPException(
status_code=400,
detail={"error": f"Keyword banned. Keyword={word}"},
)
async def async_pre_call_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
cache: DualCache,
data: dict,
call_type: str, # "completion", "embeddings", "image_generation", "moderation"
):
try:
"""
- check if user id part of call
- check if user id part of blocked list
"""
self.print_verbose("Inside Banned Keyword List Pre-Call Hook")
if is_text_content_call_type(call_type):
for text in iter_message_text(data):
self.test_violation(test_str=text)
except HTTPException as e:
raise e
except Exception as e:
verbose_proxy_logger.exception(
"litellm.enterprise.enterprise_hooks.banned_keywords::async_pre_call_hook - Exception occurred - {}".format(
str(e)
)
)
async def async_post_call_success_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
response,
):
if not isinstance(response, litellm.ModelResponse):
return
for choice in response.choices:
if not isinstance(choice, litellm.utils.Choices):
continue
message = getattr(choice, "message", None)
content = getattr(message, "content", None)
if isinstance(content, str):
self.test_violation(test_str=content)
async def async_post_call_streaming_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
response: str,
):
self.test_violation(test_str=response)

View File

@ -1,124 +0,0 @@
# +------------------------------+
#
# Blocked User List
#
# +------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
## This accepts a list of user id's for whom calls will be rejected
from typing import Optional, Literal
import litellm
from litellm.proxy.utils import PrismaClient
from litellm.caching.caching import DualCache
from litellm.proxy._types import UserAPIKeyAuth, LiteLLM_EndUserTable
from litellm.integrations.custom_logger import CustomLogger
from litellm._logging import verbose_proxy_logger
from fastapi import HTTPException
class _ENTERPRISE_BlockedUserList(CustomLogger):
# Class variables or attributes
def __init__(self, prisma_client: Optional[PrismaClient]):
self.prisma_client = prisma_client
blocked_user_list = litellm.blocked_user_list
if blocked_user_list is None:
self.blocked_user_list = None
return
if isinstance(blocked_user_list, list):
self.blocked_user_list = blocked_user_list
if isinstance(blocked_user_list, str): # assume it's a filepath
try:
with open(blocked_user_list, "r") as file:
data = file.read()
self.blocked_user_list = data.split("\n")
except FileNotFoundError:
raise Exception(
f"File not found. blocked_user_list={blocked_user_list}"
)
except Exception as e:
raise Exception(
f"An error occurred: {str(e)}, blocked_user_list={blocked_user_list}"
)
def print_verbose(self, print_statement, level: Literal["INFO", "DEBUG"] = "DEBUG"):
if level == "INFO":
verbose_proxy_logger.info(print_statement)
elif level == "DEBUG":
verbose_proxy_logger.debug(print_statement)
if litellm.set_verbose is True:
print(print_statement) # noqa
async def async_pre_call_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
cache: DualCache,
data: dict,
call_type: str,
):
try:
"""
- check if user id part of call
- check if user id part of blocked list
- if blocked list is none or user not in blocked list
- check if end-user in cache
- check if end-user in db
"""
self.print_verbose("Inside Blocked User List Pre-Call Hook")
if "user_id" in data or "user" in data:
user = data.get("user_id", data.get("user", ""))
if (
self.blocked_user_list is not None
and user in self.blocked_user_list
):
raise HTTPException(
status_code=400,
detail={
"error": f"User blocked from making LLM API Calls. User={user}"
},
)
cache_key = f"litellm:end_user_id:{user}"
end_user_cache_obj: Optional[LiteLLM_EndUserTable] = cache.get_cache( # type: ignore
key=cache_key
)
if end_user_cache_obj is None and self.prisma_client is not None:
# check db
end_user_obj = (
await self.prisma_client.db.litellm_endusertable.find_unique(
where={"user_id": user}
)
)
if end_user_obj is None: # user not in db - assume not blocked
end_user_obj = LiteLLM_EndUserTable(user_id=user, blocked=False)
cache.set_cache(key=cache_key, value=end_user_obj, ttl=60)
if end_user_obj is not None and end_user_obj.blocked is True:
raise HTTPException(
status_code=400,
detail={
"error": f"User blocked from making LLM API Calls. User={user}"
},
)
elif (
end_user_cache_obj is not None
and end_user_cache_obj.blocked is True
):
raise HTTPException(
status_code=400,
detail={
"error": f"User blocked from making LLM API Calls. User={user}"
},
)
except HTTPException as e:
raise e
except Exception as e:
verbose_proxy_logger.exception(
"litellm.enterprise.enterprise_hooks.blocked_user_list::async_pre_call_hook - Exception occurred - {}".format(
str(e)
)
)

View File

@ -1,135 +0,0 @@
# +-----------------------------------------------+
#
# Google Text Moderation
# https://cloud.google.com/natural-language/docs/moderating-text
#
# +-----------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.guardrails._content_utils import iter_message_text
from litellm.types.utils import CallTypesLiteral
class _ENTERPRISE_GoogleTextModeration(CustomLogger):
user_api_key_cache = None
confidence_categories = [
"toxic",
"insult",
"profanity",
"derogatory",
"sexual",
"death_harm_and_tragedy",
"violent",
"firearms_and_weapons",
"public_safety",
"health",
"religion_and_belief",
"illicit_drugs",
"war_and_conflict",
"politics",
"finance",
"legal",
] # https://cloud.google.com/natural-language/docs/moderating-text#safety_attribute_confidence_scores
# Class variables or attributes
def __init__(self):
try:
from google.cloud import language_v1 # type: ignore
except Exception:
raise Exception(
"Missing google.cloud package. Run `pip install --upgrade google-cloud-language`"
)
# Instantiates a client
self.client = language_v1.LanguageServiceClient()
self.moderate_text_request = language_v1.ModerateTextRequest
self.language_document = language_v1.types.Document # type: ignore
self.document_type = language_v1.types.Document.Type.PLAIN_TEXT # type: ignore
default_confidence_threshold = (
litellm.google_moderation_confidence_threshold or 0.8
) # by default require a high confidence (80%) to fail
for category in self.confidence_categories:
if hasattr(litellm, f"{category}_confidence_threshold"):
setattr(
self,
f"{category}_confidence_threshold",
getattr(litellm, f"{category}_confidence_threshold"),
)
else:
setattr(
self,
f"{category}_confidence_threshold",
default_confidence_threshold,
)
set_confidence_value = getattr(
self,
f"{category}_confidence_threshold",
)
verbose_proxy_logger.info(
f"Google Text Moderation: {category}_confidence_threshold: {set_confidence_value}"
)
def print_verbose(self, print_statement):
try:
verbose_proxy_logger.debug(print_statement)
if litellm.set_verbose:
print(print_statement) # noqa
except Exception:
pass
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: CallTypesLiteral,
):
"""
- Calls Google's Text Moderation API
- Rejects request if it fails safety check
"""
# Covers multimodal list content + Responses-API input.
text = "".join(iter_message_text(data))
if text:
document = self.language_document(content=text, type_=self.document_type)
request = self.moderate_text_request(
document=document,
)
# Make the request
response = self.client.moderate_text(request=request)
for category in response.moderation_categories:
category_name = category.name
category_name = category_name.lower()
category_name = category_name.replace("&", "and")
category_name = category_name.replace(",", "")
category_name = category_name.replace(
" ", "_"
) # e.g. go from 'Firearms & Weapons' to 'firearms_and_weapons'
if category.confidence > getattr(
self, f"{category_name}_confidence_threshold"
):
raise HTTPException(
status_code=400,
detail={
"error": f"Violated content safety policy. Category={category}"
},
)
# Handle the response
return data
# google_text_moderation_obj = _ENTERPRISE_GoogleTextModeration()
# asyncio.run(
# google_text_moderation_obj.async_moderation_hook(
# data={"messages": [{"role": "user", "content": "Hey, how's it going?"}]}
# )
# )

View File

@ -1,58 +0,0 @@
# +-------------------------------------------------------------+
#
# Use OpenAI /moderations for your LLM calls
#
# +-------------------------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
import os
import sys
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import sys
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.guardrails._content_utils import iter_message_text
from litellm.types.utils import CallTypesLiteral
class _ENTERPRISE_OpenAI_Moderation(CustomLogger):
def __init__(self):
self.model_name = (
litellm.openai_moderations_model_name or "text-moderation-latest"
) # pass the model_name you initialized on litellm.Router()
pass
#### CALL HOOKS - proxy only ####
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: CallTypesLiteral,
):
# Covers multimodal list content + Responses-API input.
text = "".join(iter_message_text(data))
from litellm.proxy.proxy_server import llm_router
if llm_router is None:
return
moderation_response = await llm_router.amoderation(
model=self.model_name, input=text
)
verbose_proxy_logger.debug("Moderation response: %s", moderation_response)
if moderation_response and moderation_response.results[0].flagged is True:
raise HTTPException(
status_code=403, detail={"error": "Violated content safety policy"}
)
pass

View File

@ -1,6 +0,0 @@
## Admin UI
Customize the Admin UI to your companies branding / logo
![Group 204](https://github.com/BerriAI/litellm/assets/29436595/3b7dbfc2-6fcd-42af-996d-f734fb8f461b)
## Docs to set up Custom Admin UI [here](https://docs.litellm.ai/docs/proxy/ui)

View File

@ -1,11 +0,0 @@
{
"brand": {
"DEFAULT": "teal",
"faint": "teal",
"muted": "teal",
"subtle": "teal",
"emphasis": "teal",
"inverted": "teal"
}
}

View File

@ -1,99 +0,0 @@
from typing import List, Optional
import litellm
from litellm._logging import verbose_logger
from litellm.constants import X_LITELLM_DISABLE_CALLBACKS
from litellm.integrations.custom_logger import CustomLogger
from litellm.litellm_core_utils.llm_request_utils import (
get_proxy_server_request_headers,
)
from litellm.proxy._types import CommonProxyErrors
from litellm.types.utils import StandardCallbackDynamicParams
class EnterpriseCallbackControls:
@staticmethod
def is_callback_disabled_dynamically(
callback: litellm.CALLBACK_TYPES,
litellm_params: dict,
standard_callback_dynamic_params: StandardCallbackDynamicParams
) -> bool:
"""
Check if a callback is disabled via the x-litellm-disable-callbacks header or via `litellm_disabled_callbacks` in standard_callback_dynamic_params.
Args:
callback: The callback to check (can be string, CustomLogger instance, or callable)
litellm_params: Parameters containing proxy server request info
Returns:
bool: True if the callback should be disabled, False otherwise
"""
from litellm.litellm_core_utils.custom_logger_registry import (
CustomLoggerRegistry,
)
try:
disabled_callbacks = EnterpriseCallbackControls.get_disabled_callbacks(litellm_params, standard_callback_dynamic_params)
verbose_logger.debug(f"Dynamically disabled callbacks from {X_LITELLM_DISABLE_CALLBACKS}: {disabled_callbacks}")
verbose_logger.debug(f"Checking if {callback} is disabled via headers. Disable callbacks from headers: {disabled_callbacks}")
if disabled_callbacks is not None:
#########################################################
# premium user check
#########################################################
if not EnterpriseCallbackControls._should_allow_dynamic_callback_disabling():
return False
#########################################################
if isinstance(callback, str):
if callback.lower() in disabled_callbacks:
verbose_logger.debug(f"Not logging to {callback} because it is disabled via {X_LITELLM_DISABLE_CALLBACKS}")
return True
elif isinstance(callback, CustomLogger):
# get the string name of the callback
callback_str = CustomLoggerRegistry.get_callback_str_from_class_type(callback.__class__)
if callback_str is not None and callback_str.lower() in disabled_callbacks:
verbose_logger.debug(f"Not logging to {callback_str} because it is disabled via {X_LITELLM_DISABLE_CALLBACKS}")
return True
return False
except Exception as e:
verbose_logger.debug(
f"Error checking disabled callbacks header: {str(e)}"
)
return False
@staticmethod
def get_disabled_callbacks(litellm_params: dict, standard_callback_dynamic_params: StandardCallbackDynamicParams) -> Optional[List[str]]:
"""
Get the disabled callbacks from the standard callback dynamic params.
"""
#########################################################
# check if disabled via headers
#########################################################
request_headers = get_proxy_server_request_headers(litellm_params)
disabled_callbacks = request_headers.get(X_LITELLM_DISABLE_CALLBACKS, None)
if disabled_callbacks is not None:
disabled_callbacks = set([cb.strip().lower() for cb in disabled_callbacks.split(",")])
return list(disabled_callbacks)
#########################################################
# check if disabled via request body
#########################################################
if standard_callback_dynamic_params.get("litellm_disabled_callbacks", None) is not None:
return standard_callback_dynamic_params.get("litellm_disabled_callbacks", None)
return None
@staticmethod
def _should_allow_dynamic_callback_disabling():
import litellm
from litellm.proxy.proxy_server import premium_user
# Check if admin has disabled this feature
if litellm.allow_dynamic_callback_disabling is not True:
verbose_logger.debug("Dynamic callback disabling is disabled by admin via litellm.allow_dynamic_callback_disabling")
return False
if premium_user:
return True
verbose_logger.warning(f"Disabling callbacks using request headers is an enterprise feature. {CommonProxyErrors.not_premium_user.value}")
return False

View File

@ -1,27 +0,0 @@
# this is an example endpoint to receive data from litellm
from fastapi import FastAPI, HTTPException, Request
app = FastAPI()
@app.post("/log-event")
async def log_event(request: Request):
try:
print("Received /log-event request") # noqa
# Assuming the incoming request has JSON data
data = await request.json()
print("Received request data:") # noqa
print(data) # noqa
# Your additional logic can go here
# For now, just printing the received data
return {"message": "Request received successfully"}
except Exception:
raise HTTPException(status_code=500, detail="Internal Server Error")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)

View File

@ -1,130 +0,0 @@
# +-------------------------------------------------------------+
#
# Llama Guard
# https://huggingface.co/meta-llama/LlamaGuard-7b/tree/main
#
# LLM for Content Moderation
# +-------------------------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
import os
import sys
from collections.abc import Iterable
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import sys
from typing import Literal, Optional
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.utils import CallTypesLiteral, Choices, ModelResponse
class _ENTERPRISE_LlamaGuard(CustomLogger):
# Class variables or attributes
def __init__(self, model_name: Optional[str] = None):
_model = model_name or litellm.llamaguard_model_name
if _model is None:
raise ValueError("model_name not set for LlamaGuard")
self.model = _model
file_path = litellm.llamaguard_unsafe_content_categories
data = None
if file_path is not None:
try:
with open(file_path, "r") as file:
data = file.read()
except FileNotFoundError:
raise Exception(f"File not found. file_path={file_path}")
except Exception as e:
raise Exception(f"An error occurred: {str(e)}, file_path={file_path}")
self.unsafe_content_categories = data
verbose_proxy_logger.debug(
f"self.unsafe_content_categories: {self.unsafe_content_categories}"
)
def print_verbose(self, print_statement):
try:
verbose_proxy_logger.debug(print_statement)
if litellm.set_verbose:
print(print_statement) # noqa
except Exception:
pass
def set_custom_prompt_template(self, messages: list):
if self.unsafe_content_categories is not None and self.model is not None:
role = "Agent" if len(messages) % 2 == 0 else "User"
litellm.register_prompt_template(
model=self.model,
initial_prompt_value=f"""[INST] Task: Check if there is unsafe content in '{role}' messages in conversations according our safety policy with the below categories.
<BEGIN UNSAFE CONTENT CATEGORIES>
{self.unsafe_content_categories}
<END UNSAFE CONTENT CATEGORIES>
<BEGIN CONVERSATION>
""",
roles={
"system": {
"pre_message": "User: ", # [OPTIONAL]
"post_message": "\n\n", # [OPTIONAL]
},
"user": {
"pre_message": "User: ", # [OPTIONAL]
"post_message": "\n\n", # [OPTIONAL]
},
"assistant": {
"pre_message": "Agent: ", # [OPTIONAL]
"post_message": "\n\n", # [OPTIONAL]
},
},
final_prompt_value="""<END CONVERSATION>
Provide your safety assessment for {role} in the above conversation:
- First line must read 'safe' or 'unsafe'.
- If unsafe, a second line must include a comma-separated list of violated categories. [/INST]""",
)
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: CallTypesLiteral,
):
"""
- Calls the Llama Guard Endpoint
- Rejects request if it fails safety check
The llama guard prompt template is applied automatically in factory.py
"""
if "messages" in data:
safety_check_messages = data["messages"][
-1
] # get the last response - llama guard has a 4k token limit
response = await litellm.acompletion(
model=self.model,
messages=[safety_check_messages],
hf_model_name="meta-llama/LlamaGuard-7b",
)
if (
isinstance(response, ModelResponse)
and isinstance(response.choices[0], Choices)
and response.choices[0].message.content is not None
and isinstance(response.choices[0].message.content, Iterable)
and "unsafe" in response.choices[0].message.content
):
raise HTTPException(
status_code=400, detail={"error": "Violated content safety policy"}
)
return data

View File

@ -1,174 +0,0 @@
# +------------------------+
#
# LLM Guard
# https://llm-guard.com/
#
# +------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
## This provides an LLM Guard Integration for content moderation on the proxy
from typing import Literal, Optional
import aiohttp
from fastapi import HTTPException
import litellm
from litellm._logging import verbose_proxy_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import UserAPIKeyAuth
from litellm.secret_managers.main import get_secret_str
from litellm.types.utils import CallTypesLiteral
from litellm.utils import get_formatted_prompt
class _ENTERPRISE_LLMGuard(CustomLogger):
# Class variables or attributes
def __init__(
self,
mock_testing: bool = False,
mock_redacted_text: Optional[dict] = None,
):
self.mock_redacted_text = mock_redacted_text
self.llm_guard_mode = litellm.llm_guard_mode
if mock_testing is True: # for testing purposes only
return
self.llm_guard_api_base = get_secret_str("LLM_GUARD_API_BASE", None)
if self.llm_guard_api_base is None:
raise Exception("Missing `LLM_GUARD_API_BASE` from environment")
elif not self.llm_guard_api_base.endswith("/"):
self.llm_guard_api_base += "/"
def print_verbose(self, print_statement):
try:
verbose_proxy_logger.debug(print_statement)
if litellm.set_verbose:
print(print_statement) # noqa
except Exception:
pass
async def moderation_check(self, text: str):
"""
[TODO] make this more performant for high-throughput scenario
"""
try:
async with aiohttp.ClientSession() as session:
if self.mock_redacted_text is not None:
redacted_text = self.mock_redacted_text
else:
# Make the first request to /analyze
analyze_url = f"{self.llm_guard_api_base}analyze/prompt"
verbose_proxy_logger.debug("Making request to: %s", analyze_url)
analyze_payload = {"prompt": text}
redacted_text = None
async with session.post(
analyze_url, json=analyze_payload
) as response:
redacted_text = await response.json()
verbose_proxy_logger.debug(
f"LLM Guard: Received response - {redacted_text}"
)
if redacted_text is not None:
if (
redacted_text.get("is_valid", None) is not None
and redacted_text["is_valid"] is False
):
raise HTTPException(
status_code=400,
detail={"error": "Violated content safety policy"},
)
else:
pass
else:
raise HTTPException(
status_code=500,
detail={
"error": f"Invalid content moderation response: {redacted_text}"
},
)
except Exception as e:
verbose_proxy_logger.exception(
"litellm.enterprise.enterprise_hooks.llm_guard::moderation_check - Exception occurred - {}".format(
str(e)
)
)
raise e
def should_proceed(self, user_api_key_dict: UserAPIKeyAuth, data: dict) -> bool:
if self.llm_guard_mode == "key-specific":
# check if llm guard enabled for specific keys only
self.print_verbose(
f"user_api_key_dict.permissions: {user_api_key_dict.permissions}"
)
if (
user_api_key_dict.permissions.get("enable_llm_guard_check", False)
is True
):
return True
elif self.llm_guard_mode == "all":
return True
elif self.llm_guard_mode == "request-specific":
self.print_verbose(f"received metadata: {data.get('metadata', {})}")
metadata = data.get("metadata", {})
permissions = metadata.get("permissions", {})
if (
"enable_llm_guard_check" in permissions
and permissions["enable_llm_guard_check"] is True
):
return True
return False
async def async_moderation_hook(
self,
data: dict,
user_api_key_dict: UserAPIKeyAuth,
call_type: CallTypesLiteral,
):
"""
- Calls the LLM Guard Endpoint
- Rejects request if it fails safety check
- Use the sanitized prompt returned
- LLM Guard can handle things like PII Masking, etc.
"""
self.print_verbose(
f"Inside LLM Guard Pre-Call Hook - llm_guard_mode={self.llm_guard_mode}"
)
_proceed = self.should_proceed(user_api_key_dict=user_api_key_dict, data=data)
if _proceed is False:
return
self.print_verbose("Makes LLM Guard Check")
try:
assert call_type in [
"completion",
"embeddings",
"image_generation",
"moderation",
"audio_transcription",
]
except Exception:
self.print_verbose(
f"Call Type - {call_type}, not in accepted list - ['completion','embeddings','image_generation','moderation','audio_transcription']"
)
return data
formatted_prompt = get_formatted_prompt(data=data, call_type=call_type) # type: ignore
self.print_verbose(f"LLM Guard, formatted_prompt: {formatted_prompt}")
return await self.moderation_check(text=formatted_prompt)
async def async_post_call_streaming_hook(
self, user_api_key_dict: UserAPIKeyAuth, response: str
):
if response is not None:
await self.moderation_check(text=response)
return response
# llm_guard = _ENTERPRISE_LLMGuard()
# asyncio.run(
# llm_guard.async_moderation_hook(
# data={"messages": [{"role": "user", "content": "Hey how's it going?"}]}
# )
# )

View File

@ -1,315 +0,0 @@
"""
PagerDuty Alerting Integration
Handles two types of alerts:
- High LLM API Failure Rate. Configure X fails in Y seconds to trigger an alert.
- High Number of Hanging LLM Requests. Configure X hangs in Y seconds to trigger an alert.
Note: This is a Free feature on the regular litellm docker image.
However, this is under the enterprise license
"""
import asyncio
import os
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union
from litellm._logging import verbose_logger
from litellm.caching import DualCache
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.integrations.pagerduty import (
AlertingConfig,
PagerDutyInternalEvent,
PagerDutyPayload,
PagerDutyRequestBody,
)
from litellm.types.utils import (
CallTypesLiteral,
StandardLoggingPayload,
StandardLoggingPayloadErrorInformation,
)
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD = 60
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS = 60
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS = 60
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS = 600
class PagerDutyAlerting(SlackAlerting):
"""
Tracks failed requests and hanging requests separately.
If threshold is crossed for either type, triggers a PagerDuty alert.
"""
def __init__(
self, alerting_args: Optional[Union[AlertingConfig, dict]] = None, **kwargs
):
super().__init__()
_api_key = os.getenv("PAGERDUTY_API_KEY")
if not _api_key:
raise ValueError("PAGERDUTY_API_KEY is not set")
self.api_key: str = _api_key
alerting_args = alerting_args or {}
self.pagerduty_alerting_args: AlertingConfig = AlertingConfig(
failure_threshold=alerting_args.get(
"failure_threshold", PAGERDUTY_DEFAULT_FAILURE_THRESHOLD
),
failure_threshold_window_seconds=alerting_args.get(
"failure_threshold_window_seconds",
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS,
),
hanging_threshold_seconds=alerting_args.get(
"hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
),
hanging_threshold_window_seconds=alerting_args.get(
"hanging_threshold_window_seconds",
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
),
)
# Separate storage for failures vs. hangs
self._failure_events: List[PagerDutyInternalEvent] = []
self._hanging_events: List[PagerDutyInternalEvent] = []
# ------------------ MAIN LOGIC ------------------ #
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
"""
Record a failure event. Only send an alert to PagerDuty if the
configured *failure* threshold is exceeded in the specified window.
"""
now = datetime.now(timezone.utc)
standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
"standard_logging_object"
)
if not standard_logging_payload:
raise ValueError(
"standard_logging_object is required for PagerDutyAlerting"
)
# Extract error details
error_info: Optional[StandardLoggingPayloadErrorInformation] = (
standard_logging_payload.get("error_information") or {}
)
_meta = standard_logging_payload.get("metadata") or {}
self._failure_events.append(
PagerDutyInternalEvent(
failure_event_type="failed_response",
timestamp=now,
error_class=error_info.get("error_class"),
error_code=error_info.get("error_code"),
error_llm_provider=error_info.get("llm_provider"),
user_api_key_hash=_meta.get("user_api_key_hash"),
user_api_key_alias=_meta.get("user_api_key_alias"),
user_api_key_spend=_meta.get("user_api_key_spend"),
user_api_key_max_budget=_meta.get("user_api_key_max_budget"),
user_api_key_budget_reset_at=_meta.get("user_api_key_budget_reset_at"),
user_api_key_org_id=_meta.get("user_api_key_org_id"),
user_api_key_org_alias=_meta.get("user_api_key_org_alias"),
user_api_key_team_id=_meta.get("user_api_key_team_id"),
user_api_key_project_id=_meta.get("user_api_key_project_id"),
user_api_key_project_alias=_meta.get("user_api_key_project_alias"),
user_api_key_user_id=_meta.get("user_api_key_user_id"),
user_api_key_team_alias=_meta.get("user_api_key_team_alias"),
user_api_key_end_user_id=_meta.get("user_api_key_end_user_id"),
user_api_key_user_email=_meta.get("user_api_key_user_email"),
user_api_key_request_route=_meta.get("user_api_key_request_route"),
user_api_key_auth_metadata=_meta.get("user_api_key_auth_metadata"),
)
)
# Prune + Possibly alert
window_seconds = self.pagerduty_alerting_args.get(
"failure_threshold_window_seconds", 60
)
threshold = self.pagerduty_alerting_args.get("failure_threshold", 1)
# If threshold is crossed, send PD alert for failures
await self._send_alert_if_thresholds_crossed(
events=self._failure_events,
window_seconds=window_seconds,
threshold=threshold,
alert_prefix="High LLM API Failure Rate",
)
async def async_pre_call_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
cache: DualCache,
data: dict,
call_type: CallTypesLiteral,
) -> Optional[Union[Exception, str, dict]]:
"""
Example of detecting hanging requests by waiting a given threshold.
If the request didn't finish by then, we treat it as 'hanging'.
"""
verbose_logger.info("Inside Proxy Logging Pre-call hook!")
asyncio.create_task(
self.hanging_response_handler(
request_data=data, user_api_key_dict=user_api_key_dict
)
)
return None
async def hanging_response_handler(
self, request_data: Optional[dict], user_api_key_dict: UserAPIKeyAuth
):
"""
Checks if request completed by the time 'hanging_threshold_seconds' elapses.
If not, we classify it as a hanging request.
"""
verbose_logger.debug(
f"Inside Hanging Response Handler!..sleeping for {self.pagerduty_alerting_args.get('hanging_threshold_seconds', PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS)} seconds"
)
await asyncio.sleep(
self.pagerduty_alerting_args.get(
"hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
)
)
if await self._request_is_completed(request_data=request_data):
return # It's not hanging if completed
# Otherwise, record it as hanging
self._hanging_events.append(
PagerDutyInternalEvent(
failure_event_type="hanging_response",
timestamp=datetime.now(timezone.utc),
error_class="HangingRequest",
error_code="HangingRequest",
error_llm_provider="HangingRequest",
user_api_key_hash=user_api_key_dict.api_key,
user_api_key_alias=user_api_key_dict.key_alias,
user_api_key_spend=user_api_key_dict.spend,
user_api_key_max_budget=user_api_key_dict.max_budget,
user_api_key_budget_reset_at=(
user_api_key_dict.budget_reset_at.isoformat()
if user_api_key_dict.budget_reset_at
else None
),
user_api_key_org_id=user_api_key_dict.org_id,
user_api_key_org_alias=user_api_key_dict.organization_alias,
user_api_key_team_id=user_api_key_dict.team_id,
user_api_key_project_id=user_api_key_dict.project_id,
user_api_key_project_alias=user_api_key_dict.project_alias,
user_api_key_user_id=user_api_key_dict.user_id,
user_api_key_team_alias=user_api_key_dict.team_alias,
user_api_key_end_user_id=user_api_key_dict.end_user_id,
user_api_key_user_email=user_api_key_dict.user_email,
user_api_key_request_route=user_api_key_dict.request_route,
user_api_key_auth_metadata=user_api_key_dict.metadata,
)
)
# Prune + Possibly alert
window_seconds = self.pagerduty_alerting_args.get(
"hanging_threshold_window_seconds",
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
)
threshold: int = self.pagerduty_alerting_args.get(
"hanging_threshold_fails", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
)
# If threshold is crossed, send PD alert for hangs
await self._send_alert_if_thresholds_crossed(
events=self._hanging_events,
window_seconds=window_seconds,
threshold=threshold,
alert_prefix="High Number of Hanging LLM Requests",
)
# ------------------ HELPERS ------------------ #
async def _send_alert_if_thresholds_crossed(
self,
events: List[PagerDutyInternalEvent],
window_seconds: int,
threshold: int,
alert_prefix: str,
):
"""
1. Prune old events
2. If threshold is reached, build alert, send to PagerDuty
3. Clear those events
"""
cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_seconds)
pruned = [e for e in events if e.get("timestamp", datetime.min) > cutoff]
# Update the reference list
events.clear()
events.extend(pruned)
# Check threshold
verbose_logger.debug(
f"Have {len(events)} events in the last {window_seconds} seconds. Threshold is {threshold}"
)
if len(events) >= threshold:
# Build short summary of last N events
error_summaries = self._build_error_summaries(events, max_errors=5)
alert_message = (
f"{alert_prefix}: {len(events)} in the last {window_seconds} seconds."
)
custom_details = {"recent_errors": error_summaries}
await self.send_alert_to_pagerduty(
alert_message=alert_message,
custom_details=custom_details,
)
# Clear them after sending an alert, so we don't spam
events.clear()
def _build_error_summaries(
self, events: List[PagerDutyInternalEvent], max_errors: int = 5
) -> List[PagerDutyInternalEvent]:
"""
Build short text summaries for the last `max_errors`.
Example: "ValueError (code: 500, provider: openai)"
"""
recent = events[-max_errors:]
summaries = []
for fe in recent:
# If any of these is None, show "N/A" to avoid messing up the summary string
fe.pop("timestamp")
summaries.append(fe)
return summaries
async def send_alert_to_pagerduty(self, alert_message: str, custom_details: dict):
"""
Send [critical] Alert to PagerDuty
https://developer.pagerduty.com/api-reference/YXBpOjI3NDgyNjU-pager-duty-v2-events-api
"""
try:
verbose_logger.debug(f"Sending alert to PagerDuty: {alert_message}")
async_client: AsyncHTTPHandler = get_async_httpx_client(
llm_provider=httpxSpecialProvider.LoggingCallback
)
payload: PagerDutyRequestBody = PagerDutyRequestBody(
payload=PagerDutyPayload(
summary=alert_message,
severity="critical",
source="LiteLLM Alert",
component="LiteLLM",
custom_details=custom_details,
),
routing_key=self.api_key,
event_action="trigger",
)
return await async_client.post(
url="https://events.pagerduty.com/v2/enqueue",
json=dict(payload),
headers={"Content-Type": "application/json"},
)
except Exception as e:
verbose_logger.exception(f"Error sending alert to PagerDuty: {e}")

View File

@ -1,523 +0,0 @@
# +-------------------------------------------------------------+
#
# Use SecretDetection /moderations for your LLM calls
#
# +-------------------------------------------------------------+
# Thank you users! We ❤️ you! - Krrish & Ishaan
import os
import sys
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import tempfile
from typing import Optional
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy.guardrails._content_utils import walk_user_text
GUARDRAIL_NAME = "hide_secrets"
_custom_plugins_path = "file://" + os.path.join(
os.path.dirname(os.path.abspath(__file__)), "secrets_plugins"
)
_default_detect_secrets_config = {
"plugins_used": [
{"name": "SoftlayerDetector"},
{"name": "StripeDetector"},
{"name": "NpmDetector"},
{"name": "IbmCosHmacDetector"},
{"name": "DiscordBotTokenDetector"},
{"name": "BasicAuthDetector"},
{"name": "AzureStorageKeyDetector"},
{"name": "ArtifactoryDetector"},
{"name": "AWSKeyDetector"},
{"name": "CloudantDetector"},
{"name": "IbmCloudIamDetector"},
{"name": "JwtTokenDetector"},
{"name": "MailchimpDetector"},
{"name": "SquareOAuthDetector"},
{"name": "PrivateKeyDetector"},
{"name": "TwilioKeyDetector"},
{
"name": "AdafruitKeyDetector",
"path": _custom_plugins_path + "/adafruit.py",
},
{
"name": "AdobeSecretDetector",
"path": _custom_plugins_path + "/adobe.py",
},
{
"name": "AgeSecretKeyDetector",
"path": _custom_plugins_path + "/age_secret_key.py",
},
{
"name": "AirtableApiKeyDetector",
"path": _custom_plugins_path + "/airtable_api_key.py",
},
{
"name": "AlgoliaApiKeyDetector",
"path": _custom_plugins_path + "/algolia_api_key.py",
},
{
"name": "AlibabaSecretDetector",
"path": _custom_plugins_path + "/alibaba.py",
},
{
"name": "AsanaSecretDetector",
"path": _custom_plugins_path + "/asana.py",
},
{
"name": "AtlassianApiTokenDetector",
"path": _custom_plugins_path + "/atlassian_api_token.py",
},
{
"name": "AuthressAccessKeyDetector",
"path": _custom_plugins_path + "/authress_access_key.py",
},
{
"name": "BittrexDetector",
"path": _custom_plugins_path + "/beamer_api_token.py",
},
{
"name": "BitbucketDetector",
"path": _custom_plugins_path + "/bitbucket.py",
},
{
"name": "BeamerApiTokenDetector",
"path": _custom_plugins_path + "/bittrex.py",
},
{
"name": "ClojarsApiTokenDetector",
"path": _custom_plugins_path + "/clojars_api_token.py",
},
{
"name": "CodecovAccessTokenDetector",
"path": _custom_plugins_path + "/codecov_access_token.py",
},
{
"name": "CoinbaseAccessTokenDetector",
"path": _custom_plugins_path + "/coinbase_access_token.py",
},
{
"name": "ConfluentDetector",
"path": _custom_plugins_path + "/confluent.py",
},
{
"name": "ContentfulApiTokenDetector",
"path": _custom_plugins_path + "/contentful_api_token.py",
},
{
"name": "DatabricksApiTokenDetector",
"path": _custom_plugins_path + "/databricks_api_token.py",
},
{
"name": "DatadogAccessTokenDetector",
"path": _custom_plugins_path + "/datadog_access_token.py",
},
{
"name": "DefinedNetworkingApiTokenDetector",
"path": _custom_plugins_path + "/defined_networking_api_token.py",
},
{
"name": "DigitaloceanDetector",
"path": _custom_plugins_path + "/digitalocean.py",
},
{
"name": "DopplerApiTokenDetector",
"path": _custom_plugins_path + "/doppler_api_token.py",
},
{
"name": "DroneciAccessTokenDetector",
"path": _custom_plugins_path + "/droneci_access_token.py",
},
{
"name": "DuffelApiTokenDetector",
"path": _custom_plugins_path + "/duffel_api_token.py",
},
{
"name": "DynatraceApiTokenDetector",
"path": _custom_plugins_path + "/dynatrace_api_token.py",
},
{
"name": "DiscordDetector",
"path": _custom_plugins_path + "/discord.py",
},
{
"name": "DropboxDetector",
"path": _custom_plugins_path + "/dropbox.py",
},
{
"name": "EasyPostDetector",
"path": _custom_plugins_path + "/easypost.py",
},
{
"name": "EtsyAccessTokenDetector",
"path": _custom_plugins_path + "/etsy_access_token.py",
},
{
"name": "FacebookAccessTokenDetector",
"path": _custom_plugins_path + "/facebook_access_token.py",
},
{
"name": "FastlyApiKeyDetector",
"path": _custom_plugins_path + "/fastly_api_token.py",
},
{
"name": "FinicityDetector",
"path": _custom_plugins_path + "/finicity.py",
},
{
"name": "FinnhubAccessTokenDetector",
"path": _custom_plugins_path + "/finnhub_access_token.py",
},
{
"name": "FlickrAccessTokenDetector",
"path": _custom_plugins_path + "/flickr_access_token.py",
},
{
"name": "FlutterwaveDetector",
"path": _custom_plugins_path + "/flutterwave.py",
},
{
"name": "FrameIoApiTokenDetector",
"path": _custom_plugins_path + "/frameio_api_token.py",
},
{
"name": "FreshbooksAccessTokenDetector",
"path": _custom_plugins_path + "/freshbooks_access_token.py",
},
{
"name": "GCPApiKeyDetector",
"path": _custom_plugins_path + "/gcp_api_key.py",
},
{
"name": "GitHubTokenCustomDetector",
"path": _custom_plugins_path + "/github_token.py",
},
{
"name": "GitLabDetector",
"path": _custom_plugins_path + "/gitlab.py",
},
{
"name": "GitterAccessTokenDetector",
"path": _custom_plugins_path + "/gitter_access_token.py",
},
{
"name": "GoCardlessApiTokenDetector",
"path": _custom_plugins_path + "/gocardless_api_token.py",
},
{
"name": "GrafanaDetector",
"path": _custom_plugins_path + "/grafana.py",
},
{
"name": "HashiCorpTFApiTokenDetector",
"path": _custom_plugins_path + "/hashicorp_tf_api_token.py",
},
{
"name": "HerokuApiKeyDetector",
"path": _custom_plugins_path + "/heroku_api_key.py",
},
{
"name": "HubSpotApiTokenDetector",
"path": _custom_plugins_path + "/hubspot_api_key.py",
},
{
"name": "HuggingFaceDetector",
"path": _custom_plugins_path + "/huggingface.py",
},
{
"name": "IntercomApiTokenDetector",
"path": _custom_plugins_path + "/intercom_api_key.py",
},
{
"name": "JFrogDetector",
"path": _custom_plugins_path + "/jfrog.py",
},
{
"name": "JWTBase64Detector",
"path": _custom_plugins_path + "/jwt.py",
},
{
"name": "KrakenAccessTokenDetector",
"path": _custom_plugins_path + "/kraken_access_token.py",
},
{
"name": "KucoinDetector",
"path": _custom_plugins_path + "/kucoin.py",
},
{
"name": "LaunchdarklyAccessTokenDetector",
"path": _custom_plugins_path + "/launchdarkly_access_token.py",
},
{
"name": "LinearDetector",
"path": _custom_plugins_path + "/linear.py",
},
{
"name": "LinkedInDetector",
"path": _custom_plugins_path + "/linkedin.py",
},
{
"name": "LobDetector",
"path": _custom_plugins_path + "/lob.py",
},
{
"name": "MailgunDetector",
"path": _custom_plugins_path + "/mailgun.py",
},
{
"name": "MapBoxApiTokenDetector",
"path": _custom_plugins_path + "/mapbox_api_token.py",
},
{
"name": "MattermostAccessTokenDetector",
"path": _custom_plugins_path + "/mattermost_access_token.py",
},
{
"name": "MessageBirdDetector",
"path": _custom_plugins_path + "/messagebird.py",
},
{
"name": "MicrosoftTeamsWebhookDetector",
"path": _custom_plugins_path + "/microsoft_teams_webhook.py",
},
{
"name": "NetlifyAccessTokenDetector",
"path": _custom_plugins_path + "/netlify_access_token.py",
},
{
"name": "NewRelicDetector",
"path": _custom_plugins_path + "/new_relic.py",
},
{
"name": "NYTimesAccessTokenDetector",
"path": _custom_plugins_path + "/nytimes_access_token.py",
},
{
"name": "OktaAccessTokenDetector",
"path": _custom_plugins_path + "/okta_access_token.py",
},
{
"name": "OpenAIApiKeyDetector",
"path": _custom_plugins_path + "/openai_api_key.py",
},
{
"name": "PlanetScaleDetector",
"path": _custom_plugins_path + "/planetscale.py",
},
{
"name": "PostmanApiTokenDetector",
"path": _custom_plugins_path + "/postman_api_token.py",
},
{
"name": "PrefectApiTokenDetector",
"path": _custom_plugins_path + "/prefect_api_token.py",
},
{
"name": "PulumiApiTokenDetector",
"path": _custom_plugins_path + "/pulumi_api_token.py",
},
{
"name": "PyPiUploadTokenDetector",
"path": _custom_plugins_path + "/pypi_upload_token.py",
},
{
"name": "RapidApiAccessTokenDetector",
"path": _custom_plugins_path + "/rapidapi_access_token.py",
},
{
"name": "ReadmeApiTokenDetector",
"path": _custom_plugins_path + "/readme_api_token.py",
},
{
"name": "RubygemsApiTokenDetector",
"path": _custom_plugins_path + "/rubygems_api_token.py",
},
{
"name": "ScalingoApiTokenDetector",
"path": _custom_plugins_path + "/scalingo_api_token.py",
},
{
"name": "SendbirdDetector",
"path": _custom_plugins_path + "/sendbird.py",
},
{
"name": "SendGridApiTokenDetector",
"path": _custom_plugins_path + "/sendgrid_api_token.py",
},
{
"name": "SendinBlueApiTokenDetector",
"path": _custom_plugins_path + "/sendinblue_api_token.py",
},
{
"name": "SentryAccessTokenDetector",
"path": _custom_plugins_path + "/sentry_access_token.py",
},
{
"name": "ShippoApiTokenDetector",
"path": _custom_plugins_path + "/shippo_api_token.py",
},
{
"name": "ShopifyDetector",
"path": _custom_plugins_path + "/shopify.py",
},
{
"name": "SlackDetector",
"path": _custom_plugins_path + "/slack.py",
},
{
"name": "SnykApiTokenDetector",
"path": _custom_plugins_path + "/snyk_api_token.py",
},
{
"name": "SquarespaceAccessTokenDetector",
"path": _custom_plugins_path + "/squarespace_access_token.py",
},
{
"name": "SumoLogicDetector",
"path": _custom_plugins_path + "/sumologic.py",
},
{
"name": "TelegramBotApiTokenDetector",
"path": _custom_plugins_path + "/telegram_bot_api_token.py",
},
{
"name": "TravisCiAccessTokenDetector",
"path": _custom_plugins_path + "/travisci_access_token.py",
},
{
"name": "TwitchApiTokenDetector",
"path": _custom_plugins_path + "/twitch_api_token.py",
},
{
"name": "TwitterDetector",
"path": _custom_plugins_path + "/twitter.py",
},
{
"name": "TypeformApiTokenDetector",
"path": _custom_plugins_path + "/typeform_api_token.py",
},
{
"name": "VaultDetector",
"path": _custom_plugins_path + "/vault.py",
},
{
"name": "YandexDetector",
"path": _custom_plugins_path + "/yandex.py",
},
{
"name": "ZendeskSecretKeyDetector",
"path": _custom_plugins_path + "/zendesk_secret_key.py",
},
{"name": "Base64HighEntropyString", "limit": 3.0},
{"name": "HexHighEntropyString", "limit": 3.0},
]
}
class _ENTERPRISE_SecretDetection(CustomGuardrail):
def __init__(self, detect_secrets_config: Optional[dict] = None, **kwargs):
self.user_defined_detect_secrets_config = detect_secrets_config
super().__init__(**kwargs)
def scan_message_for_secrets(self, message_content: str):
from detect_secrets import SecretsCollection
from detect_secrets.settings import transient_settings
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.write(message_content.encode("utf-8"))
temp_file.close()
secrets = SecretsCollection()
detect_secrets_config = (
self.user_defined_detect_secrets_config or _default_detect_secrets_config
)
with transient_settings(detect_secrets_config):
secrets.scan_file(temp_file.name)
os.remove(temp_file.name)
detected_secrets = []
for file in secrets.files:
for found_secret in secrets[file]:
if found_secret.secret_value is None:
continue
detected_secrets.append(
{"type": found_secret.type, "value": found_secret.secret_value}
)
return detected_secrets
async def should_run_check(self, user_api_key_dict: UserAPIKeyAuth) -> bool:
if user_api_key_dict.permissions is not None:
if GUARDRAIL_NAME in user_api_key_dict.permissions:
if user_api_key_dict.permissions[GUARDRAIL_NAME] is False:
return False
return True
#### CALL HOOKS - proxy only ####
async def async_pre_call_hook(
self,
user_api_key_dict: UserAPIKeyAuth,
cache: DualCache,
data: dict,
call_type: str, # "completion", "embeddings", "image_generation", "moderation"
):
if await self.should_run_check(user_api_key_dict) is False:
return
# Covers multimodal list content + Responses-API input.
def _redact_message_text(text: str) -> str:
detected_secrets = self.scan_message_for_secrets(text)
for secret in detected_secrets:
text = text.replace(secret["value"], "[REDACTED]")
if detected_secrets:
secret_types = [secret["type"] for secret in detected_secrets]
verbose_proxy_logger.warning(
f"Detected and redacted secrets in message: {secret_types}"
)
return text
walk_user_text(data, _redact_message_text)
if "prompt" in data:
if isinstance(data["prompt"], str):
detected_secrets = self.scan_message_for_secrets(data["prompt"])
for secret in detected_secrets:
data["prompt"] = data["prompt"].replace(
secret["value"], "[REDACTED]"
)
if len(detected_secrets) > 0:
secret_types = [secret["type"] for secret in detected_secrets]
verbose_proxy_logger.warning(
f"Detected and redacted secrets in prompt: {secret_types}"
)
elif isinstance(data["prompt"], list):
# Index back into the list — assigning to ``item`` would only
# rebind the loop variable and leave ``data["prompt"]``
# carrying the unredacted secret.
for idx, item in enumerate(data["prompt"]):
if isinstance(item, str):
detected_secrets = self.scan_message_for_secrets(item)
for secret in detected_secrets:
item = item.replace(secret["value"], "[REDACTED]")
data["prompt"][idx] = item
if len(detected_secrets) > 0:
secret_types = [
secret["type"] for secret in detected_secrets
]
verbose_proxy_logger.warning(
f"Detected and redacted secrets in prompt: {secret_types}"
)
# ``data["input"]`` (Responses API and embeddings/moderation) is
# already covered by ``walk_user_text`` above.
return

View File

@ -1,23 +0,0 @@
"""
This plugin searches for Adafruit keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AdafruitKeyDetector(RegexBasedDetector):
"""Scans for Adafruit keys."""
@property
def secret_type(self) -> str:
return "Adafruit API Key"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(
r"""(?i)(?:adafruit)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9_-]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
)
]

View File

@ -1,26 +0,0 @@
"""
This plugin searches for Adobe keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AdobeSecretDetector(RegexBasedDetector):
"""Scans for Adobe client keys."""
@property
def secret_type(self) -> str:
return "Adobe Client Keys"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Adobe Client ID (OAuth Web)
re.compile(
r"""(?i)(?:adobe)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-f0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# Adobe Client Secret
re.compile(r"(?i)\b((p8e-)[a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"),
]

View File

@ -1,21 +0,0 @@
"""
This plugin searches for Age secret keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AgeSecretKeyDetector(RegexBasedDetector):
"""Scans for Age secret keys."""
@property
def secret_type(self) -> str:
return "Age Secret Key"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(r"""AGE-SECRET-KEY-1[QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L]{58}"""),
]

View File

@ -1,23 +0,0 @@
"""
This plugin searches for Airtable API keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AirtableApiKeyDetector(RegexBasedDetector):
"""Scans for Airtable API keys."""
@property
def secret_type(self) -> str:
return "Airtable API Key"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(
r"""(?i)(?:airtable)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{17})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,21 +0,0 @@
"""
This plugin searches for Algolia API keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AlgoliaApiKeyDetector(RegexBasedDetector):
"""Scans for Algolia API keys."""
@property
def secret_type(self) -> str:
return "Algolia API Key"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(r"""(?i)\b((LTAI)[a-z0-9]{20})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
]

View File

@ -1,26 +0,0 @@
"""
This plugin searches for Alibaba secrets
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AlibabaSecretDetector(RegexBasedDetector):
"""Scans for Alibaba AccessKey IDs and Secret Keys."""
@property
def secret_type(self) -> str:
return "Alibaba Secrets"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Alibaba AccessKey ID
re.compile(r"""(?i)\b((LTAI)[a-z0-9]{20})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
# For Alibaba Secret Key
re.compile(
r"""(?i)(?:alibaba)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{30})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,28 +0,0 @@
"""
This plugin searches for Asana secrets
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AsanaSecretDetector(RegexBasedDetector):
"""Scans for Asana Client IDs and Client Secrets."""
@property
def secret_type(self) -> str:
return "Asana Secrets"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Asana Client ID
re.compile(
r"""(?i)(?:asana)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9]{16})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# For Asana Client Secret
re.compile(
r"""(?i)(?:asana)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Atlassian API tokens
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AtlassianApiTokenDetector(RegexBasedDetector):
"""Scans for Atlassian API tokens."""
@property
def secret_type(self) -> str:
return "Atlassian API token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Atlassian API token
re.compile(
r"""(?i)(?:atlassian|confluence|jira)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{24})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Authress Service Client Access Keys
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class AuthressAccessKeyDetector(RegexBasedDetector):
"""Scans for Authress Service Client Access Keys."""
@property
def secret_type(self) -> str:
return "Authress Service Client Access Key"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Authress Service Client Access Key
re.compile(
r"""(?i)\b((?:sc|ext|scauth|authress)_[a-z0-9]{5,30}\.[a-z0-9]{4,6}\.acc[_-][a-z0-9-]{10,32}\.[a-z0-9+/_=-]{30,120})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Beamer API tokens
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class BeamerApiTokenDetector(RegexBasedDetector):
"""Scans for Beamer API tokens."""
@property
def secret_type(self) -> str:
return "Beamer API token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Beamer API token
re.compile(
r"""(?i)(?:beamer)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(b_[a-z0-9=_\-]{44})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,28 +0,0 @@
"""
This plugin searches for Bitbucket Client ID and Client Secret
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class BitbucketDetector(RegexBasedDetector):
"""Scans for Bitbucket Client ID and Client Secret."""
@property
def secret_type(self) -> str:
return "Bitbucket Secrets"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Bitbucket Client ID
re.compile(
r"""(?i)(?:bitbucket)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# For Bitbucket Client Secret
re.compile(
r"""(?i)(?:bitbucket)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,28 +0,0 @@
"""
This plugin searches for Bittrex Access Key and Secret Key
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class BittrexDetector(RegexBasedDetector):
"""Scans for Bittrex Access Key and Secret Key."""
@property
def secret_type(self) -> str:
return "Bittrex Secrets"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Bittrex Access Key
re.compile(
r"""(?i)(?:bittrex)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# For Bittrex Secret Key
re.compile(
r"""(?i)(?:bittrex)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,22 +0,0 @@
"""
This plugin searches for Clojars API tokens
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class ClojarsApiTokenDetector(RegexBasedDetector):
"""Scans for Clojars API tokens."""
@property
def secret_type(self) -> str:
return "Clojars API token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Clojars API token
re.compile(r"(?i)(CLOJARS_)[a-z0-9]{60}"),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Codecov Access Token
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class CodecovAccessTokenDetector(RegexBasedDetector):
"""Scans for Codecov Access Token."""
@property
def secret_type(self) -> str:
return "Codecov Access Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Codecov Access Token
re.compile(
r"""(?i)(?:codecov)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Coinbase Access Token
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class CoinbaseAccessTokenDetector(RegexBasedDetector):
"""Scans for Coinbase Access Token."""
@property
def secret_type(self) -> str:
return "Coinbase Access Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Coinbase Access Token
re.compile(
r"""(?i)(?:coinbase)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9_-]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,28 +0,0 @@
"""
This plugin searches for Confluent Access Token and Confluent Secret Key
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class ConfluentDetector(RegexBasedDetector):
"""Scans for Confluent Access Token and Confluent Secret Key."""
@property
def secret_type(self) -> str:
return "Confluent Secret"
@property
def denylist(self) -> list[re.Pattern]:
return [
# For Confluent Access Token
re.compile(
r"""(?i)(?:confluent)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{16})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# For Confluent Secret Key
re.compile(
r"""(?i)(?:confluent)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,23 +0,0 @@
"""
This plugin searches for Contentful delivery API token.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class ContentfulApiTokenDetector(RegexBasedDetector):
"""Scans for Contentful delivery API token."""
@property
def secret_type(self) -> str:
return "Contentful API Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(
r"""(?i)(?:contentful)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{43})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,21 +0,0 @@
"""
This plugin searches for Databricks API token.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DatabricksApiTokenDetector(RegexBasedDetector):
"""Scans for Databricks API token."""
@property
def secret_type(self) -> str:
return "Databricks API Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(r"""(?i)\b(dapi[a-h0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
]

View File

@ -1,23 +0,0 @@
"""
This plugin searches for Datadog Access Tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DatadogAccessTokenDetector(RegexBasedDetector):
"""Scans for Datadog Access Tokens."""
@property
def secret_type(self) -> str:
return "Datadog Access Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(
r"""(?i)(?:datadog)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{40})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,23 +0,0 @@
"""
This plugin searches for Defined Networking API Tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DefinedNetworkingApiTokenDetector(RegexBasedDetector):
"""Scans for Defined Networking API Tokens."""
@property
def secret_type(self) -> str:
return "Defined Networking API Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
re.compile(
r"""(?i)(?:dnkey)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(dnkey-[a-z0-9=_\-]{26}-[a-z0-9=_\-]{52})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,26 +0,0 @@
"""
This plugin searches for DigitalOcean tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DigitaloceanDetector(RegexBasedDetector):
"""Scans for various DigitalOcean Tokens."""
@property
def secret_type(self) -> str:
return "DigitalOcean Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# OAuth Access Token
re.compile(r"""(?i)\b(doo_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
# Personal Access Token
re.compile(r"""(?i)\b(dop_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
# OAuth Refresh Token
re.compile(r"""(?i)\b(dor_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
]

View File

@ -1,32 +0,0 @@
"""
This plugin searches for Discord Client tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DiscordDetector(RegexBasedDetector):
"""Scans for various Discord Client Tokens."""
@property
def secret_type(self) -> str:
return "Discord Client Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Discord API key
re.compile(
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# Discord client ID
re.compile(
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9]{18})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# Discord client secret
re.compile(
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,22 +0,0 @@
"""
This plugin searches for Doppler API tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DopplerApiTokenDetector(RegexBasedDetector):
"""Scans for Doppler API Tokens."""
@property
def secret_type(self) -> str:
return "Doppler API Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Doppler API token
re.compile(r"""(?i)dp\.pt\.[a-z0-9]{43}"""),
]

View File

@ -1,24 +0,0 @@
"""
This plugin searches for Droneci Access Tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DroneciAccessTokenDetector(RegexBasedDetector):
"""Scans for Droneci Access Tokens."""
@property
def secret_type(self) -> str:
return "Droneci Access Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Droneci Access Token
re.compile(
r"""(?i)(?:droneci)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,32 +0,0 @@
"""
This plugin searches for Dropbox tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DropboxDetector(RegexBasedDetector):
"""Scans for various Dropbox Tokens."""
@property
def secret_type(self) -> str:
return "Dropbox Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Dropbox API secret
re.compile(
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{15})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# Dropbox long-lived API token
re.compile(
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{11}(AAAAAAAAAA)[a-z0-9\-_=]{43})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
# Dropbox short-lived API token
re.compile(
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(sl\.[a-z0-9\-=_]{135})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
),
]

View File

@ -1,22 +0,0 @@
"""
This plugin searches for Duffel API Tokens.
"""
import re
from detect_secrets.plugins.base import RegexBasedDetector
class DuffelApiTokenDetector(RegexBasedDetector):
"""Scans for Duffel API Tokens."""
@property
def secret_type(self) -> str:
return "Duffel API Token"
@property
def denylist(self) -> list[re.Pattern]:
return [
# Duffel API Token
re.compile(r"""(?i)duffel_(test|live)_[a-z0-9_\-=]{43}"""),
]

Some files were not shown because too many files have changed in this diff Show More