diff --git a/internal/model/types.go b/internal/model/types.go index 72dc977..7a58f0c 100644 --- a/internal/model/types.go +++ b/internal/model/types.go @@ -64,6 +64,18 @@ type QuotaState struct { EffectiveAt time.Time } +type BillingProfile struct { + AccountUUID string + PackageName string + IncludedQuotaBytes int64 + BasePricePerByte float64 + RegionMultiplier float64 + LineMultiplier float64 + PeakMultiplier float64 + OffPeakMultiplier float64 + PricingRuleVersion string +} + type JobResult struct { Job string `json:"job"` StartedAt time.Time `json:"started_at"` diff --git a/internal/repository/postgres.go b/internal/repository/postgres.go index 76a3a4c..0d67e05 100644 --- a/internal/repository/postgres.go +++ b/internal/repository/postgres.go @@ -224,6 +224,32 @@ func (p *Postgres) UpsertQuotaState(ctx context.Context, state model.QuotaState) return err } +func (p *Postgres) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) { + const query = ` + SELECT account_uuid, package_name, included_quota_bytes, base_price_per_byte, region_multiplier, line_multiplier, peak_multiplier, offpeak_multiplier, pricing_rule_version + FROM account_billing_profiles + WHERE account_uuid = $1` + var profile model.BillingProfile + err := p.db.QueryRowContext(ctx, query, accountUUID).Scan( + &profile.AccountUUID, + &profile.PackageName, + &profile.IncludedQuotaBytes, + &profile.BasePricePerByte, + &profile.RegionMultiplier, + &profile.LineMultiplier, + &profile.PeakMultiplier, + &profile.OffPeakMultiplier, + &profile.PricingRuleVersion, + ) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + return &profile, nil +} + var _ Repository = (*Postgres)(nil) func ensureUTC(ts time.Time) time.Time { diff --git a/internal/repository/repository.go b/internal/repository/repository.go index f356f77..8b2ffea 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -13,4 +13,5 @@ type Repository interface { UpsertLedger(ctx context.Context, entry model.LedgerEntry) (bool, error) GetQuotaState(ctx context.Context, accountUUID string) (*model.QuotaState, error) UpsertQuotaState(ctx context.Context, state model.QuotaState) error + GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) } diff --git a/internal/service/service.go b/internal/service/service.go index ceda198..d39acea 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -148,6 +148,12 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa } totalBytes := deltaUplink + deltaDownlink + profile, err := s.repo.GetBillingProfile(ctx, sample.UUID) + if err != nil { + return false, fmt.Errorf("get billing profile %s: %w", sample.UUID, err) + } + effectivePricing := resolvePricing(profile, s.cfg) + bucket := model.MinuteBucket{ BucketStart: minuteStart, NodeID: storageNodeID, @@ -157,9 +163,9 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa UplinkBytes: deltaUplink, DownlinkBytes: deltaDownlink, TotalBytes: totalBytes, - Multiplier: 1.0, + Multiplier: effectivePricing.multiplier, RatingStatus: "rated", - SourceRevision: s.cfg.SourceRevision, + SourceRevision: effectivePricing.pricingRuleVersion, } minuteExisted, err := s.repo.UpsertMinuteBucket(ctx, bucket) @@ -191,13 +197,20 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa if quota == nil { quota = &model.QuotaState{ AccountUUID: sample.UUID, - RemainingIncludedQuota: s.cfg.InitialIncludedQuotaBytes, + RemainingIncludedQuota: effectivePricing.includedQuotaBytes, CurrentBalance: s.cfg.InitialBalance, ThrottleState: "normal", SuspendState: "active", EffectiveAt: snapshot.CollectedAt.UTC(), } } + + includedApplied := minInt64(quota.RemainingIncludedQuota, totalBytes) + chargeableBytes := totalBytes - includedApplied + amountDelta = -float64(chargeableBytes) * effectivePricing.basePricePerByte * effectivePricing.multiplier + entry.RatedBytes = chargeableBytes + entry.AmountDelta = amountDelta + entry.PricingRuleVersion = effectivePricing.pricingRuleVersion entry.BalanceAfter = quota.CurrentBalance + amountDelta ledgerExisted, err := s.repo.UpsertLedger(ctx, entry) @@ -206,12 +219,18 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa } if !ledgerExisted { - remainingQuota := quota.RemainingIncludedQuota - totalBytes + remainingQuota := quota.RemainingIncludedQuota - includedApplied if remainingQuota < 0 { remainingQuota = 0 } quota.RemainingIncludedQuota = remainingQuota quota.CurrentBalance = entry.BalanceAfter + quota.Arrears = quota.CurrentBalance < 0 + if quota.Arrears { + quota.ThrottleState = "throttled" + } else { + quota.ThrottleState = "normal" + } quota.EffectiveAt = snapshot.CollectedAt.UTC() lastRated := minuteStart quota.LastRatedBucketAt = &lastRated @@ -237,6 +256,54 @@ func (s *Service) processSample(ctx context.Context, snapshot model.Snapshot, sa return true, nil } +type effectivePricing struct { + includedQuotaBytes int64 + basePricePerByte float64 + multiplier float64 + pricingRuleVersion string +} + +func resolvePricing(profile *model.BillingProfile, cfg config.Config) effectivePricing { + pricing := effectivePricing{ + includedQuotaBytes: cfg.InitialIncludedQuotaBytes, + basePricePerByte: cfg.PricePerByte, + multiplier: 1.0, + pricingRuleVersion: cfg.SourceRevision, + } + if profile == nil { + return pricing + } + if profile.IncludedQuotaBytes > 0 { + pricing.includedQuotaBytes = profile.IncludedQuotaBytes + } + if profile.BasePricePerByte > 0 { + pricing.basePricePerByte = profile.BasePricePerByte + } + regionMultiplier := profile.RegionMultiplier + if regionMultiplier <= 0 { + regionMultiplier = 1.0 + } + lineMultiplier := profile.LineMultiplier + if lineMultiplier <= 0 { + lineMultiplier = 1.0 + } + pricing.multiplier = regionMultiplier * lineMultiplier + if pricing.multiplier <= 0 { + pricing.multiplier = 1.0 + } + if strings.TrimSpace(profile.PricingRuleVersion) != "" { + pricing.pricingRuleVersion = strings.TrimSpace(profile.PricingRuleVersion) + } + return pricing +} + +func minInt64(a, b int64) int64 { + if a < b { + return a + } + return b +} + func validateSample(sample model.Sample) error { if strings.TrimSpace(sample.UUID) == "" { return fmt.Errorf("sample uuid is required") diff --git a/internal/service/service_test.go b/internal/service/service_test.go index 4445102..10f60d6 100644 --- a/internal/service/service_test.go +++ b/internal/service/service_test.go @@ -24,6 +24,7 @@ type memoryRepo struct { buckets map[string]model.MinuteBucket ledgers map[string]model.LedgerEntry quotas map[string]model.QuotaState + profiles map[string]model.BillingProfile } func newMemoryRepo() *memoryRepo { @@ -32,6 +33,7 @@ func newMemoryRepo() *memoryRepo { buckets: map[string]model.MinuteBucket{}, ledgers: map[string]model.LedgerEntry{}, quotas: map[string]model.QuotaState{}, + profiles: map[string]model.BillingProfile{}, } } @@ -82,6 +84,14 @@ func (m *memoryRepo) UpsertQuotaState(ctx context.Context, state model.QuotaStat return nil } +func (m *memoryRepo) GetBillingProfile(ctx context.Context, accountUUID string) (*model.BillingProfile, error) { + if profile, ok := m.profiles[accountUUID]; ok { + copy := profile + return ©, nil + } + return nil, nil +} + var _ repository.Repository = (*memoryRepo)(nil) func baseConfig() config.Config { @@ -89,7 +99,7 @@ func baseConfig() config.Config { DefaultRegion: "", SourceRevision: "billing-service-v1", PricePerByte: 0.5, - InitialIncludedQuotaBytes: 1000, + InitialIncludedQuotaBytes: 0, InitialBalance: 0, } } @@ -117,14 +127,74 @@ func TestDeltaCalculationAndQuotaUpdate(t *testing.T) { t.Fatalf("unexpected result %#v", result) } quota := repo.quotas["11111111-1111-1111-1111-111111111111"] - if quota.RemainingIncludedQuota != 850 { - t.Fatalf("expected remaining quota 850, got %d", quota.RemainingIncludedQuota) - } if quota.CurrentBalance != -75 { t.Fatalf("expected current balance -75, got %v", quota.CurrentBalance) } } +func TestIncludedQuotaAndMultipliersFromBillingProfile(t *testing.T) { + repo := newMemoryRepo() + accountUUID := "11111111-1111-1111-1111-111111111111" + repo.profiles[accountUUID] = model.BillingProfile{ + AccountUUID: accountUUID, + PackageName: "starter", + IncludedQuotaBytes: 100, + BasePricePerByte: 0.5, + RegionMultiplier: 1.2, + LineMultiplier: 2.0, + PricingRuleVersion: "pricing-v2", + } + svc := New(baseConfig(), fakeSource{snapshot: model.Snapshot{ + CollectedAt: time.Date(2026, 4, 8, 10, 30, 15, 0, time.UTC), + NodeID: "jp-node", + Env: "prod", + Samples: []model.Sample{{ + UUID: accountUUID, + InboundTag: "premium", + UplinkBytesTotal: 100, + DownlinkBytesTotal: 50, + }}, + }}, repo) + + result, err := svc.RunCollectAndRate(context.Background(), "collect-and-rate") + if err != nil { + t.Fatalf("run job: %v", err) + } + if result.ProcessedSamples != 1 || result.WrittenMinutes != 1 { + t.Fatalf("unexpected result %#v", result) + } + + quota := repo.quotas[accountUUID] + if quota.RemainingIncludedQuota != 0 { + t.Fatalf("expected remaining quota 0, got %d", quota.RemainingIncludedQuota) + } + if quota.CurrentBalance != -60 { + t.Fatalf("expected current balance -60, got %v", quota.CurrentBalance) + } + + bucket := repo.buckets[bucketKey(model.MinuteBucket{ + BucketStart: time.Date(2026, 4, 8, 10, 30, 0, 0, time.UTC), + NodeID: composeStorageNodeID("prod", "jp-node"), + AccountUUID: accountUUID, + Region: "", + LineCode: "premium", + })] + if bucket.Multiplier != 2.4 { + t.Fatalf("expected multiplier 2.4, got %v", bucket.Multiplier) + } + for _, entry := range repo.ledgers { + if entry.RatedBytes != 50 { + t.Fatalf("expected rated bytes 50, got %d", entry.RatedBytes) + } + if entry.AmountDelta != -60 { + t.Fatalf("expected amount delta -60, got %v", entry.AmountDelta) + } + if entry.PricingRuleVersion != "pricing-v2" { + t.Fatalf("expected pricing version pricing-v2, got %q", entry.PricingRuleVersion) + } + } +} + func TestDuplicateMinuteIsReplaySafe(t *testing.T) { repo := newMemoryRepo() snapshot := model.Snapshot{ diff --git a/testdata/postgres/init.sql b/testdata/postgres/init.sql index 81268f0..fb487cb 100644 --- a/testdata/postgres/init.sql +++ b/testdata/postgres/init.sql @@ -70,3 +70,17 @@ CREATE TABLE IF NOT EXISTS public.account_quota_states ( effective_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); + +CREATE TABLE IF NOT EXISTS public.account_billing_profiles ( + account_uuid UUID PRIMARY KEY REFERENCES public.users(uuid) ON DELETE CASCADE, + package_name TEXT NOT NULL DEFAULT 'default', + included_quota_bytes BIGINT NOT NULL DEFAULT 0, + base_price_per_byte DOUBLE PRECISION NOT NULL DEFAULT 0, + region_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + line_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + peak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + offpeak_multiplier DOUBLE PRECISION NOT NULL DEFAULT 1.0, + pricing_rule_version TEXT NOT NULL DEFAULT 'pricing-default-v1', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +);