artifacts/prometheus_to_cloudwatch.go
Sergei Ivanov 315e75077d Sanitise values before sending to CloudWatch (#17)
Throw away data points with known invalid values
to avoid the whole batch being rejected by CloudWatch API.

Signed-off-by: Sergei Ivanov <sergei_ivanov@mail.ru>
2019-06-24 22:34:34 -04:00

469 lines
14 KiB
Go

package main
import (
"context"
"crypto/tls"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/gobwas/glob"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"io"
"log"
"math"
"mime"
"net/http"
"sort"
"time"
)
// batchSize is the maximum number of data points sent in one PutMetricData call.
const batchSize = 10

const (
	// cwHighResLabel, when present on a metric, requests high-resolution storage.
	cwHighResLabel = "__cw_high_res"
	// cwUnitLabel carries the CloudWatch unit to publish the metric with.
	cwUnitLabel = "__cw_unit"
	// acceptHeader prefers the delimited protobuf exposition format and falls
	// back to the text format.
	acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`
)
// Config defines configuration options accepted by NewBridge.
type Config struct {
	// AWS access key Id with permissions to publish CloudWatch metrics.
	// Used only when AwsSecretAccessKey is also set; otherwise the AWS SDK's
	// default credential chain is used.
	AwsAccessKeyId string
	// AWS secret access key with permissions to publish CloudWatch metrics.
	// Used only when AwsAccessKeyId is also set.
	AwsSecretAccessKey string
	// Required. The CloudWatch namespace under which metrics should be published
	CloudWatchNamespace string
	// Required. The AWS Region to use
	CloudWatchRegion string
	// The frequency with which metrics should be published to CloudWatch.
	// Default (when zero or negative): 30s
	CloudWatchPublishInterval time.Duration
	// HTTP timeout for requests sent to CloudWatch.
	// Default (when zero or negative): 5s
	CloudWatchPublishTimeout time.Duration
	// Required. Prometheus scrape URL
	PrometheusScrapeUrl string
	// Path to the client certificate file used for the scrape; only used
	// when PrometheusKeyPath is also set
	PrometheusCertPath string
	// Path to the client key file used for the scrape; only used when
	// PrometheusCertPath is also set
	PrometheusKeyPath string
	// Accept any certificate during TLS handshake. Insecure, use only for testing
	PrometheusSkipServerCertCheck bool
	// Additional dimensions (name -> value) added to every published data point.
	// They count toward the 10-dimension limit, so fewer label dimensions are kept.
	AdditionalDimensions map[string]string
	// Maps a label name to a replacement value. When non-empty, each data point
	// with label dimensions is also published a second time with the listed labels'
	// values replaced. This allows aggregating metrics across dimensions so we can
	// set CloudWatch Alarms on the metrics
	ReplaceDimensions map[string]string
	// Only publish the specified metrics (a list of glob patterns, e.g. ["up", "http_*"]).
	// Empty means every metric that is not excluded is published
	IncludeMetrics []glob.Glob
	// Never publish the specified metrics (a list of glob patterns, e.g. ["tomcat_*"]).
	// Takes priority over IncludeMetrics
	ExcludeMetrics []glob.Glob
}
// Bridge pushes metrics to AWS CloudWatch.
// It is built by NewBridge, which copies most fields from Config and applies
// defaults; Run drives the periodic scrape-and-publish loop.
type Bridge struct {
	// Interval between publish cycles in Run (NewBridge defaults it to 30s).
	cloudWatchPublishInterval time.Duration
	// Namespace sent with every PutMetricData request.
	cloudWatchNamespace string
	// CloudWatch API client created by NewBridge.
	cw *cloudwatch.CloudWatch
	// URL scraped for Prometheus metrics on every cycle.
	prometheusScrapeUrl string
	// Client certificate and key paths for the scrape; used only when both are set.
	prometheusCertPath string
	prometheusKeyPath string
	// Disables server certificate verification for the scrape (testing only).
	prometheusSkipServerCertCheck bool
	// Extra dimensions (name -> value) appended to every data point.
	additionalDimensions map[string]string
	// Label name -> replacement value; when non-empty, a second data point with
	// the replaced dimension values may be published alongside each original one.
	replaceDimensions map[string]string
	// Glob patterns selecting metric names to publish / to skip (exclude wins).
	includeMetrics []glob.Glob
	excludeMetrics []glob.Glob
}
// NewBridge initializes and returns a pointer to a Bridge using the
// supplied configuration, or an error if there is a problem with the configuration.
//
// CloudWatchNamespace, PrometheusScrapeUrl and CloudWatchRegion are required
// and are checked in that order. CloudWatchPublishInterval defaults to 30s and
// CloudWatchPublishTimeout (the HTTP timeout for CloudWatch API calls)
// defaults to 5s when they are zero or negative. Static credentials are used
// only when both AwsAccessKeyId and AwsSecretAccessKey are set.
func NewBridge(c *Config) (*Bridge, error) {
	if c.CloudWatchNamespace == "" {
		return nil, errors.New("CloudWatchNamespace required")
	}
	if c.PrometheusScrapeUrl == "" {
		return nil, errors.New("PrometheusScrapeUrl required")
	}
	if c.CloudWatchRegion == "" {
		return nil, errors.New("CloudWatchRegion required")
	}

	b := &Bridge{
		cloudWatchPublishInterval:     30 * time.Second,
		cloudWatchNamespace:           c.CloudWatchNamespace,
		prometheusScrapeUrl:           c.PrometheusScrapeUrl,
		prometheusCertPath:            c.PrometheusCertPath,
		prometheusKeyPath:             c.PrometheusKeyPath,
		prometheusSkipServerCertCheck: c.PrometheusSkipServerCertCheck,
		additionalDimensions:          c.AdditionalDimensions,
		replaceDimensions:             c.ReplaceDimensions,
		includeMetrics:                c.IncludeMetrics,
		excludeMetrics:                c.ExcludeMetrics,
	}
	if c.CloudWatchPublishInterval > 0 {
		b.cloudWatchPublishInterval = c.CloudWatchPublishInterval
	}

	timeout := 5 * time.Second
	if c.CloudWatchPublishTimeout > 0 {
		timeout = c.CloudWatchPublishTimeout
	}
	// Use a dedicated client: setting Timeout on http.DefaultClient would
	// silently change the timeout for every other user of that shared client
	// in the process.
	client := &http.Client{Timeout: timeout}

	config := aws.NewConfig().WithHTTPClient(client).WithRegion(c.CloudWatchRegion)
	// https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html
	// https://docs.aws.amazon.com/sdk-for-go/api/aws/#Config
	// If credentials are not provided in the variables, the chain of credential providers will search for credentials
	// in environment variables, the shared credential file, and EC2 Instance Roles
	if c.AwsAccessKeyId != "" && c.AwsSecretAccessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(c.AwsAccessKeyId, c.AwsSecretAccessKey, "")
	}
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}
	b.cw = cloudwatch.New(sess)
	return b, nil
}
// Run publishes metrics to CloudWatch once per configured interval until ctx
// is cancelled. On every tick it scrapes the Prometheus endpoint in a
// goroutine, waits for the scrape to finish (the channel is closed), and then
// publishes everything collected. The first publish happens one full interval
// after Run is called.
func (b *Bridge) Run(ctx context.Context) {
	tick := time.NewTicker(b.cloudWatchPublishInterval)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			log.Println("prometheus-to-cloudwatch: stopping")
			return
		case <-tick.C:
		}

		families := make(chan *dto.MetricFamily, 1024)
		go fetchMetricFamilies(b.prometheusScrapeUrl, families, b.prometheusCertPath, b.prometheusKeyPath, b.prometheusSkipServerCertCheck)
		var collected []*dto.MetricFamily
		for family := range families {
			collected = append(collected, family)
		}

		published, err := b.publishMetricsToCloudWatch(collected)
		if err != nil {
			log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err)
			continue
		}
		log.Println(fmt.Sprintf("prometheus-to-cloudwatch: published %d metrics to CloudWatch", published))
	}
}
// publishMetricsToCloudWatch converts mfs into CloudWatch data points and sends
// them in batches of at most batchSize via PutMetricData.
//
// NOTE: The CloudWatch API has the following limitations:
// - Max 40kb request size
// - Single namespace per request
// - Max 10 dimensions per metric
//
// Samples whose metric name is filtered out by shouldIgnoreMetric are skipped.
// A failed intermediate batch is logged and publishing continues; the error of
// the final batch (if any) is returned. count is the number of data points in
// batches that were sent successfully.
func (b *Bridge) publishMetricsToCloudWatch(mfs []*dto.MetricFamily) (count int, e error) {
	vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{Timestamp: model.Now()}, mfs...)
	if err != nil {
		return 0, err
	}
	data := make([]*cloudwatch.MetricDatum, 0, batchSize)
	for _, s := range vec {
		name := getName(s.Metric)
		if b.shouldIgnoreMetric(name) {
			continue
		}
		data = appendDatum(data, name, s, b)
		// appendDatum can add two data points for one sample, so an equality
		// check could be stepped over and the batch would grow without bound.
		// Send exactly batchSize points and carry any overflow forward.
		for len(data) >= batchSize {
			if err := b.flush(data[:batchSize]); err != nil {
				log.Println("prometheus-to-cloudwatch: error publishing to CloudWatch:", err)
			} else {
				count += batchSize
			}
			data = append(make([]*cloudwatch.MetricDatum, 0, batchSize), data[batchSize:]...)
		}
	}
	if err := b.flush(data); err != nil {
		return count, err
	}
	return count + len(data), nil
}
// flush sends data to CloudWatch as one PutMetricData request in the bridge's
// namespace and returns the API error, if any. An empty batch is a no-op.
func (b *Bridge) flush(data []*cloudwatch.MetricDatum) error {
	if len(data) == 0 {
		return nil
	}
	_, err := b.cw.PutMetricData(&cloudwatch.PutMetricDataInput{
		Namespace:  &b.cloudWatchNamespace,
		MetricData: data,
	})
	return err
}
// shouldIgnoreMetric reports whether metricName must not be published.
// Exclude patterns take priority over include patterns; when no include
// patterns are configured, every metric that is not excluded is published.
func (b *Bridge) shouldIgnoreMetric(metricName string) bool {
	switch {
	case anyPatternMatches(b.excludeMetrics, metricName):
		return true
	case len(b.includeMetrics) == 0:
		return false
	default:
		return !anyPatternMatches(b.includeMetrics, metricName)
	}
}
// anyPatternMatches reports whether at least one of patterns matches s.
// A nil or empty pattern list never matches.
func anyPatternMatches(patterns []glob.Glob, s string) bool {
	for i := range patterns {
		if patterns[i].Match(s) {
			return true
		}
	}
	return false
}
// appendDatum converts sample s (whose metric name is name) into CloudWatch
// data points, appends them to data and returns the extended slice.
//
// Nothing is appended when the sample has no labels or its value fails
// validValue. Otherwise one datum is appended with the sample's label
// dimensions plus the bridge's additional dimensions. When replacement
// dimensions are configured and produce any dimensions, a second datum with
// the replaced dimensions is appended as well. Storage resolution and unit
// come from the __cw_high_res / __cw_unit labels.
func appendDatum(data []*cloudwatch.MetricDatum, name string, s *model.Sample, b *Bridge) []*cloudwatch.MetricDatum {
	metric := s.Metric
	if len(metric) == 0 {
		return data
	}
	// Drop values CloudWatch would reject, so a single bad sample cannot get
	// the whole PutMetricData batch refused.
	value := float64(s.Value)
	if !validValue(value) {
		return data
	}
	// Leave room for the additional dimensions within the 10-dimension limit.
	// Clamp at zero: with more than 10 additional dimensions configured a
	// negative limit would otherwise make getDimensions slice out of range.
	maxLabelDims := 10 - len(b.additionalDimensions)
	if maxLabelDims < 0 {
		maxLabelDims = 0
	}
	labelDims, replacedDims := getDimensions(metric, maxLabelDims, b)

	timestamp := s.Timestamp.Time()
	resolution := getResolution(metric)
	unit := getUnit(metric)
	newDatum := func(dims []*cloudwatch.Dimension) *cloudwatch.MetricDatum {
		return new(cloudwatch.MetricDatum).
			SetMetricName(name).
			SetValue(value).
			SetTimestamp(timestamp).
			SetDimensions(append(dims, getAdditionalDimensions(b)...)).
			SetStorageResolution(resolution).
			SetUnit(unit)
	}

	data = append(data, newDatum(labelDims))
	// Don't add replacement if not configured
	if len(replacedDims) > 0 {
		data = append(data, newDatum(replacedDims))
	}
	return data
}
// Magnitude bounds used by validValue: a non-zero value whose absolute value
// is <= valueTooSmall or >= valueTooLarge is discarded.
//
// NOTE(review): these are 2^-260 and 2^260, much narrower than the documented
// CloudWatch range quoted on validValue (about 2^-360 to 2^360). This may be a
// deliberate safety margin or a typo for 360; confirm before changing it.
var (
	valueTooSmall = math.Pow(2, -260)
	valueTooLarge = math.Pow(2, 260)
)

// validValue reports whether v can be sent to CloudWatch as a metric value.
//
// According to the documentation:
// "CloudWatch rejects values that are either too small or too large.
// Values must be in the range of 8.515920e-109 to 1.174271e+108 (Base 10)
// or 2e-360 to 2e360 (Base 2).
// In addition, special values (for example, NaN, +Infinity, -Infinity) are not supported."
//
// NaN and ±Inf are always rejected. Zero is always accepted. Any other value
// must lie strictly between valueTooSmall and valueTooLarge in magnitude.
func validValue(v float64) bool {
	switch {
	case math.IsNaN(v), math.IsInf(v, 0):
		return false
	case v == 0:
		// Zero is exempt from the "too small" bound below.
		return true
	}
	magnitude := math.Abs(v)
	return magnitude > valueTooSmall && magnitude < valueTooLarge
}
// getName returns the value of the __name__ label of m, or "" when m has no
// such label (a missing map key yields the zero LabelValue).
func getName(m model.Metric) string {
	return string(m[model.MetricNameLabel])
}
// getDimensions returns up to num dimensions for the provided metric, one for
// each label with a non-empty value. The __name__ label and the __cw_high_res
// and __cw_unit control labels are skipped. Labels are sorted by name, so when
// there are more than num of them the same first num labels are kept every
// time. A negative num is treated as 0.
//
// The second result is only populated when b.replaceDimensions is non-empty.
// It mirrors the first, except that every label listed in b.replaceDimensions
// takes the configured replacement value. Otherwise it is nil.
func getDimensions(m model.Metric, num int, b *Bridge) ([]*cloudwatch.Dimension, []*cloudwatch.Dimension) {
	// Guard against a negative limit (e.g. more than 10 additional dimensions
	// configured), which would otherwise panic in the slice expressions below.
	if num < 0 {
		num = 0
	}
	if len(m) == 0 {
		return make([]*cloudwatch.Dimension, 0), nil
	} else if _, ok := m[model.MetricNameLabel]; len(m) == 1 && ok {
		return make([]*cloudwatch.Dimension, 0), nil
	}
	names := make([]string, 0, len(m))
	for k := range m {
		if !(k == model.MetricNameLabel || k == cwHighResLabel || k == cwUnitLabel) {
			names = append(names, string(k))
		}
	}
	sort.Strings(names)

	replacing := len(b.replaceDimensions) > 0
	dims := make([]*cloudwatch.Dimension, 0, len(names))
	var replacedDims []*cloudwatch.Dimension
	if replacing {
		replacedDims = make([]*cloudwatch.Dimension, 0, len(names))
	}
	for _, name := range names {
		val := string(m[model.LabelName(name)])
		if name == "" || val == "" {
			continue
		}
		dims = append(dims, new(cloudwatch.Dimension).SetName(name).SetValue(val))
		if replacing {
			if replacement, ok := b.replaceDimensions[name]; ok {
				val = replacement
			}
			replacedDims = append(replacedDims, new(cloudwatch.Dimension).SetName(name).SetValue(val))
		}
	}
	if len(dims) > num {
		dims = dims[:num]
	}
	if len(replacedDims) > num {
		replacedDims = replacedDims[:num]
	}
	return dims, replacedDims
}
// getAdditionalDimensions returns one CloudWatch dimension per entry in
// b.additionalDimensions. The entries are ordered by dimension name, so
// repeated calls produce the same order (map iteration order is random),
// consistent with the sorted label dimensions from getDimensions.
func getAdditionalDimensions(b *Bridge) []*cloudwatch.Dimension {
	names := make([]string, 0, len(b.additionalDimensions))
	for k := range b.additionalDimensions {
		names = append(names, k)
	}
	sort.Strings(names)
	dims := make([]*cloudwatch.Dimension, 0, len(names))
	for _, k := range names {
		dims = append(dims, new(cloudwatch.Dimension).SetName(k).SetValue(b.additionalDimensions[k]))
	}
	return dims
}
// getResolution returns the CloudWatch storage resolution for m. The result
// is 1 when m carries the __cw_high_res label (whatever its value) and 60
// otherwise.
func getResolution(m model.Metric) int64 {
	resolution := int64(60)
	if _, highRes := m[cwHighResLabel]; highRes {
		resolution = 1
	}
	return resolution
}
// getUnit returns the value of the __cw_unit label of m, passed through
// unchanged, or "None" when the label is absent.
func getUnit(m model.Metric) string {
	unit, present := m[cwUnitLabel]
	if !present {
		return "None"
	}
	return string(unit)
}
// fetchMetricFamilies retrieves metrics from the provided URL, decodes them
// into MetricFamily proto messages, and sends them to the provided channel.
// It returns after all MetricFamilies have been sent; ch is always closed on
// return, so callers can range over it.
//
// When both certificate and key are non-empty they are loaded as a client
// certificate pair; a load failure terminates the process via log.Fatal.
// skipServerCertCheck disables server certificate verification (testing only).
func fetchMetricFamilies(
	url string, ch chan<- *dto.MetricFamily,
	certificate string, key string,
	skipServerCertCheck bool,
) {
	defer close(ch)
	tlsConfig := &tls.Config{InsecureSkipVerify: skipServerCertCheck}
	if certificate != "" && key != "" {
		cert, err := tls.LoadX509KeyPair(certificate, key)
		if err != nil {
			log.Fatal("prometheus-to-cloudwatch: Error: ", err)
		}
		tlsConfig.Certificates = []tls.Certificate{cert}
	}
	transport := &http.Transport{TLSClientConfig: tlsConfig}
	// A new Transport is built for every scrape. Release its idle keep-alive
	// connections afterwards; otherwise every call leaves sockets open.
	defer transport.CloseIdleConnections()
	// NOTE(review): the client has no Timeout, so a hung scrape blocks the
	// caller indefinitely; consider making a scrape timeout configurable.
	client := &http.Client{Transport: transport}
	decodeContent(client, url, ch)
}
// decodeContent sends a GET request for url with client, using acceptHeader to
// prefer the delimited protobuf format over the text format. A 200 OK response
// is handed to parseResponse. Failing to build or execute the request, or any
// other status code, terminates the process via log.Fatalf.
func decodeContent(client *http.Client, url string, ch chan<- *dto.MetricFamily) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		log.Fatalf("prometheus-to-cloudwatch: Error: creating GET request for URL %q failed: %s", url, err)
	}
	req.Header.Add("Accept", acceptHeader)

	resp, err := client.Do(req)
	if err != nil {
		log.Fatalf("prometheus-to-cloudwatch: Error: executing GET request for URL %q failed: %s", url, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		parseResponse(resp, ch)
		return
	}
	log.Fatalf("prometheus-to-cloudwatch: Error: GET request for URL %q returned HTTP status %s", url, resp.Status)
}
// parseResponse decodes the body of resp and sends every MetricFamily it
// contains on ch. It returns once all MetricFamilies are parsed and put on the
// channel.
//
// A Content-Type of application/vnd.google.protobuf with
// proto=io.prometheus.client.MetricFamily and encoding=delimited selects the
// length-delimited protobuf decoder. Anything else, including an unparseable
// Content-Type, is read as the Prometheus text format. Decoding errors
// terminate the process via log.Fatalln.
func parseResponse(resp *http.Response, ch chan<- *dto.MetricFamily) {
	mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
	delimitedProto := err == nil &&
		mediaType == "application/vnd.google.protobuf" &&
		params["encoding"] == "delimited" &&
		params["proto"] == "io.prometheus.client.MetricFamily"

	if !delimitedProto {
		var parser expfmt.TextParser
		families, parseErr := parser.TextToMetricFamilies(resp.Body)
		if parseErr != nil {
			log.Fatalln("reading text format failed:", parseErr)
		}
		for _, family := range families {
			ch <- family
		}
		return
	}

	for {
		family := new(dto.MetricFamily)
		_, readErr := pbutil.ReadDelimited(resp.Body, family)
		if readErr == io.EOF {
			return
		}
		if readErr != nil {
			log.Fatalln("prometheus-to-cloudwatch: Error: reading metric family protocol buffer failed:", readErr)
		}
		ch <- family
	}
}