Compare commits
2 Commits
litellm_in
...
feat/mit-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79635badf7 | ||
|
|
7eaa4aeb3e |
@ -1,37 +0,0 @@
|
||||
|
||||
The BerriAI Enterprise license (the "Enterprise License")
|
||||
Copyright (c) 2024 - present Berrie AI Inc.
|
||||
|
||||
With regard to the BerriAI Software:
|
||||
|
||||
This software and associated documentation files (the "Software") may only be
|
||||
used in production, if you (and any entity that you represent) have agreed to,
|
||||
and are in compliance with, the BerriAI Subscription Terms of Service, available
|
||||
via [call](https://enterprise.litellm.ai/demo) or email (info@berri.ai) (the "Enterprise Terms"), or other
|
||||
agreement governing the use of the Software, as agreed by you and BerriAI,
|
||||
and otherwise have a valid BerriAI Enterprise license for the
|
||||
correct number of user seats. Subject to the foregoing sentence, you are free to
|
||||
modify this Software and publish patches to the Software. You agree that BerriAI
|
||||
and/or its licensors (as applicable) retain all right, title and interest in and
|
||||
to all such modifications and/or patches, and all such modifications and/or
|
||||
patches may only be used, copied, modified, displayed, distributed, or otherwise
|
||||
exploited with a valid BerriAI Enterprise license for the correct
|
||||
number of user seats. Notwithstanding the foregoing, you may copy and modify
|
||||
the Software for development and testing purposes, without requiring a
|
||||
subscription. You agree that BerriAI and/or its licensors (as applicable) retain
|
||||
all right, title and interest in and to all such modifications. You are not
|
||||
granted any other rights beyond what is expressly stated herein. Subject to the
|
||||
foregoing, it is forbidden to copy, merge, publish, distribute, sublicense,
|
||||
and/or sell the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
For all third party components incorporated into the BerriAI Software, those
|
||||
components are licensed under the original license provided by the owner of the
|
||||
applicable component.
|
||||
@ -1,9 +0,0 @@
|
||||
## LiteLLM Enterprise
|
||||
|
||||
Code in this folder is licensed under a commercial license. Please review the [LICENSE](./LICENSE.md) file within the /enterprise folder
|
||||
|
||||
**These features are covered under the LiteLLM Enterprise contract**
|
||||
|
||||
👉 **Using in an Enterprise / Need specific features ?** Meet with us [here](https://enterprise.litellm.ai/demo?month=2024-02)
|
||||
|
||||
See all Enterprise Features here 👉 [Docs](https://docs.litellm.ai/docs/proxy/enterprise)
|
||||
@ -1 +0,0 @@
|
||||
from . import *
|
||||
@ -1,44 +0,0 @@
|
||||
Resources:
|
||||
LiteLLMServer:
|
||||
Type: AWS::EC2::Instance
|
||||
Properties:
|
||||
AvailabilityZone: us-east-1a
|
||||
ImageId: ami-0f403e3180720dd7e
|
||||
InstanceType: t2.micro
|
||||
|
||||
LiteLLMServerAutoScalingGroup:
|
||||
Type: AWS::AutoScaling::AutoScalingGroup
|
||||
Properties:
|
||||
AvailabilityZones:
|
||||
- us-east-1a
|
||||
LaunchConfigurationName: !Ref LiteLLMServerLaunchConfig
|
||||
MinSize: 1
|
||||
MaxSize: 3
|
||||
DesiredCapacity: 1
|
||||
HealthCheckGracePeriod: 300
|
||||
|
||||
LiteLLMServerLaunchConfig:
|
||||
Type: AWS::AutoScaling::LaunchConfiguration
|
||||
Properties:
|
||||
ImageId: ami-0f403e3180720dd7e # Replace with your desired AMI ID
|
||||
InstanceType: t2.micro
|
||||
|
||||
LiteLLMServerScalingPolicy:
|
||||
Type: AWS::AutoScaling::ScalingPolicy
|
||||
Properties:
|
||||
AutoScalingGroupName: !Ref LiteLLMServerAutoScalingGroup
|
||||
PolicyType: TargetTrackingScaling
|
||||
TargetTrackingConfiguration:
|
||||
PredefinedMetricSpecification:
|
||||
PredefinedMetricType: ASGAverageCPUUtilization
|
||||
TargetValue: 60.0
|
||||
|
||||
LiteLLMDB:
|
||||
Type: AWS::RDS::DBInstance
|
||||
Properties:
|
||||
AllocatedStorage: 20
|
||||
Engine: postgres
|
||||
MasterUsername: litellmAdmin
|
||||
MasterUserPassword: litellmPassword
|
||||
DBInstanceClass: db.t3.micro
|
||||
AvailabilityZone: us-east-1a
|
||||
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.1.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.1.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.10.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.10.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.11.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.11.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.12.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.12.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.13.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.13.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.15.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.15.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.17.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.17.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.19.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.19.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.2.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.2.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.21.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.21.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.22.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.22.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.23.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.23.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.24.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.24.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.25.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.25.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.26.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.26.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.27.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.27.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.29.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.29.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.3.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.3.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.30.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.30.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.31.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.31.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.32.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.32.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.4.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.4.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.5.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.5.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.6.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.6.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.7.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.7.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.8.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.8.tar.gz
vendored
Binary file not shown.
Binary file not shown.
BIN
enterprise/dist/litellm_enterprise-0.1.9.tar.gz
vendored
BIN
enterprise/dist/litellm_enterprise-0.1.9.tar.gz
vendored
Binary file not shown.
@ -1,33 +0,0 @@
|
||||
from typing import Dict, Literal, Type, Union
|
||||
|
||||
from litellm_enterprise.proxy.hooks.managed_files import _PROXY_LiteLLMManagedFiles
|
||||
from litellm_enterprise.proxy.hooks.managed_vector_stores import (
|
||||
_PROXY_LiteLLMManagedVectorStores,
|
||||
)
|
||||
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
|
||||
ENTERPRISE_PROXY_HOOKS: Dict[str, Type[CustomLogger]] = {
|
||||
"managed_files": _PROXY_LiteLLMManagedFiles,
|
||||
"managed_vector_stores": _PROXY_LiteLLMManagedVectorStores,
|
||||
}
|
||||
|
||||
|
||||
def get_enterprise_proxy_hook(
|
||||
hook_name: Union[
|
||||
Literal[
|
||||
"managed_files",
|
||||
"managed_vector_stores",
|
||||
"max_parallel_requests",
|
||||
],
|
||||
str,
|
||||
],
|
||||
):
|
||||
"""
|
||||
Factory method to get a enterprise hook instance by name
|
||||
"""
|
||||
if hook_name not in ENTERPRISE_PROXY_HOOKS:
|
||||
raise ValueError(
|
||||
f"Unknown hook: {hook_name}. Available hooks: {list(ENTERPRISE_PROXY_HOOKS.keys())}"
|
||||
)
|
||||
return ENTERPRISE_PROXY_HOOKS[hook_name]
|
||||
@ -1,204 +0,0 @@
|
||||
# +-------------------------------------------------------------+
|
||||
#
|
||||
# Use AporiaAI for your LLM calls
|
||||
#
|
||||
# +-------------------------------------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from litellm.types.utils import CallTypesLiteral
|
||||
|
||||
sys.path.insert(
|
||||
0, os.path.abspath("../..")
|
||||
) # Adds the parent directory to the system path
|
||||
import json
|
||||
import sys
|
||||
from typing import Any, List, Literal, Optional
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_guardrail import CustomGuardrail
|
||||
from litellm.litellm_core_utils.logging_utils import (
|
||||
convert_litellm_response_object_to_str,
|
||||
)
|
||||
from litellm.llms.custom_httpx.http_handler import (
|
||||
get_async_httpx_client,
|
||||
httpxSpecialProvider,
|
||||
)
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.proxy.guardrails.guardrail_helpers import should_proceed_based_on_metadata
|
||||
from litellm.types.guardrails import GuardrailEventHooks
|
||||
|
||||
GUARDRAIL_NAME = "aporia"
|
||||
|
||||
|
||||
class AporiaGuardrail(CustomGuardrail):
|
||||
def __init__(
|
||||
self, api_key: Optional[str] = None, api_base: Optional[str] = None, **kwargs
|
||||
):
|
||||
self.async_handler = get_async_httpx_client(
|
||||
llm_provider=httpxSpecialProvider.GuardrailCallback
|
||||
)
|
||||
self.aporia_api_key = api_key or os.environ["APORIO_API_KEY"]
|
||||
self.aporia_api_base = api_base or os.environ["APORIO_API_BASE"]
|
||||
super().__init__(**kwargs)
|
||||
|
||||
#### CALL HOOKS - proxy only ####
|
||||
def transform_messages(self, messages: List[dict]) -> List[dict]:
|
||||
supported_openai_roles = ["system", "user", "assistant"]
|
||||
default_role = "other" # for unsupported roles - e.g. tool
|
||||
new_messages = []
|
||||
for m in messages:
|
||||
if m.get("role", "") in supported_openai_roles:
|
||||
new_messages.append(m)
|
||||
else:
|
||||
new_messages.append(
|
||||
{
|
||||
"role": default_role,
|
||||
**{key: value for key, value in m.items() if key != "role"},
|
||||
}
|
||||
)
|
||||
|
||||
return new_messages
|
||||
|
||||
async def prepare_aporia_request(
|
||||
self, new_messages: List[dict], response_string: Optional[str] = None
|
||||
) -> dict:
|
||||
data: dict[str, Any] = {}
|
||||
if new_messages is not None:
|
||||
data["messages"] = new_messages
|
||||
if response_string is not None:
|
||||
data["response"] = response_string
|
||||
|
||||
# Set validation target
|
||||
if new_messages and response_string:
|
||||
data["validation_target"] = "both"
|
||||
elif new_messages:
|
||||
data["validation_target"] = "prompt"
|
||||
elif response_string:
|
||||
data["validation_target"] = "response"
|
||||
|
||||
verbose_proxy_logger.debug("Aporia AI request: %s", data)
|
||||
return data
|
||||
|
||||
async def make_aporia_api_request(
|
||||
self, new_messages: List[dict], response_string: Optional[str] = None
|
||||
):
|
||||
data = await self.prepare_aporia_request(
|
||||
new_messages=new_messages, response_string=response_string
|
||||
)
|
||||
|
||||
_json_data = json.dumps(data)
|
||||
|
||||
"""
|
||||
export APORIO_API_KEY=<your key>
|
||||
curl https://gr-prd-trial.aporia.com/some-id \
|
||||
-X POST \
|
||||
-H "X-APORIA-API-KEY: $APORIO_API_KEY" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "This is a test prompt"
|
||||
}
|
||||
],
|
||||
}
|
||||
'
|
||||
"""
|
||||
|
||||
response = await self.async_handler.post(
|
||||
url=self.aporia_api_base + "/validate",
|
||||
data=_json_data,
|
||||
headers={
|
||||
"X-APORIA-API-KEY": self.aporia_api_key,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
verbose_proxy_logger.debug("Aporia AI response: %s", response.text)
|
||||
if response.status_code == 200:
|
||||
# check if the response was flagged
|
||||
_json_response = response.json()
|
||||
action: str = _json_response.get(
|
||||
"action"
|
||||
) # possible values are modify, passthrough, block, rephrase
|
||||
if action == "block":
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": "Violated guardrail policy",
|
||||
"aporia_ai_response": _json_response,
|
||||
},
|
||||
)
|
||||
|
||||
async def async_post_call_success_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
response,
|
||||
):
|
||||
from litellm.proxy.common_utils.callback_utils import (
|
||||
add_guardrail_to_applied_guardrails_header,
|
||||
)
|
||||
|
||||
"""
|
||||
Use this for the post call moderation with Guardrails
|
||||
"""
|
||||
event_type: GuardrailEventHooks = GuardrailEventHooks.post_call
|
||||
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
|
||||
return
|
||||
|
||||
response_str: Optional[str] = convert_litellm_response_object_to_str(response)
|
||||
if response_str is not None:
|
||||
await self.make_aporia_api_request(
|
||||
response_string=response_str, new_messages=data.get("messages", [])
|
||||
)
|
||||
|
||||
add_guardrail_to_applied_guardrails_header(
|
||||
request_data=data, guardrail_name=self.guardrail_name
|
||||
)
|
||||
|
||||
pass
|
||||
|
||||
async def async_moderation_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
call_type: CallTypesLiteral,
|
||||
):
|
||||
from litellm.proxy.common_utils.callback_utils import (
|
||||
add_guardrail_to_applied_guardrails_header,
|
||||
)
|
||||
|
||||
event_type: GuardrailEventHooks = GuardrailEventHooks.during_call
|
||||
if self.should_run_guardrail(data=data, event_type=event_type) is not True:
|
||||
return
|
||||
|
||||
# old implementation - backwards compatibility
|
||||
if (
|
||||
await should_proceed_based_on_metadata(
|
||||
data=data,
|
||||
guardrail_name=GUARDRAIL_NAME,
|
||||
)
|
||||
is False
|
||||
):
|
||||
return
|
||||
|
||||
new_messages: Optional[List[dict]] = None
|
||||
if "messages" in data and isinstance(data["messages"], list):
|
||||
new_messages = self.transform_messages(messages=data["messages"])
|
||||
|
||||
if new_messages is not None:
|
||||
await self.make_aporia_api_request(new_messages=new_messages)
|
||||
add_guardrail_to_applied_guardrails_header(
|
||||
request_data=data, guardrail_name=self.guardrail_name
|
||||
)
|
||||
else:
|
||||
verbose_proxy_logger.warning(
|
||||
"Aporia AI: not running guardrail. No messages in data"
|
||||
)
|
||||
pass
|
||||
@ -1,115 +0,0 @@
|
||||
# +------------------------------+
|
||||
#
|
||||
# Banned Keywords
|
||||
#
|
||||
# +------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
## Reject a call / response if it contains certain keywords
|
||||
|
||||
|
||||
from typing import Literal
|
||||
import litellm
|
||||
from litellm.caching.caching import DualCache
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.proxy.guardrails._content_utils import (
|
||||
is_text_content_call_type,
|
||||
iter_message_text,
|
||||
)
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from fastapi import HTTPException
|
||||
|
||||
|
||||
class _ENTERPRISE_BannedKeywords(CustomLogger):
|
||||
# Class variables or attributes
|
||||
def __init__(self):
|
||||
banned_keywords_list = litellm.banned_keywords_list
|
||||
|
||||
if banned_keywords_list is None:
|
||||
raise Exception(
|
||||
"`banned_keywords_list` can either be a list or filepath. None set."
|
||||
)
|
||||
|
||||
if isinstance(banned_keywords_list, list):
|
||||
self.banned_keywords_list = banned_keywords_list
|
||||
|
||||
if isinstance(banned_keywords_list, str): # assume it's a filepath
|
||||
try:
|
||||
with open(banned_keywords_list, "r") as file:
|
||||
data = file.read()
|
||||
self.banned_keywords_list = data.split("\n")
|
||||
except FileNotFoundError:
|
||||
raise Exception(
|
||||
f"File not found. banned_keywords_list={banned_keywords_list}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise Exception(
|
||||
f"An error occurred: {str(e)}, banned_keywords_list={banned_keywords_list}"
|
||||
)
|
||||
|
||||
def print_verbose(self, print_statement, level: Literal["INFO", "DEBUG"] = "DEBUG"):
|
||||
if level == "INFO":
|
||||
verbose_proxy_logger.info(print_statement)
|
||||
elif level == "DEBUG":
|
||||
verbose_proxy_logger.debug(print_statement)
|
||||
|
||||
if litellm.set_verbose is True:
|
||||
print(print_statement) # noqa
|
||||
|
||||
def test_violation(self, test_str: str):
|
||||
for word in self.banned_keywords_list:
|
||||
if word in test_str.lower():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={"error": f"Keyword banned. Keyword={word}"},
|
||||
)
|
||||
|
||||
async def async_pre_call_hook(
|
||||
self,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
cache: DualCache,
|
||||
data: dict,
|
||||
call_type: str, # "completion", "embeddings", "image_generation", "moderation"
|
||||
):
|
||||
try:
|
||||
"""
|
||||
- check if user id part of call
|
||||
- check if user id part of blocked list
|
||||
"""
|
||||
self.print_verbose("Inside Banned Keyword List Pre-Call Hook")
|
||||
if is_text_content_call_type(call_type):
|
||||
for text in iter_message_text(data):
|
||||
self.test_violation(test_str=text)
|
||||
|
||||
except HTTPException as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.exception(
|
||||
"litellm.enterprise.enterprise_hooks.banned_keywords::async_pre_call_hook - Exception occurred - {}".format(
|
||||
str(e)
|
||||
)
|
||||
)
|
||||
|
||||
async def async_post_call_success_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
response,
|
||||
):
|
||||
if not isinstance(response, litellm.ModelResponse):
|
||||
return
|
||||
|
||||
for choice in response.choices:
|
||||
if not isinstance(choice, litellm.utils.Choices):
|
||||
continue
|
||||
message = getattr(choice, "message", None)
|
||||
content = getattr(message, "content", None)
|
||||
if isinstance(content, str):
|
||||
self.test_violation(test_str=content)
|
||||
|
||||
async def async_post_call_streaming_hook(
|
||||
self,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
response: str,
|
||||
):
|
||||
self.test_violation(test_str=response)
|
||||
@ -1,124 +0,0 @@
|
||||
# +------------------------------+
|
||||
#
|
||||
# Blocked User List
|
||||
#
|
||||
# +------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
## This accepts a list of user id's for whom calls will be rejected
|
||||
|
||||
|
||||
from typing import Optional, Literal
|
||||
import litellm
|
||||
from litellm.proxy.utils import PrismaClient
|
||||
from litellm.caching.caching import DualCache
|
||||
from litellm.proxy._types import UserAPIKeyAuth, LiteLLM_EndUserTable
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from fastapi import HTTPException
|
||||
|
||||
|
||||
class _ENTERPRISE_BlockedUserList(CustomLogger):
|
||||
# Class variables or attributes
|
||||
def __init__(self, prisma_client: Optional[PrismaClient]):
|
||||
self.prisma_client = prisma_client
|
||||
|
||||
blocked_user_list = litellm.blocked_user_list
|
||||
if blocked_user_list is None:
|
||||
self.blocked_user_list = None
|
||||
return
|
||||
|
||||
if isinstance(blocked_user_list, list):
|
||||
self.blocked_user_list = blocked_user_list
|
||||
|
||||
if isinstance(blocked_user_list, str): # assume it's a filepath
|
||||
try:
|
||||
with open(blocked_user_list, "r") as file:
|
||||
data = file.read()
|
||||
self.blocked_user_list = data.split("\n")
|
||||
except FileNotFoundError:
|
||||
raise Exception(
|
||||
f"File not found. blocked_user_list={blocked_user_list}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise Exception(
|
||||
f"An error occurred: {str(e)}, blocked_user_list={blocked_user_list}"
|
||||
)
|
||||
|
||||
def print_verbose(self, print_statement, level: Literal["INFO", "DEBUG"] = "DEBUG"):
|
||||
if level == "INFO":
|
||||
verbose_proxy_logger.info(print_statement)
|
||||
elif level == "DEBUG":
|
||||
verbose_proxy_logger.debug(print_statement)
|
||||
|
||||
if litellm.set_verbose is True:
|
||||
print(print_statement) # noqa
|
||||
|
||||
async def async_pre_call_hook(
|
||||
self,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
cache: DualCache,
|
||||
data: dict,
|
||||
call_type: str,
|
||||
):
|
||||
try:
|
||||
"""
|
||||
- check if user id part of call
|
||||
- check if user id part of blocked list
|
||||
- if blocked list is none or user not in blocked list
|
||||
- check if end-user in cache
|
||||
- check if end-user in db
|
||||
"""
|
||||
self.print_verbose("Inside Blocked User List Pre-Call Hook")
|
||||
if "user_id" in data or "user" in data:
|
||||
user = data.get("user_id", data.get("user", ""))
|
||||
if (
|
||||
self.blocked_user_list is not None
|
||||
and user in self.blocked_user_list
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": f"User blocked from making LLM API Calls. User={user}"
|
||||
},
|
||||
)
|
||||
|
||||
cache_key = f"litellm:end_user_id:{user}"
|
||||
end_user_cache_obj: Optional[LiteLLM_EndUserTable] = cache.get_cache( # type: ignore
|
||||
key=cache_key
|
||||
)
|
||||
if end_user_cache_obj is None and self.prisma_client is not None:
|
||||
# check db
|
||||
end_user_obj = (
|
||||
await self.prisma_client.db.litellm_endusertable.find_unique(
|
||||
where={"user_id": user}
|
||||
)
|
||||
)
|
||||
if end_user_obj is None: # user not in db - assume not blocked
|
||||
end_user_obj = LiteLLM_EndUserTable(user_id=user, blocked=False)
|
||||
cache.set_cache(key=cache_key, value=end_user_obj, ttl=60)
|
||||
if end_user_obj is not None and end_user_obj.blocked is True:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": f"User blocked from making LLM API Calls. User={user}"
|
||||
},
|
||||
)
|
||||
elif (
|
||||
end_user_cache_obj is not None
|
||||
and end_user_cache_obj.blocked is True
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": f"User blocked from making LLM API Calls. User={user}"
|
||||
},
|
||||
)
|
||||
|
||||
except HTTPException as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.exception(
|
||||
"litellm.enterprise.enterprise_hooks.blocked_user_list::async_pre_call_hook - Exception occurred - {}".format(
|
||||
str(e)
|
||||
)
|
||||
)
|
||||
@ -1,135 +0,0 @@
|
||||
# +-----------------------------------------------+
|
||||
#
|
||||
# Google Text Moderation
|
||||
# https://cloud.google.com/natural-language/docs/moderating-text
|
||||
#
|
||||
# +-----------------------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.proxy.guardrails._content_utils import iter_message_text
|
||||
from litellm.types.utils import CallTypesLiteral
|
||||
|
||||
|
||||
class _ENTERPRISE_GoogleTextModeration(CustomLogger):
|
||||
user_api_key_cache = None
|
||||
confidence_categories = [
|
||||
"toxic",
|
||||
"insult",
|
||||
"profanity",
|
||||
"derogatory",
|
||||
"sexual",
|
||||
"death_harm_and_tragedy",
|
||||
"violent",
|
||||
"firearms_and_weapons",
|
||||
"public_safety",
|
||||
"health",
|
||||
"religion_and_belief",
|
||||
"illicit_drugs",
|
||||
"war_and_conflict",
|
||||
"politics",
|
||||
"finance",
|
||||
"legal",
|
||||
] # https://cloud.google.com/natural-language/docs/moderating-text#safety_attribute_confidence_scores
|
||||
|
||||
# Class variables or attributes
|
||||
def __init__(self):
|
||||
try:
|
||||
from google.cloud import language_v1 # type: ignore
|
||||
except Exception:
|
||||
raise Exception(
|
||||
"Missing google.cloud package. Run `pip install --upgrade google-cloud-language`"
|
||||
)
|
||||
|
||||
# Instantiates a client
|
||||
self.client = language_v1.LanguageServiceClient()
|
||||
self.moderate_text_request = language_v1.ModerateTextRequest
|
||||
self.language_document = language_v1.types.Document # type: ignore
|
||||
self.document_type = language_v1.types.Document.Type.PLAIN_TEXT # type: ignore
|
||||
|
||||
default_confidence_threshold = (
|
||||
litellm.google_moderation_confidence_threshold or 0.8
|
||||
) # by default require a high confidence (80%) to fail
|
||||
|
||||
for category in self.confidence_categories:
|
||||
if hasattr(litellm, f"{category}_confidence_threshold"):
|
||||
setattr(
|
||||
self,
|
||||
f"{category}_confidence_threshold",
|
||||
getattr(litellm, f"{category}_confidence_threshold"),
|
||||
)
|
||||
else:
|
||||
setattr(
|
||||
self,
|
||||
f"{category}_confidence_threshold",
|
||||
default_confidence_threshold,
|
||||
)
|
||||
set_confidence_value = getattr(
|
||||
self,
|
||||
f"{category}_confidence_threshold",
|
||||
)
|
||||
verbose_proxy_logger.info(
|
||||
f"Google Text Moderation: {category}_confidence_threshold: {set_confidence_value}"
|
||||
)
|
||||
|
||||
def print_verbose(self, print_statement):
|
||||
try:
|
||||
verbose_proxy_logger.debug(print_statement)
|
||||
if litellm.set_verbose:
|
||||
print(print_statement) # noqa
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def async_moderation_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
call_type: CallTypesLiteral,
|
||||
):
|
||||
"""
|
||||
- Calls Google's Text Moderation API
|
||||
- Rejects request if it fails safety check
|
||||
"""
|
||||
# Covers multimodal list content + Responses-API input.
|
||||
text = "".join(iter_message_text(data))
|
||||
if text:
|
||||
document = self.language_document(content=text, type_=self.document_type)
|
||||
|
||||
request = self.moderate_text_request(
|
||||
document=document,
|
||||
)
|
||||
|
||||
# Make the request
|
||||
response = self.client.moderate_text(request=request)
|
||||
for category in response.moderation_categories:
|
||||
category_name = category.name
|
||||
category_name = category_name.lower()
|
||||
category_name = category_name.replace("&", "and")
|
||||
category_name = category_name.replace(",", "")
|
||||
category_name = category_name.replace(
|
||||
" ", "_"
|
||||
) # e.g. go from 'Firearms & Weapons' to 'firearms_and_weapons'
|
||||
if category.confidence > getattr(
|
||||
self, f"{category_name}_confidence_threshold"
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={
|
||||
"error": f"Violated content safety policy. Category={category}"
|
||||
},
|
||||
)
|
||||
# Handle the response
|
||||
return data
|
||||
|
||||
|
||||
# google_text_moderation_obj = _ENTERPRISE_GoogleTextModeration()
|
||||
# asyncio.run(
|
||||
# google_text_moderation_obj.async_moderation_hook(
|
||||
# data={"messages": [{"role": "user", "content": "Hey, how's it going?"}]}
|
||||
# )
|
||||
# )
|
||||
@ -1,58 +0,0 @@
|
||||
# +-------------------------------------------------------------+
|
||||
#
|
||||
# Use OpenAI /moderations for your LLM calls
|
||||
#
|
||||
# +-------------------------------------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0, os.path.abspath("../..")
|
||||
) # Adds the parent directory to the system path
|
||||
import sys
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.proxy.guardrails._content_utils import iter_message_text
|
||||
from litellm.types.utils import CallTypesLiteral
|
||||
|
||||
|
||||
class _ENTERPRISE_OpenAI_Moderation(CustomLogger):
|
||||
def __init__(self):
|
||||
self.model_name = (
|
||||
litellm.openai_moderations_model_name or "text-moderation-latest"
|
||||
) # pass the model_name you initialized on litellm.Router()
|
||||
pass
|
||||
|
||||
#### CALL HOOKS - proxy only ####
|
||||
|
||||
async def async_moderation_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
call_type: CallTypesLiteral,
|
||||
):
|
||||
# Covers multimodal list content + Responses-API input.
|
||||
text = "".join(iter_message_text(data))
|
||||
|
||||
from litellm.proxy.proxy_server import llm_router
|
||||
|
||||
if llm_router is None:
|
||||
return
|
||||
|
||||
moderation_response = await llm_router.amoderation(
|
||||
model=self.model_name, input=text
|
||||
)
|
||||
|
||||
verbose_proxy_logger.debug("Moderation response: %s", moderation_response)
|
||||
if moderation_response and moderation_response.results[0].flagged is True:
|
||||
raise HTTPException(
|
||||
status_code=403, detail={"error": "Violated content safety policy"}
|
||||
)
|
||||
pass
|
||||
@ -1,6 +0,0 @@
|
||||
## Admin UI
|
||||
|
||||
Customize the Admin UI to your companies branding / logo
|
||||

