feat(rating): add pricing-backed traffic billing

This commit is contained in:
Haitao Pan 2026-04-09 13:50:55 +08:00
parent 5580f0ae1a
commit afbdf7080b
6 changed files with 198 additions and 8 deletions

View File

@ -64,6 +64,18 @@ type QuotaState struct {
EffectiveAt time.Time
}
type BillingProfile struct {
AccountUUID string
PackageName string
IncludedQuotaBytes int64
BasePricePerByte float64
RegionMultiplier float64
LineMultiplier float64
PeakMultiplier float64
OffPeakMultiplier float64
PricingRuleVersion string
}
type JobResult struct {
Job string `json:"job"`
StartedAt time.Time `json:"started_at"`

View File

@ -224,6 +224,32 @@ func (p *Postgres) UpsertQuotaState(ctx context.Context, state model.QuotaState)
return err
}
func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) {
const query = `
SELECT account_uuid, package_name, included_quota_bytes, base_price_per_byte, region_multiplier, line_multiplier, peak_multiplier, offpeak_multiplier, pricing_rule_version
FROM account_billing_profiles
WHERE account_uuid = $1`
var profile model.BillingProfile
err := p.db.QueryRowContext(ctx, query, accountUUID).Scan(
&profile.AccountUUID,
&profile.PackageName,
&profile.IncludedQuotaBytes,
&profile.BasePricePerByte,
&profile.RegionMultiplier,
&profile.LineMultiplier,
&profile.PeakMultiplier,
&profile.OffPeakMultiplier,
&profile.PricingRuleVersion,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
return &profile, nil
}
var _ Repository = (*Postgres)(nil)
func ensureUTC(ts time.Time) time.Time {

View File

@ -13,4 +13,5 @@ type Repository interface {
UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error)
GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error)
UpsertQuotaState(ctx context.Context, state model.QuotaState) error
GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error)
}

View File

@ -148,6 +148,12 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
}
totalBytes := deltaUplink + deltaDownlink
profile, err := s.repo.GetBillingProfile(ctx, sample.UUID)
if err != nil {
return false, fmt.Errorf("get billing profile %s: %w", sample.UUID, err)
}
effectivePricing := resolvePricing(profile, s.cfg)
bucket := model.MinuteBucket{
BucketStart: minuteStart,
NodeID: storageNodeID,
@ -157,9 +163,9 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
UplinkBytes: deltaUplink,
DownlinkBytes: deltaDownlink,
TotalBytes: totalBytes,
Multiplier: 1.0,
Multiplier: effectivePricing.multiplier,
RatingStatus: "rated",
SourceRevision: s.cfg.SourceRevision,
SourceRevision: effectivePricing.pricingRuleVersion,
}
minuteExisted, err := s.repo.UpsertMinuteBucket(ctx, bucket)
@ -191,13 +197,20 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
if quota == nil {
quota = &model.QuotaState{
AccountUUID: sample.UUID,
RemainingIncludedQuota: s.cfg.InitialIncludedQuotaBytes,
RemainingIncludedQuota: effectivePricing.includedQuotaBytes,
CurrentBalance: s.cfg.InitialBalance,
ThrottleState: "normal",
SuspendState: "active",
EffectiveAt: snapshot.CollectedAt.UTC(),
}
}
includedApplied := minInt64(quota.RemainingIncludedQuota, totalBytes)
chargeableBytes := totalBytes - includedApplied
amountDelta = -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier
entry.RatedBytes = chargeableBytes
entry.AmountDelta = amountDelta
entry.PricingRuleVersion = effectivePricing.pricingRuleVersion
entry.BalanceAfter = quota.CurrentBalance + amountDelta
ledgerExisted, err := s.repo.UpsertLedger(ctx, entry)
@ -206,12 +219,18 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
}
if !ledgerExisted {
remainingQuota := quota.RemainingIncludedQuota - totalBytes
remainingQuota := quota.RemainingIncludedQuota - includedApplied
if remainingQuota < 0 {
remainingQuota = 0
}
quota.RemainingIncludedQuota = remainingQuota
quota.CurrentBalance = entry.BalanceAfter
quota.Arrears = quota.CurrentBalance < 0
if quota.Arrears {
quota.ThrottleState = "throttled"
} else {
quota.ThrottleState = "normal"
}
quota.EffectiveAt = snapshot.CollectedAt.UTC()
lastRated := minuteStart
quota.LastRatedBucketAt = &lastRated
@ -237,6 +256,54 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa
return true, nil
}
type effectivePricing struct {
includedQuotaBytes int64
basePricePerByte float64
multiplier float64
pricingRuleVersion string
}
func resolvePricing(profile *model.BillingProfile, cfg config.Config) effectivePricing {
pricing := effectivePricing{
includedQuotaBytes: cfg.InitialIncludedQuotaBytes,
basePricePerByte: cfg.PricePerByte,
multiplier: 1.0,
pricingRuleVersion: cfg.SourceRevision,
}
if profile == nil {
return pricing
}
if profile.IncludedQuotaBytes > 0 {
pricing.includedQuotaBytes = profile.IncludedQuotaBytes
}
if profile.BasePricePerByte > 0 {
pricing.basePricePerByte = profile.BasePricePerByte
}
regionMultiplier := profile.RegionMultiplier
if regionMultiplier <= 0 {
regionMultiplier = 1.0
}
lineMultiplier := profile.LineMultiplier
if lineMultiplier <= 0 {
lineMultiplier = 1.0
}
pricing.multiplier = regionMultiplier * lineMultiplier
if pricing.multiplier <= 0 {
pricing.multiplier = 1.0
}
if strings.TrimSpace(profile.PricingRuleVersion) != "" {
pricing.pricingRuleVersion = strings.TrimSpace(profile.PricingRuleVersion)
}
return pricing
}
func minInt64(a, b int64) int64 {
if a < b {
return a
}
return b
}
func validateSample(sample model.Sample) error {
if strings.TrimSpace(sample.UUID) == "" {
return fmt.Errorf("sample uuid is required")

View File

@ -24,6 +24,7 @@ type memoryRepo struct {
buckets map[string]model.MinuteBucket
ledgers map[string]model.LedgerEntry
quotas map[string]model.QuotaState
profiles map[string]model.BillingProfile
}
func newMemoryRepo() *memoryRepo {
@ -32,6 +33,7 @@ func newMemoryRepo() *memoryRepo {
buckets: map[string]model.MinuteBucket{},
ledgers: map[string]model.LedgerEntry{},
quotas: map[string]model.QuotaState{},
profiles: map[string]model.BillingProfile{},
}
}
@ -82,6 +84,14 @@ func (m *memoryRepo) UpsertQuotaState(ctx context.Context, state model.QuotaStat
return nil
}
func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) {
if profile, ok := m.profiles[accountUUID]; ok {
copy := profile
return &copy, nil
}
return nil, nil
}
var _ repository.Repository = (*memoryRepo)(nil)
func baseConfig() config.Config {
@ -89,7 +99,7 @@ func baseConfig() config.Config {
DefaultRegion: "",
SourceRevision: "billing-service-v1",
PricePerByte: 0.5,
InitialIncludedQuotaBytes: 1000,
InitialIncludedQuotaBytes: 0,
InitialBalance: 0,
}
}
@ -117,14 +127,74 @@ func TestDeltaCalculationAndQuotaUpdate(t *testing.T) {
t.Fatalf("unexpected result %#v", result)
}
quota := repo.quotas["11111111-1111-1111-1111-111111111111"]
if quota.RemainingIncludedQuota != 850 {
t.Fatalf("expected remaining quota 850, got %d", quota.RemainingIncludedQuota)
}
if quota.CurrentBalance != -75 {
t.Fatalf("expected current balance -75, got %v", quota.CurrentBalance)
}
}
func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) {
repo := newMemoryRepo()
accountUUID := "11111111-1111-1111-1111-111111111111"
repo.profiles[accountUUID] = model.BillingProfile{
AccountUUID: accountUUID,
PackageName: "starter",
IncludedQuotaBytes: 100,
BasePricePerByte: 0.5,
RegionMultiplier: 1.2,
LineMultiplier: 2.0,
PricingRuleVersion: "pricing-v2",
}
svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{
CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC),
NodeID: "jp-node",
Env: "prod",
Samples: []model.Sample{{
UUID: accountUUID,
InboundTag: "premium",
UplinkBytesTotal: 100,
DownlinkBytesTotal: 50,
}},
}}, repo)
result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate")
if err != nil {
t.Fatalf("run job: %v", err)
}
if result.ProcessedSamples != 1 || result.WrittenMinutes != 1 {
t.Fatalf("unexpected result %#v", result)
}
quota := repo.quotas[accountUUID]
if quota.RemainingIncludedQuota != 0 {
t.Fatalf("expected remaining quota 0, got %d", quota.RemainingIncludedQuota)
}
if quota.CurrentBalance != -60 {
t.Fatalf("expected current balance -60, got %v", quota.CurrentBalance)
}
bucket := repo.buckets[bucketKey(model.MinuteBucket{
BucketStart: time.Date(2026, 4, 8, 10, 30, 0, 0, time.UTC),
NodeID: composeStorageNodeID("prod", "jp-node"),
AccountUUID: accountUUID,
Region: "",
LineCode: "premium",
})]
if bucket.Multiplier != 2.4 {
t.Fatalf("expected multiplier 2.4, got %v", bucket.Multiplier)
}
for _, entry := range repo.ledgers {
if entry.RatedBytes != 50 {
t.Fatalf("expected rated bytes 50, got %d", entry.RatedBytes)
}
if entry.AmountDelta != -60 {
t.Fatalf("expected amount delta -60, got %v", entry.AmountDelta)
}
if entry.PricingRuleVersion != "pricing-v2" {
t.Fatalf("expected pricing version pricing-v2, got %q", entry.PricingRuleVersion)
}
}
}
func TestDuplicateMinuteIsReplaySafe(t *testing.T) {
repo := newMemoryRepo()
snapshot := model.Snapshot{

View File

@ -70,3 +70,17 @@ CREATE TABLE IF NOT EXISTS public.account_quota_states (
effective_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.account_billing_profiles (
account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE,
package_name TEXT NOT NULL DEFAULT 'default',
included_quota_bytes BIGINT NOT NULL DEFAULT 0,
base_price_per_byte DOUBLE PRECISION NOT NULL DEFAULT 0,
region_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
line_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
peak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
offpeak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0,
pricing_rule_version TEXT NOT NULL DEFAULT 'pricing-default-v1',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);