|
||||
|
||||
## Docs to set up Custom Admin UI [here](https://docs.litellm.ai/docs/proxy/ui)
|
||||
@ -1,11 +0,0 @@
|
||||
{
|
||||
"brand": {
|
||||
"DEFAULT": "teal",
|
||||
"faint": "teal",
|
||||
"muted": "teal",
|
||||
"subtle": "teal",
|
||||
"emphasis": "teal",
|
||||
"inverted": "teal"
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,99 +0,0 @@
|
||||
from typing import List, Optional
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_logger
|
||||
from litellm.constants import X_LITELLM_DISABLE_CALLBACKS
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.litellm_core_utils.llm_request_utils import (
|
||||
get_proxy_server_request_headers,
|
||||
)
|
||||
from litellm.proxy._types import CommonProxyErrors
|
||||
from litellm.types.utils import StandardCallbackDynamicParams
|
||||
|
||||
|
||||
class EnterpriseCallbackControls:
|
||||
@staticmethod
|
||||
def is_callback_disabled_dynamically(
|
||||
callback: litellm.CALLBACK_TYPES,
|
||||
litellm_params: dict,
|
||||
standard_callback_dynamic_params: StandardCallbackDynamicParams
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a callback is disabled via the x-litellm-disable-callbacks header or via `litellm_disabled_callbacks` in standard_callback_dynamic_params.
|
||||
|
||||
Args:
|
||||
callback: The callback to check (can be string, CustomLogger instance, or callable)
|
||||
litellm_params: Parameters containing proxy server request info
|
||||
|
||||
Returns:
|
||||
bool: True if the callback should be disabled, False otherwise
|
||||
"""
|
||||
from litellm.litellm_core_utils.custom_logger_registry import (
|
||||
CustomLoggerRegistry,
|
||||
)
|
||||
|
||||
try:
|
||||
disabled_callbacks = EnterpriseCallbackControls.get_disabled_callbacks(litellm_params, standard_callback_dynamic_params)
|
||||
verbose_logger.debug(f"Dynamically disabled callbacks from {X_LITELLM_DISABLE_CALLBACKS}: {disabled_callbacks}")
|
||||
verbose_logger.debug(f"Checking if {callback} is disabled via headers. Disable callbacks from headers: {disabled_callbacks}")
|
||||
if disabled_callbacks is not None:
|
||||
#########################################################
|
||||
# premium user check
|
||||
#########################################################
|
||||
if not EnterpriseCallbackControls._should_allow_dynamic_callback_disabling():
|
||||
return False
|
||||
#########################################################
|
||||
if isinstance(callback, str):
|
||||
if callback.lower() in disabled_callbacks:
|
||||
verbose_logger.debug(f"Not logging to {callback} because it is disabled via {X_LITELLM_DISABLE_CALLBACKS}")
|
||||
return True
|
||||
elif isinstance(callback, CustomLogger):
|
||||
# get the string name of the callback
|
||||
callback_str = CustomLoggerRegistry.get_callback_str_from_class_type(callback.__class__)
|
||||
if callback_str is not None and callback_str.lower() in disabled_callbacks:
|
||||
verbose_logger.debug(f"Not logging to {callback_str} because it is disabled via {X_LITELLM_DISABLE_CALLBACKS}")
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
verbose_logger.debug(
|
||||
f"Error checking disabled callbacks header: {str(e)}"
|
||||
)
|
||||
return False
|
||||
@staticmethod
|
||||
def get_disabled_callbacks(litellm_params: dict, standard_callback_dynamic_params: StandardCallbackDynamicParams) -> Optional[List[str]]:
|
||||
"""
|
||||
Get the disabled callbacks from the standard callback dynamic params.
|
||||
"""
|
||||
|
||||
#########################################################
|
||||
# check if disabled via headers
|
||||
#########################################################
|
||||
request_headers = get_proxy_server_request_headers(litellm_params)
|
||||
disabled_callbacks = request_headers.get(X_LITELLM_DISABLE_CALLBACKS, None)
|
||||
if disabled_callbacks is not None:
|
||||
disabled_callbacks = set([cb.strip().lower() for cb in disabled_callbacks.split(",")])
|
||||
return list(disabled_callbacks)
|
||||
|
||||
|
||||
#########################################################
|
||||
# check if disabled via request body
|
||||
#########################################################
|
||||
if standard_callback_dynamic_params.get("litellm_disabled_callbacks", None) is not None:
|
||||
return standard_callback_dynamic_params.get("litellm_disabled_callbacks", None)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _should_allow_dynamic_callback_disabling():
|
||||
import litellm
|
||||
from litellm.proxy.proxy_server import premium_user
|
||||
|
||||
# Check if admin has disabled this feature
|
||||
if litellm.allow_dynamic_callback_disabling is not True:
|
||||
verbose_logger.debug("Dynamic callback disabling is disabled by admin via litellm.allow_dynamic_callback_disabling")
|
||||
return False
|
||||
|
||||
if premium_user:
|
||||
return True
|
||||
verbose_logger.warning(f"Disabling callbacks using request headers is an enterprise feature. {CommonProxyErrors.not_premium_user.value}")
|
||||
return False
|
||||
@ -1,27 +0,0 @@
|
||||
# this is an example endpoint to receive data from litellm
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
@app.post("/log-event")
|
||||
async def log_event(request: Request):
|
||||
try:
|
||||
print("Received /log-event request") # noqa
|
||||
# Assuming the incoming request has JSON data
|
||||
data = await request.json()
|
||||
print("Received request data:") # noqa
|
||||
print(data) # noqa
|
||||
|
||||
# Your additional logic can go here
|
||||
# For now, just printing the received data
|
||||
|
||||
return {"message": "Request received successfully"}
|
||||
except Exception:
|
||||
raise HTTPException(status_code=500, detail="Internal Server Error")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
|
||||
uvicorn.run(app, host="127.0.0.1", port=8000)
|
||||
@ -1,130 +0,0 @@
|
||||
# +-------------------------------------------------------------+
|
||||
#
|
||||
# Llama Guard
|
||||
# https://huggingface.co/meta-llama/LlamaGuard-7b/tree/main
|
||||
#
|
||||
# LLM for Content Moderation
|
||||
# +-------------------------------------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
|
||||
import os
|
||||
import sys
|
||||
from collections.abc import Iterable
|
||||
|
||||
sys.path.insert(
|
||||
0, os.path.abspath("../..")
|
||||
) # Adds the parent directory to the system path
|
||||
import sys
|
||||
from typing import Literal, Optional
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.types.utils import CallTypesLiteral, Choices, ModelResponse
|
||||
|
||||
|
||||
class _ENTERPRISE_LlamaGuard(CustomLogger):
|
||||
# Class variables or attributes
|
||||
def __init__(self, model_name: Optional[str] = None):
|
||||
_model = model_name or litellm.llamaguard_model_name
|
||||
if _model is None:
|
||||
raise ValueError("model_name not set for LlamaGuard")
|
||||
self.model = _model
|
||||
file_path = litellm.llamaguard_unsafe_content_categories
|
||||
data = None
|
||||
|
||||
if file_path is not None:
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
data = file.read()
|
||||
except FileNotFoundError:
|
||||
raise Exception(f"File not found. file_path={file_path}")
|
||||
except Exception as e:
|
||||
raise Exception(f"An error occurred: {str(e)}, file_path={file_path}")
|
||||
|
||||
self.unsafe_content_categories = data
|
||||
|
||||
verbose_proxy_logger.debug(
|
||||
f"self.unsafe_content_categories: {self.unsafe_content_categories}"
|
||||
)
|
||||
|
||||
def print_verbose(self, print_statement):
|
||||
try:
|
||||
verbose_proxy_logger.debug(print_statement)
|
||||
if litellm.set_verbose:
|
||||
print(print_statement) # noqa
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def set_custom_prompt_template(self, messages: list):
|
||||
if self.unsafe_content_categories is not None and self.model is not None:
|
||||
role = "Agent" if len(messages) % 2 == 0 else "User"
|
||||
litellm.register_prompt_template(
|
||||
model=self.model,
|
||||
initial_prompt_value=f"""[INST] Task: Check if there is unsafe content in '{role}' messages in conversations according our safety policy with the below categories.
|
||||
|
||||
<BEGIN UNSAFE CONTENT CATEGORIES>
|
||||
{self.unsafe_content_categories}
|
||||
<END UNSAFE CONTENT CATEGORIES>
|
||||
|
||||
<BEGIN CONVERSATION>
|
||||
|
||||
""",
|
||||
roles={
|
||||
"system": {
|
||||
"pre_message": "User: ", # [OPTIONAL]
|
||||
"post_message": "\n\n", # [OPTIONAL]
|
||||
},
|
||||
"user": {
|
||||
"pre_message": "User: ", # [OPTIONAL]
|
||||
"post_message": "\n\n", # [OPTIONAL]
|
||||
},
|
||||
"assistant": {
|
||||
"pre_message": "Agent: ", # [OPTIONAL]
|
||||
"post_message": "\n\n", # [OPTIONAL]
|
||||
},
|
||||
},
|
||||
final_prompt_value="""<END CONVERSATION>
|
||||
|
||||
Provide your safety assessment for {role} in the above conversation:
|
||||
- First line must read 'safe' or 'unsafe'.
|
||||
- If unsafe, a second line must include a comma-separated list of violated categories. [/INST]""",
|
||||
)
|
||||
|
||||
async def async_moderation_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
call_type: CallTypesLiteral,
|
||||
):
|
||||
"""
|
||||
- Calls the Llama Guard Endpoint
|
||||
- Rejects request if it fails safety check
|
||||
|
||||
The llama guard prompt template is applied automatically in factory.py
|
||||
"""
|
||||
if "messages" in data:
|
||||
safety_check_messages = data["messages"][
|
||||
-1
|
||||
] # get the last response - llama guard has a 4k token limit
|
||||
response = await litellm.acompletion(
|
||||
model=self.model,
|
||||
messages=[safety_check_messages],
|
||||
hf_model_name="meta-llama/LlamaGuard-7b",
|
||||
)
|
||||
|
||||
if (
|
||||
isinstance(response, ModelResponse)
|
||||
and isinstance(response.choices[0], Choices)
|
||||
and response.choices[0].message.content is not None
|
||||
and isinstance(response.choices[0].message.content, Iterable)
|
||||
and "unsafe" in response.choices[0].message.content
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=400, detail={"error": "Violated content safety policy"}
|
||||
)
|
||||
|
||||
return data
|
||||
@ -1,174 +0,0 @@
|
||||
# +------------------------+
|
||||
#
|
||||
# LLM Guard
|
||||
# https://llm-guard.com/
|
||||
#
|
||||
# +------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
## This provides an LLM Guard Integration for content moderation on the proxy
|
||||
|
||||
from typing import Literal, Optional
|
||||
|
||||
import aiohttp
|
||||
from fastapi import HTTPException
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.secret_managers.main import get_secret_str
|
||||
from litellm.types.utils import CallTypesLiteral
|
||||
from litellm.utils import get_formatted_prompt
|
||||
|
||||
|
||||
class _ENTERPRISE_LLMGuard(CustomLogger):
|
||||
# Class variables or attributes
|
||||
def __init__(
|
||||
self,
|
||||
mock_testing: bool = False,
|
||||
mock_redacted_text: Optional[dict] = None,
|
||||
):
|
||||
self.mock_redacted_text = mock_redacted_text
|
||||
self.llm_guard_mode = litellm.llm_guard_mode
|
||||
if mock_testing is True: # for testing purposes only
|
||||
return
|
||||
self.llm_guard_api_base = get_secret_str("LLM_GUARD_API_BASE", None)
|
||||
if self.llm_guard_api_base is None:
|
||||
raise Exception("Missing `LLM_GUARD_API_BASE` from environment")
|
||||
elif not self.llm_guard_api_base.endswith("/"):
|
||||
self.llm_guard_api_base += "/"
|
||||
|
||||
def print_verbose(self, print_statement):
|
||||
try:
|
||||
verbose_proxy_logger.debug(print_statement)
|
||||
if litellm.set_verbose:
|
||||
print(print_statement) # noqa
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def moderation_check(self, text: str):
|
||||
"""
|
||||
[TODO] make this more performant for high-throughput scenario
|
||||
"""
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
if self.mock_redacted_text is not None:
|
||||
redacted_text = self.mock_redacted_text
|
||||
else:
|
||||
# Make the first request to /analyze
|
||||
analyze_url = f"{self.llm_guard_api_base}analyze/prompt"
|
||||
verbose_proxy_logger.debug("Making request to: %s", analyze_url)
|
||||
analyze_payload = {"prompt": text}
|
||||
redacted_text = None
|
||||
async with session.post(
|
||||
analyze_url, json=analyze_payload
|
||||
) as response:
|
||||
redacted_text = await response.json()
|
||||
verbose_proxy_logger.debug(
|
||||
f"LLM Guard: Received response - {redacted_text}"
|
||||
)
|
||||
if redacted_text is not None:
|
||||
if (
|
||||
redacted_text.get("is_valid", None) is not None
|
||||
and redacted_text["is_valid"] is False
|
||||
):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail={"error": "Violated content safety policy"},
|
||||
)
|
||||
else:
|
||||
pass
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail={
|
||||
"error": f"Invalid content moderation response: {redacted_text}"
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.exception(
|
||||
"litellm.enterprise.enterprise_hooks.llm_guard::moderation_check - Exception occurred - {}".format(
|
||||
str(e)
|
||||
)
|
||||
)
|
||||
raise e
|
||||
|
||||
def should_proceed(self, user_api_key_dict: UserAPIKeyAuth, data: dict) -> bool:
|
||||
if self.llm_guard_mode == "key-specific":
|
||||
# check if llm guard enabled for specific keys only
|
||||
self.print_verbose(
|
||||
f"user_api_key_dict.permissions: {user_api_key_dict.permissions}"
|
||||
)
|
||||
if (
|
||||
user_api_key_dict.permissions.get("enable_llm_guard_check", False)
|
||||
is True
|
||||
):
|
||||
return True
|
||||
elif self.llm_guard_mode == "all":
|
||||
return True
|
||||
elif self.llm_guard_mode == "request-specific":
|
||||
self.print_verbose(f"received metadata: {data.get('metadata', {})}")
|
||||
metadata = data.get("metadata", {})
|
||||
permissions = metadata.get("permissions", {})
|
||||
if (
|
||||
"enable_llm_guard_check" in permissions
|
||||
and permissions["enable_llm_guard_check"] is True
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
async def async_moderation_hook(
|
||||
self,
|
||||
data: dict,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
call_type: CallTypesLiteral,
|
||||
):
|
||||
"""
|
||||
- Calls the LLM Guard Endpoint
|
||||
- Rejects request if it fails safety check
|
||||
- Use the sanitized prompt returned
|
||||
- LLM Guard can handle things like PII Masking, etc.
|
||||
"""
|
||||
self.print_verbose(
|
||||
f"Inside LLM Guard Pre-Call Hook - llm_guard_mode={self.llm_guard_mode}"
|
||||
)
|
||||
|
||||
_proceed = self.should_proceed(user_api_key_dict=user_api_key_dict, data=data)
|
||||
if _proceed is False:
|
||||
return
|
||||
|
||||
self.print_verbose("Makes LLM Guard Check")
|
||||
try:
|
||||
assert call_type in [
|
||||
"completion",
|
||||
"embeddings",
|
||||
"image_generation",
|
||||
"moderation",
|
||||
"audio_transcription",
|
||||
]
|
||||
except Exception:
|
||||
self.print_verbose(
|
||||
f"Call Type - {call_type}, not in accepted list - ['completion','embeddings','image_generation','moderation','audio_transcription']"
|
||||
)
|
||||
return data
|
||||
|
||||
formatted_prompt = get_formatted_prompt(data=data, call_type=call_type) # type: ignore
|
||||
self.print_verbose(f"LLM Guard, formatted_prompt: {formatted_prompt}")
|
||||
return await self.moderation_check(text=formatted_prompt)
|
||||
|
||||
async def async_post_call_streaming_hook(
|
||||
self, user_api_key_dict: UserAPIKeyAuth, response: str
|
||||
):
|
||||
if response is not None:
|
||||
await self.moderation_check(text=response)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
# llm_guard = _ENTERPRISE_LLMGuard()
|
||||
|
||||
# asyncio.run(
|
||||
# llm_guard.async_moderation_hook(
|
||||
# data={"messages": [{"role": "user", "content": "Hey how's it going?"}]}
|
||||
# )
|
||||
# )
|
||||
@ -1,315 +0,0 @@
|
||||
"""
|
||||
PagerDuty Alerting Integration
|
||||
|
||||
Handles two types of alerts:
|
||||
- High LLM API Failure Rate. Configure X fails in Y seconds to trigger an alert.
|
||||
- High Number of Hanging LLM Requests. Configure X hangs in Y seconds to trigger an alert.
|
||||
|
||||
Note: This is a Free feature on the regular litellm docker image.
|
||||
|
||||
However, this is under the enterprise license
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import List, Optional, Union
|
||||
|
||||
from litellm._logging import verbose_logger
|
||||
from litellm.caching import DualCache
|
||||
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
|
||||
from litellm.llms.custom_httpx.http_handler import (
|
||||
AsyncHTTPHandler,
|
||||
get_async_httpx_client,
|
||||
httpxSpecialProvider,
|
||||
)
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.types.integrations.pagerduty import (
|
||||
AlertingConfig,
|
||||
PagerDutyInternalEvent,
|
||||
PagerDutyPayload,
|
||||
PagerDutyRequestBody,
|
||||
)
|
||||
from litellm.types.utils import (
|
||||
CallTypesLiteral,
|
||||
StandardLoggingPayload,
|
||||
StandardLoggingPayloadErrorInformation,
|
||||
)
|
||||
|
||||
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD = 60
|
||||
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS = 60
|
||||
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS = 60
|
||||
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS = 600
|
||||
|
||||
|
||||
class PagerDutyAlerting(SlackAlerting):
|
||||
"""
|
||||
Tracks failed requests and hanging requests separately.
|
||||
If threshold is crossed for either type, triggers a PagerDuty alert.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, alerting_args: Optional[Union[AlertingConfig, dict]] = None, **kwargs
|
||||
):
|
||||
super().__init__()
|
||||
_api_key = os.getenv("PAGERDUTY_API_KEY")
|
||||
if not _api_key:
|
||||
raise ValueError("PAGERDUTY_API_KEY is not set")
|
||||
|
||||
self.api_key: str = _api_key
|
||||
alerting_args = alerting_args or {}
|
||||
self.pagerduty_alerting_args: AlertingConfig = AlertingConfig(
|
||||
failure_threshold=alerting_args.get(
|
||||
"failure_threshold", PAGERDUTY_DEFAULT_FAILURE_THRESHOLD
|
||||
),
|
||||
failure_threshold_window_seconds=alerting_args.get(
|
||||
"failure_threshold_window_seconds",
|
||||
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS,
|
||||
),
|
||||
hanging_threshold_seconds=alerting_args.get(
|
||||
"hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
|
||||
),
|
||||
hanging_threshold_window_seconds=alerting_args.get(
|
||||
"hanging_threshold_window_seconds",
|
||||
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
|
||||
),
|
||||
)
|
||||
|
||||
# Separate storage for failures vs. hangs
|
||||
self._failure_events: List[PagerDutyInternalEvent] = []
|
||||
self._hanging_events: List[PagerDutyInternalEvent] = []
|
||||
|
||||
# ------------------ MAIN LOGIC ------------------ #
|
||||
|
||||
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
|
||||
"""
|
||||
Record a failure event. Only send an alert to PagerDuty if the
|
||||
configured *failure* threshold is exceeded in the specified window.
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
|
||||
"standard_logging_object"
|
||||
)
|
||||
if not standard_logging_payload:
|
||||
raise ValueError(
|
||||
"standard_logging_object is required for PagerDutyAlerting"
|
||||
)
|
||||
|
||||
# Extract error details
|
||||
error_info: Optional[StandardLoggingPayloadErrorInformation] = (
|
||||
standard_logging_payload.get("error_information") or {}
|
||||
)
|
||||
_meta = standard_logging_payload.get("metadata") or {}
|
||||
|
||||
self._failure_events.append(
|
||||
PagerDutyInternalEvent(
|
||||
failure_event_type="failed_response",
|
||||
timestamp=now,
|
||||
error_class=error_info.get("error_class"),
|
||||
error_code=error_info.get("error_code"),
|
||||
error_llm_provider=error_info.get("llm_provider"),
|
||||
user_api_key_hash=_meta.get("user_api_key_hash"),
|
||||
user_api_key_alias=_meta.get("user_api_key_alias"),
|
||||
user_api_key_spend=_meta.get("user_api_key_spend"),
|
||||
user_api_key_max_budget=_meta.get("user_api_key_max_budget"),
|
||||
user_api_key_budget_reset_at=_meta.get("user_api_key_budget_reset_at"),
|
||||
user_api_key_org_id=_meta.get("user_api_key_org_id"),
|
||||
user_api_key_org_alias=_meta.get("user_api_key_org_alias"),
|
||||
user_api_key_team_id=_meta.get("user_api_key_team_id"),
|
||||
user_api_key_project_id=_meta.get("user_api_key_project_id"),
|
||||
user_api_key_project_alias=_meta.get("user_api_key_project_alias"),
|
||||
user_api_key_user_id=_meta.get("user_api_key_user_id"),
|
||||
user_api_key_team_alias=_meta.get("user_api_key_team_alias"),
|
||||
user_api_key_end_user_id=_meta.get("user_api_key_end_user_id"),
|
||||
user_api_key_user_email=_meta.get("user_api_key_user_email"),
|
||||
user_api_key_request_route=_meta.get("user_api_key_request_route"),
|
||||
user_api_key_auth_metadata=_meta.get("user_api_key_auth_metadata"),
|
||||
)
|
||||
)
|
||||
|
||||
# Prune + Possibly alert
|
||||
window_seconds = self.pagerduty_alerting_args.get(
|
||||
"failure_threshold_window_seconds", 60
|
||||
)
|
||||
threshold = self.pagerduty_alerting_args.get("failure_threshold", 1)
|
||||
|
||||
# If threshold is crossed, send PD alert for failures
|
||||
await self._send_alert_if_thresholds_crossed(
|
||||
events=self._failure_events,
|
||||
window_seconds=window_seconds,
|
||||
threshold=threshold,
|
||||
alert_prefix="High LLM API Failure Rate",
|
||||
)
|
||||
|
||||
async def async_pre_call_hook(
|
||||
self,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
cache: DualCache,
|
||||
data: dict,
|
||||
call_type: CallTypesLiteral,
|
||||
) -> Optional[Union[Exception, str, dict]]:
|
||||
"""
|
||||
Example of detecting hanging requests by waiting a given threshold.
|
||||
If the request didn't finish by then, we treat it as 'hanging'.
|
||||
"""
|
||||
verbose_logger.info("Inside Proxy Logging Pre-call hook!")
|
||||
asyncio.create_task(
|
||||
self.hanging_response_handler(
|
||||
request_data=data, user_api_key_dict=user_api_key_dict
|
||||
)
|
||||
)
|
||||
return None
|
||||
|
||||
async def hanging_response_handler(
|
||||
self, request_data: Optional[dict], user_api_key_dict: UserAPIKeyAuth
|
||||
):
|
||||
"""
|
||||
Checks if request completed by the time 'hanging_threshold_seconds' elapses.
|
||||
If not, we classify it as a hanging request.
|
||||
"""
|
||||
verbose_logger.debug(
|
||||
f"Inside Hanging Response Handler!..sleeping for {self.pagerduty_alerting_args.get('hanging_threshold_seconds', PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS)} seconds"
|
||||
)
|
||||
await asyncio.sleep(
|
||||
self.pagerduty_alerting_args.get(
|
||||
"hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
|
||||
)
|
||||
)
|
||||
|
||||
if await self._request_is_completed(request_data=request_data):
|
||||
return # It's not hanging if completed
|
||||
|
||||
# Otherwise, record it as hanging
|
||||
self._hanging_events.append(
|
||||
PagerDutyInternalEvent(
|
||||
failure_event_type="hanging_response",
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
error_class="HangingRequest",
|
||||
error_code="HangingRequest",
|
||||
error_llm_provider="HangingRequest",
|
||||
user_api_key_hash=user_api_key_dict.api_key,
|
||||
user_api_key_alias=user_api_key_dict.key_alias,
|
||||
user_api_key_spend=user_api_key_dict.spend,
|
||||
user_api_key_max_budget=user_api_key_dict.max_budget,
|
||||
user_api_key_budget_reset_at=(
|
||||
user_api_key_dict.budget_reset_at.isoformat()
|
||||
if user_api_key_dict.budget_reset_at
|
||||
else None
|
||||
),
|
||||
user_api_key_org_id=user_api_key_dict.org_id,
|
||||
user_api_key_org_alias=user_api_key_dict.organization_alias,
|
||||
user_api_key_team_id=user_api_key_dict.team_id,
|
||||
user_api_key_project_id=user_api_key_dict.project_id,
|
||||
user_api_key_project_alias=user_api_key_dict.project_alias,
|
||||
user_api_key_user_id=user_api_key_dict.user_id,
|
||||
user_api_key_team_alias=user_api_key_dict.team_alias,
|
||||
user_api_key_end_user_id=user_api_key_dict.end_user_id,
|
||||
user_api_key_user_email=user_api_key_dict.user_email,
|
||||
user_api_key_request_route=user_api_key_dict.request_route,
|
||||
user_api_key_auth_metadata=user_api_key_dict.metadata,
|
||||
)
|
||||
)
|
||||
|
||||
# Prune + Possibly alert
|
||||
window_seconds = self.pagerduty_alerting_args.get(
|
||||
"hanging_threshold_window_seconds",
|
||||
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
|
||||
)
|
||||
threshold: int = self.pagerduty_alerting_args.get(
|
||||
"hanging_threshold_fails", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
|
||||
)
|
||||
|
||||
# If threshold is crossed, send PD alert for hangs
|
||||
await self._send_alert_if_thresholds_crossed(
|
||||
events=self._hanging_events,
|
||||
window_seconds=window_seconds,
|
||||
threshold=threshold,
|
||||
alert_prefix="High Number of Hanging LLM Requests",
|
||||
)
|
||||
|
||||
# ------------------ HELPERS ------------------ #
|
||||
|
||||
async def _send_alert_if_thresholds_crossed(
|
||||
self,
|
||||
events: List[PagerDutyInternalEvent],
|
||||
window_seconds: int,
|
||||
threshold: int,
|
||||
alert_prefix: str,
|
||||
):
|
||||
"""
|
||||
1. Prune old events
|
||||
2. If threshold is reached, build alert, send to PagerDuty
|
||||
3. Clear those events
|
||||
"""
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_seconds)
|
||||
pruned = [e for e in events if e.get("timestamp", datetime.min) > cutoff]
|
||||
|
||||
# Update the reference list
|
||||
events.clear()
|
||||
events.extend(pruned)
|
||||
|
||||
# Check threshold
|
||||
verbose_logger.debug(
|
||||
f"Have {len(events)} events in the last {window_seconds} seconds. Threshold is {threshold}"
|
||||
)
|
||||
if len(events) >= threshold:
|
||||
# Build short summary of last N events
|
||||
error_summaries = self._build_error_summaries(events, max_errors=5)
|
||||
alert_message = (
|
||||
f"{alert_prefix}: {len(events)} in the last {window_seconds} seconds."
|
||||
)
|
||||
custom_details = {"recent_errors": error_summaries}
|
||||
|
||||
await self.send_alert_to_pagerduty(
|
||||
alert_message=alert_message,
|
||||
custom_details=custom_details,
|
||||
)
|
||||
|
||||
# Clear them after sending an alert, so we don't spam
|
||||
events.clear()
|
||||
|
||||
def _build_error_summaries(
|
||||
self, events: List[PagerDutyInternalEvent], max_errors: int = 5
|
||||
) -> List[PagerDutyInternalEvent]:
|
||||
"""
|
||||
Build short text summaries for the last `max_errors`.
|
||||
Example: "ValueError (code: 500, provider: openai)"
|
||||
"""
|
||||
recent = events[-max_errors:]
|
||||
summaries = []
|
||||
for fe in recent:
|
||||
# If any of these is None, show "N/A" to avoid messing up the summary string
|
||||
fe.pop("timestamp")
|
||||
summaries.append(fe)
|
||||
return summaries
|
||||
|
||||
async def send_alert_to_pagerduty(self, alert_message: str, custom_details: dict):
|
||||
"""
|
||||
Send [critical] Alert to PagerDuty
|
||||
|
||||
https://developer.pagerduty.com/api-reference/YXBpOjI3NDgyNjU-pager-duty-v2-events-api
|
||||
"""
|
||||
try:
|
||||
verbose_logger.debug(f"Sending alert to PagerDuty: {alert_message}")
|
||||
async_client: AsyncHTTPHandler = get_async_httpx_client(
|
||||
llm_provider=httpxSpecialProvider.LoggingCallback
|
||||
)
|
||||
payload: PagerDutyRequestBody = PagerDutyRequestBody(
|
||||
payload=PagerDutyPayload(
|
||||
summary=alert_message,
|
||||
severity="critical",
|
||||
source="LiteLLM Alert",
|
||||
component="LiteLLM",
|
||||
custom_details=custom_details,
|
||||
),
|
||||
routing_key=self.api_key,
|
||||
event_action="trigger",
|
||||
)
|
||||
|
||||
return await async_client.post(
|
||||
url="https://events.pagerduty.com/v2/enqueue",
|
||||
json=dict(payload),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
except Exception as e:
|
||||
verbose_logger.exception(f"Error sending alert to PagerDuty: {e}")
|
||||
@ -1,523 +0,0 @@
|
||||
# +-------------------------------------------------------------+
|
||||
#
|
||||
# Use SecretDetection /moderations for your LLM calls
|
||||
#
|
||||
# +-------------------------------------------------------------+
|
||||
# Thank you users! We ❤️ you! - Krrish & Ishaan
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(
|
||||
0, os.path.abspath("../..")
|
||||
) # Adds the parent directory to the system path
|
||||
import tempfile
|
||||
from typing import Optional
|
||||
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.caching.caching import DualCache
|
||||
from litellm.integrations.custom_guardrail import CustomGuardrail
|
||||
from litellm.proxy._types import UserAPIKeyAuth
|
||||
from litellm.proxy.guardrails._content_utils import walk_user_text
|
||||
|
||||
GUARDRAIL_NAME = "hide_secrets"
|
||||
|
||||
_custom_plugins_path = "file://" + os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), "secrets_plugins"
|
||||
)
|
||||
_default_detect_secrets_config = {
|
||||
"plugins_used": [
|
||||
{"name": "SoftlayerDetector"},
|
||||
{"name": "StripeDetector"},
|
||||
{"name": "NpmDetector"},
|
||||
{"name": "IbmCosHmacDetector"},
|
||||
{"name": "DiscordBotTokenDetector"},
|
||||
{"name": "BasicAuthDetector"},
|
||||
{"name": "AzureStorageKeyDetector"},
|
||||
{"name": "ArtifactoryDetector"},
|
||||
{"name": "AWSKeyDetector"},
|
||||
{"name": "CloudantDetector"},
|
||||
{"name": "IbmCloudIamDetector"},
|
||||
{"name": "JwtTokenDetector"},
|
||||
{"name": "MailchimpDetector"},
|
||||
{"name": "SquareOAuthDetector"},
|
||||
{"name": "PrivateKeyDetector"},
|
||||
{"name": "TwilioKeyDetector"},
|
||||
{
|
||||
"name": "AdafruitKeyDetector",
|
||||
"path": _custom_plugins_path + "/adafruit.py",
|
||||
},
|
||||
{
|
||||
"name": "AdobeSecretDetector",
|
||||
"path": _custom_plugins_path + "/adobe.py",
|
||||
},
|
||||
{
|
||||
"name": "AgeSecretKeyDetector",
|
||||
"path": _custom_plugins_path + "/age_secret_key.py",
|
||||
},
|
||||
{
|
||||
"name": "AirtableApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/airtable_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "AlgoliaApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/algolia_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "AlibabaSecretDetector",
|
||||
"path": _custom_plugins_path + "/alibaba.py",
|
||||
},
|
||||
{
|
||||
"name": "AsanaSecretDetector",
|
||||
"path": _custom_plugins_path + "/asana.py",
|
||||
},
|
||||
{
|
||||
"name": "AtlassianApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/atlassian_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "AuthressAccessKeyDetector",
|
||||
"path": _custom_plugins_path + "/authress_access_key.py",
|
||||
},
|
||||
{
|
||||
"name": "BittrexDetector",
|
||||
"path": _custom_plugins_path + "/beamer_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "BitbucketDetector",
|
||||
"path": _custom_plugins_path + "/bitbucket.py",
|
||||
},
|
||||
{
|
||||
"name": "BeamerApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/bittrex.py",
|
||||
},
|
||||
{
|
||||
"name": "ClojarsApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/clojars_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "CodecovAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/codecov_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "CoinbaseAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/coinbase_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "ConfluentDetector",
|
||||
"path": _custom_plugins_path + "/confluent.py",
|
||||
},
|
||||
{
|
||||
"name": "ContentfulApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/contentful_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DatabricksApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/databricks_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DatadogAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/datadog_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DefinedNetworkingApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/defined_networking_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DigitaloceanDetector",
|
||||
"path": _custom_plugins_path + "/digitalocean.py",
|
||||
},
|
||||
{
|
||||
"name": "DopplerApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/doppler_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DroneciAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/droneci_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DuffelApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/duffel_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DynatraceApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/dynatrace_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "DiscordDetector",
|
||||
"path": _custom_plugins_path + "/discord.py",
|
||||
},
|
||||
{
|
||||
"name": "DropboxDetector",
|
||||
"path": _custom_plugins_path + "/dropbox.py",
|
||||
},
|
||||
{
|
||||
"name": "EasyPostDetector",
|
||||
"path": _custom_plugins_path + "/easypost.py",
|
||||
},
|
||||
{
|
||||
"name": "EtsyAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/etsy_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FacebookAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/facebook_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FastlyApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/fastly_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FinicityDetector",
|
||||
"path": _custom_plugins_path + "/finicity.py",
|
||||
},
|
||||
{
|
||||
"name": "FinnhubAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/finnhub_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FlickrAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/flickr_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FlutterwaveDetector",
|
||||
"path": _custom_plugins_path + "/flutterwave.py",
|
||||
},
|
||||
{
|
||||
"name": "FrameIoApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/frameio_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "FreshbooksAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/freshbooks_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "GCPApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/gcp_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "GitHubTokenCustomDetector",
|
||||
"path": _custom_plugins_path + "/github_token.py",
|
||||
},
|
||||
{
|
||||
"name": "GitLabDetector",
|
||||
"path": _custom_plugins_path + "/gitlab.py",
|
||||
},
|
||||
{
|
||||
"name": "GitterAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/gitter_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "GoCardlessApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/gocardless_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "GrafanaDetector",
|
||||
"path": _custom_plugins_path + "/grafana.py",
|
||||
},
|
||||
{
|
||||
"name": "HashiCorpTFApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/hashicorp_tf_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "HerokuApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/heroku_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "HubSpotApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/hubspot_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "HuggingFaceDetector",
|
||||
"path": _custom_plugins_path + "/huggingface.py",
|
||||
},
|
||||
{
|
||||
"name": "IntercomApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/intercom_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "JFrogDetector",
|
||||
"path": _custom_plugins_path + "/jfrog.py",
|
||||
},
|
||||
{
|
||||
"name": "JWTBase64Detector",
|
||||
"path": _custom_plugins_path + "/jwt.py",
|
||||
},
|
||||
{
|
||||
"name": "KrakenAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/kraken_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "KucoinDetector",
|
||||
"path": _custom_plugins_path + "/kucoin.py",
|
||||
},
|
||||
{
|
||||
"name": "LaunchdarklyAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/launchdarkly_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "LinearDetector",
|
||||
"path": _custom_plugins_path + "/linear.py",
|
||||
},
|
||||
{
|
||||
"name": "LinkedInDetector",
|
||||
"path": _custom_plugins_path + "/linkedin.py",
|
||||
},
|
||||
{
|
||||
"name": "LobDetector",
|
||||
"path": _custom_plugins_path + "/lob.py",
|
||||
},
|
||||
{
|
||||
"name": "MailgunDetector",
|
||||
"path": _custom_plugins_path + "/mailgun.py",
|
||||
},
|
||||
{
|
||||
"name": "MapBoxApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/mapbox_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "MattermostAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/mattermost_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "MessageBirdDetector",
|
||||
"path": _custom_plugins_path + "/messagebird.py",
|
||||
},
|
||||
{
|
||||
"name": "MicrosoftTeamsWebhookDetector",
|
||||
"path": _custom_plugins_path + "/microsoft_teams_webhook.py",
|
||||
},
|
||||
{
|
||||
"name": "NetlifyAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/netlify_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "NewRelicDetector",
|
||||
"path": _custom_plugins_path + "/new_relic.py",
|
||||
},
|
||||
{
|
||||
"name": "NYTimesAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/nytimes_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "OktaAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/okta_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "OpenAIApiKeyDetector",
|
||||
"path": _custom_plugins_path + "/openai_api_key.py",
|
||||
},
|
||||
{
|
||||
"name": "PlanetScaleDetector",
|
||||
"path": _custom_plugins_path + "/planetscale.py",
|
||||
},
|
||||
{
|
||||
"name": "PostmanApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/postman_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "PrefectApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/prefect_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "PulumiApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/pulumi_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "PyPiUploadTokenDetector",
|
||||
"path": _custom_plugins_path + "/pypi_upload_token.py",
|
||||
},
|
||||
{
|
||||
"name": "RapidApiAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/rapidapi_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "ReadmeApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/readme_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "RubygemsApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/rubygems_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "ScalingoApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/scalingo_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "SendbirdDetector",
|
||||
"path": _custom_plugins_path + "/sendbird.py",
|
||||
},
|
||||
{
|
||||
"name": "SendGridApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/sendgrid_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "SendinBlueApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/sendinblue_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "SentryAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/sentry_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "ShippoApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/shippo_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "ShopifyDetector",
|
||||
"path": _custom_plugins_path + "/shopify.py",
|
||||
},
|
||||
{
|
||||
"name": "SlackDetector",
|
||||
"path": _custom_plugins_path + "/slack.py",
|
||||
},
|
||||
{
|
||||
"name": "SnykApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/snyk_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "SquarespaceAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/squarespace_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "SumoLogicDetector",
|
||||
"path": _custom_plugins_path + "/sumologic.py",
|
||||
},
|
||||
{
|
||||
"name": "TelegramBotApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/telegram_bot_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "TravisCiAccessTokenDetector",
|
||||
"path": _custom_plugins_path + "/travisci_access_token.py",
|
||||
},
|
||||
{
|
||||
"name": "TwitchApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/twitch_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "TwitterDetector",
|
||||
"path": _custom_plugins_path + "/twitter.py",
|
||||
},
|
||||
{
|
||||
"name": "TypeformApiTokenDetector",
|
||||
"path": _custom_plugins_path + "/typeform_api_token.py",
|
||||
},
|
||||
{
|
||||
"name": "VaultDetector",
|
||||
"path": _custom_plugins_path + "/vault.py",
|
||||
},
|
||||
{
|
||||
"name": "YandexDetector",
|
||||
"path": _custom_plugins_path + "/yandex.py",
|
||||
},
|
||||
{
|
||||
"name": "ZendeskSecretKeyDetector",
|
||||
"path": _custom_plugins_path + "/zendesk_secret_key.py",
|
||||
},
|
||||
{"name": "Base64HighEntropyString", "limit": 3.0},
|
||||
{"name": "HexHighEntropyString", "limit": 3.0},
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
class _ENTERPRISE_SecretDetection(CustomGuardrail):
|
||||
def __init__(self, detect_secrets_config: Optional[dict] = None, **kwargs):
|
||||
self.user_defined_detect_secrets_config = detect_secrets_config
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def scan_message_for_secrets(self, message_content: str):
|
||||
from detect_secrets import SecretsCollection
|
||||
from detect_secrets.settings import transient_settings
|
||||
|
||||
temp_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
temp_file.write(message_content.encode("utf-8"))
|
||||
temp_file.close()
|
||||
|
||||
secrets = SecretsCollection()
|
||||
|
||||
detect_secrets_config = (
|
||||
self.user_defined_detect_secrets_config or _default_detect_secrets_config
|
||||
)
|
||||
with transient_settings(detect_secrets_config):
|
||||
secrets.scan_file(temp_file.name)
|
||||
|
||||
os.remove(temp_file.name)
|
||||
|
||||
detected_secrets = []
|
||||
for file in secrets.files:
|
||||
for found_secret in secrets[file]:
|
||||
if found_secret.secret_value is None:
|
||||
continue
|
||||
detected_secrets.append(
|
||||
{"type": found_secret.type, "value": found_secret.secret_value}
|
||||
)
|
||||
|
||||
return detected_secrets
|
||||
|
||||
async def should_run_check(self, user_api_key_dict: UserAPIKeyAuth) -> bool:
|
||||
if user_api_key_dict.permissions is not None:
|
||||
if GUARDRAIL_NAME in user_api_key_dict.permissions:
|
||||
if user_api_key_dict.permissions[GUARDRAIL_NAME] is False:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
#### CALL HOOKS - proxy only ####
|
||||
async def async_pre_call_hook(
|
||||
self,
|
||||
user_api_key_dict: UserAPIKeyAuth,
|
||||
cache: DualCache,
|
||||
data: dict,
|
||||
call_type: str, # "completion", "embeddings", "image_generation", "moderation"
|
||||
):
|
||||
if await self.should_run_check(user_api_key_dict) is False:
|
||||
return
|
||||
|
||||
# Covers multimodal list content + Responses-API input.
|
||||
def _redact_message_text(text: str) -> str:
|
||||
detected_secrets = self.scan_message_for_secrets(text)
|
||||
for secret in detected_secrets:
|
||||
text = text.replace(secret["value"], "[REDACTED]")
|
||||
if detected_secrets:
|
||||
secret_types = [secret["type"] for secret in detected_secrets]
|
||||
verbose_proxy_logger.warning(
|
||||
f"Detected and redacted secrets in message: {secret_types}"
|
||||
)
|
||||
return text
|
||||
|
||||
walk_user_text(data, _redact_message_text)
|
||||
|
||||
if "prompt" in data:
|
||||
if isinstance(data["prompt"], str):
|
||||
detected_secrets = self.scan_message_for_secrets(data["prompt"])
|
||||
for secret in detected_secrets:
|
||||
data["prompt"] = data["prompt"].replace(
|
||||
secret["value"], "[REDACTED]"
|
||||
)
|
||||
if len(detected_secrets) > 0:
|
||||
secret_types = [secret["type"] for secret in detected_secrets]
|
||||
verbose_proxy_logger.warning(
|
||||
f"Detected and redacted secrets in prompt: {secret_types}"
|
||||
)
|
||||
elif isinstance(data["prompt"], list):
|
||||
# Index back into the list — assigning to ``item`` would only
|
||||
# rebind the loop variable and leave ``data["prompt"]``
|
||||
# carrying the unredacted secret.
|
||||
for idx, item in enumerate(data["prompt"]):
|
||||
if isinstance(item, str):
|
||||
detected_secrets = self.scan_message_for_secrets(item)
|
||||
for secret in detected_secrets:
|
||||
item = item.replace(secret["value"], "[REDACTED]")
|
||||
data["prompt"][idx] = item
|
||||
if len(detected_secrets) > 0:
|
||||
secret_types = [
|
||||
secret["type"] for secret in detected_secrets
|
||||
]
|
||||
verbose_proxy_logger.warning(
|
||||
f"Detected and redacted secrets in prompt: {secret_types}"
|
||||
)
|
||||
|
||||
# ``data["input"]`` (Responses API and embeddings/moderation) is
|
||||
# already covered by ``walk_user_text`` above.
|
||||
return
|
||||
@ -1,23 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Adafruit keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AdafruitKeyDetector(RegexBasedDetector):
|
||||
"""Scans for Adafruit keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Adafruit API Key"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(
|
||||
r"""(?i)(?:adafruit)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9_-]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
)
|
||||
]
|
||||
@ -1,26 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Adobe keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AdobeSecretDetector(RegexBasedDetector):
|
||||
"""Scans for Adobe client keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Adobe Client Keys"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Adobe Client ID (OAuth Web)
|
||||
re.compile(
|
||||
r"""(?i)(?:adobe)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-f0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# Adobe Client Secret
|
||||
re.compile(r"(?i)\b((p8e-)[a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"),
|
||||
]
|
||||
@ -1,21 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Age secret keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AgeSecretKeyDetector(RegexBasedDetector):
|
||||
"""Scans for Age secret keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Age Secret Key"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(r"""AGE-SECRET-KEY-1[QPZRY9X8GF2TVDW0S3JN54KHCE6MUA7L]{58}"""),
|
||||
]
|
||||
@ -1,23 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Airtable API keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AirtableApiKeyDetector(RegexBasedDetector):
|
||||
"""Scans for Airtable API keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Airtable API Key"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(
|
||||
r"""(?i)(?:airtable)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{17})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,21 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Algolia API keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AlgoliaApiKeyDetector(RegexBasedDetector):
|
||||
"""Scans for Algolia API keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Algolia API Key"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(r"""(?i)\b((LTAI)[a-z0-9]{20})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
]
|
||||
@ -1,26 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Alibaba secrets
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AlibabaSecretDetector(RegexBasedDetector):
|
||||
"""Scans for Alibaba AccessKey IDs and Secret Keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Alibaba Secrets"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Alibaba AccessKey ID
|
||||
re.compile(r"""(?i)\b((LTAI)[a-z0-9]{20})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
# For Alibaba Secret Key
|
||||
re.compile(
|
||||
r"""(?i)(?:alibaba)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{30})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,28 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Asana secrets
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AsanaSecretDetector(RegexBasedDetector):
|
||||
"""Scans for Asana Client IDs and Client Secrets."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Asana Secrets"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Asana Client ID
|
||||
re.compile(
|
||||
r"""(?i)(?:asana)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9]{16})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# For Asana Client Secret
|
||||
re.compile(
|
||||
r"""(?i)(?:asana)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Atlassian API tokens
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AtlassianApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Atlassian API tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Atlassian API token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Atlassian API token
|
||||
re.compile(
|
||||
r"""(?i)(?:atlassian|confluence|jira)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{24})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Authress Service Client Access Keys
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class AuthressAccessKeyDetector(RegexBasedDetector):
|
||||
"""Scans for Authress Service Client Access Keys."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Authress Service Client Access Key"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Authress Service Client Access Key
|
||||
re.compile(
|
||||
r"""(?i)\b((?:sc|ext|scauth|authress)_[a-z0-9]{5,30}\.[a-z0-9]{4,6}\.acc[_-][a-z0-9-]{10,32}\.[a-z0-9+/_=-]{30,120})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Beamer API tokens
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class BeamerApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Beamer API tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Beamer API token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Beamer API token
|
||||
re.compile(
|
||||
r"""(?i)(?:beamer)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(b_[a-z0-9=_\-]{44})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,28 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Bitbucket Client ID and Client Secret
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class BitbucketDetector(RegexBasedDetector):
|
||||
"""Scans for Bitbucket Client ID and Client Secret."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Bitbucket Secrets"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Bitbucket Client ID
|
||||
re.compile(
|
||||
r"""(?i)(?:bitbucket)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# For Bitbucket Client Secret
|
||||
re.compile(
|
||||
r"""(?i)(?:bitbucket)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,28 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Bittrex Access Key and Secret Key
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class BittrexDetector(RegexBasedDetector):
|
||||
"""Scans for Bittrex Access Key and Secret Key."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Bittrex Secrets"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Bittrex Access Key
|
||||
re.compile(
|
||||
r"""(?i)(?:bittrex)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# For Bittrex Secret Key
|
||||
re.compile(
|
||||
r"""(?i)(?:bittrex)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,22 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Clojars API tokens
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class ClojarsApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Clojars API tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Clojars API token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Clojars API token
|
||||
re.compile(r"(?i)(CLOJARS_)[a-z0-9]{60}"),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Codecov Access Token
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class CodecovAccessTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Codecov Access Token."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Codecov Access Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Codecov Access Token
|
||||
re.compile(
|
||||
r"""(?i)(?:codecov)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Coinbase Access Token
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class CoinbaseAccessTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Coinbase Access Token."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Coinbase Access Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Coinbase Access Token
|
||||
re.compile(
|
||||
r"""(?i)(?:coinbase)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9_-]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,28 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Confluent Access Token and Confluent Secret Key
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class ConfluentDetector(RegexBasedDetector):
|
||||
"""Scans for Confluent Access Token and Confluent Secret Key."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Confluent Secret"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# For Confluent Access Token
|
||||
re.compile(
|
||||
r"""(?i)(?:confluent)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{16})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# For Confluent Secret Key
|
||||
re.compile(
|
||||
r"""(?i)(?:confluent)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,23 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Contentful delivery API token.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class ContentfulApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Contentful delivery API token."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Contentful API Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(
|
||||
r"""(?i)(?:contentful)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{43})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,21 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Databricks API token.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DatabricksApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Databricks API token."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Databricks API Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(r"""(?i)\b(dapi[a-h0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
]
|
||||
@ -1,23 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Datadog Access Tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DatadogAccessTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Datadog Access Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Datadog Access Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(
|
||||
r"""(?i)(?:datadog)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{40})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,23 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Defined Networking API Tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DefinedNetworkingApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Defined Networking API Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Defined Networking API Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
re.compile(
|
||||
r"""(?i)(?:dnkey)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(dnkey-[a-z0-9=_\-]{26}-[a-z0-9=_\-]{52})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,26 +0,0 @@
|
||||
"""
|
||||
This plugin searches for DigitalOcean tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DigitaloceanDetector(RegexBasedDetector):
|
||||
"""Scans for various DigitalOcean Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "DigitalOcean Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# OAuth Access Token
|
||||
re.compile(r"""(?i)\b(doo_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
# Personal Access Token
|
||||
re.compile(r"""(?i)\b(dop_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
# OAuth Refresh Token
|
||||
re.compile(r"""(?i)\b(dor_v1_[a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""),
|
||||
]
|
||||
@ -1,32 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Discord Client tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DiscordDetector(RegexBasedDetector):
|
||||
"""Scans for various Discord Client Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Discord Client Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Discord API key
|
||||
re.compile(
|
||||
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-f0-9]{64})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# Discord client ID
|
||||
re.compile(
|
||||
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9]{18})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# Discord client secret
|
||||
re.compile(
|
||||
r"""(?i)(?:discord)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9=_\-]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,22 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Doppler API tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DopplerApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Doppler API Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Doppler API Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Doppler API token
|
||||
re.compile(r"""(?i)dp\.pt\.[a-z0-9]{43}"""),
|
||||
]
|
||||
@ -1,24 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Droneci Access Tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DroneciAccessTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Droneci Access Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Droneci Access Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Droneci Access Token
|
||||
re.compile(
|
||||
r"""(?i)(?:droneci)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{32})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,32 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Dropbox tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DropboxDetector(RegexBasedDetector):
|
||||
"""Scans for various Dropbox Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Dropbox Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Dropbox API secret
|
||||
re.compile(
|
||||
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{15})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# Dropbox long-lived API token
|
||||
re.compile(
|
||||
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([a-z0-9]{11}(AAAAAAAAAA)[a-z0-9\-_=]{43})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
# Dropbox short-lived API token
|
||||
re.compile(
|
||||
r"""(?i)(?:dropbox)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}(sl\.[a-z0-9\-=_]{135})(?:['|\"|\n|\r|\s|\x60|;]|$)"""
|
||||
),
|
||||
]
|
||||
@ -1,22 +0,0 @@
|
||||
"""
|
||||
This plugin searches for Duffel API Tokens.
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from detect_secrets.plugins.base import RegexBasedDetector
|
||||
|
||||
|
||||
class DuffelApiTokenDetector(RegexBasedDetector):
|
||||
"""Scans for Duffel API Tokens."""
|
||||
|
||||
@property
|
||||
def secret_type(self) -> str:
|
||||
return "Duffel API Token"
|
||||
|
||||
@property
|
||||
def denylist(self) -> list[re.Pattern]:
|
||||
return [
|
||||
# Duffel API Token
|
||||
re.compile(r"""(?i)duffel_(test|live)_[a-z0-9_\-=]{43}"""),
|
||||
]
|
||||
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user