Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/aws/aws-lambda-go v1.54.0
github.com/aws/aws-sdk-go-v2 v1.41.7
github.com/aws/aws-sdk-go-v2/config v1.32.17
github.com/aws/aws-sdk-go-v2/credentials v1.19.16
github.com/aws/aws-sdk-go-v2/service/s3 v1.100.1
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.7
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.6
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1
github.com/aws/smithy-go v1.25.1
github.com/go-kit/log v0.2.1
github.com/gogo/protobuf v1.3.2
Expand Down Expand Up @@ -40,7 +42,6 @@ require (
github.com/armon/go-metrics v0.4.1 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.16 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
Expand All @@ -52,7 +53,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 // indirect
Expand Down
54 changes: 52 additions & 2 deletions lambda-promtail.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,23 @@
Type: String
Default: ""
NoEcho: true
TenantID:
Description: The Loki tenant ID, sent as the X-Scope-OrgID header. Required when WifAudience is set.
Type: String
Default: ""
WifAudience:
Description: >
The audience of the AWS STS web identity token used for workload identity federation (WIF) with
Grafana Cloud. When set, a short-lived JWT is fetched from AWS STS and sent as
'Authorization: Bearer <TenantID>:<JWT>'. Mutually exclusive with Username/Password.
Type: String
Default: ""
WifRoleArn:
Description: >
Optional IAM role ARN to assume before requesting the STS web identity token. Leave empty to use
the Lambda execution role directly. Only used when WifAudience is set.
Type: String
Default: ""
KeepStream:
Description: Determines whether to keep the CloudWatch Log Stream value as a Loki label when writing logs from lambda-promtail.
Type: String
Expand All @@ -39,7 +56,7 @@
LogGroupToSubscribe:
Description: Name of the CloudWatch Log Group to subscribe the lambda to. The logs written to it, will be forwarded to Grafana Cloud Loki (ex. /aws/lambda/my-log-group).
Type: String
Default: ""

Check warning on line 59 in lambda-promtail.yaml

View workflow job for this annotation

GitHub Actions / lint

W1030 {'Ref': 'LogGroupToSubscribe'} does not match '^[\\.\\-_/#A-Za-z0-9]+$' when 'Ref' is resolved at 'Resources/MainLambdaPromtailSubscriptionFilter/Properties/LogGroupName'

Metadata:
AWS::CloudFormation::Interface:
Expand All @@ -50,12 +67,15 @@
- WriteAddress
- Username
- Password
- TenantID
- WifAudience
- WifRoleArn
- Label:
default: "Lambda function configuration"
Parameters:
- S3BucketName
- S3KeyName
- SubscriptionFilter

Check warning on line 78 in lambda-promtail.yaml

View workflow job for this annotation

GitHub Actions / lint

W4001 'SubscriptionFilter' is not one of ['WriteAddress', 'ReservedConcurrency', 'Username', 'Password', 'TenantID', 'WifAudience', 'WifRoleArn', 'KeepStream', 'ExtraLabels', 'S3BucketName', 'S3KeyName', 'LogGroupToSubscribe']
- ReservedConcurrency
- Label:
default: "Additional configuration"
Expand All @@ -63,6 +83,10 @@
- KeepStream
- ExtraLabels

Conditions:
UseWif: !Not [!Equals [!Ref WifAudience, ""]]
HasWifRole: !Not [!Equals [!Ref WifRoleArn, ""]]

Resources:

LambdaPromtailRole:
Expand All @@ -89,14 +113,37 @@
- logs:CreateLogStream
- logs:PutLogEvents
Resource: arn:aws:logs:*:*:*
RoleName: GrafanaLabsCloudWatchLogsIntegration
RoleName: !Sub "GrafanaLabsCloudWatchLogsIntegration-${AWS::StackName}"

# Grants the STS permissions needed for workload identity federation. Only created when
# WifAudience is set. When WifRoleArn is provided the Lambda role is allowed to assume it;
# otherwise the Lambda role fetches the web identity token directly.
LambdaPromtailWifPolicy:
Type: AWS::IAM::Policy
Condition: UseWif
Properties:
PolicyName: wif-sts
Roles:
- !Ref LambdaPromtailRole
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: sts:GetWebIdentityToken
Resource: "*"
- !If
- HasWifRole
- Effect: Allow
Action: sts:AssumeRole
Resource: !Ref WifRoleArn
- !Ref AWS::NoValue

LambdaPromtailFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: GrafanaCloudLambdaPromtail
FunctionName: !Sub "GrafanaCloudLambdaPromtail-${AWS::StackName}"
Code:
S3Bucket: !Ref S3BucketName

Check warning on line 146 in lambda-promtail.yaml

View workflow job for this annotation

GitHub Actions / lint

W1030 {'Ref': 'S3BucketName'} is shorter than 3 when 'Ref' is resolved

Check warning on line 146 in lambda-promtail.yaml

View workflow job for this annotation

GitHub Actions / lint

W1030 {'Ref': 'S3BucketName'} is not a 'AWS::S3::Bucket.Name' with pattern '^(?![.\\-])(?!.*\\.\\.)(?!.*\\-\\.)(?!.*\\.\\-)[a-z0-9.\\-]{3,63}(?<![.\\-])$' when 'Ref' is resolved
S3Key: !Ref S3KeyName
Runtime: provided.al2023
Handler: main
Expand All @@ -109,6 +156,9 @@
WRITE_ADDRESS: !Ref WriteAddress
USERNAME: !Ref Username
PASSWORD: !Ref Password
TENANT_ID: !Ref TenantID
WIF_AUDIENCE: !Ref WifAudience
WIF_ROLE_ARN: !Ref WifRoleArn
KEEP_STREAM: !Ref KeepStream
EXTRA_LABELS: !Ref ExtraLabels

Expand Down
105 changes: 105 additions & 0 deletions pkg/auth_sts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/sts"
)

const (
// stsSigningAlg is the signing algorithm requested from STS for the web identity token.
stsSigningAlg = "ES384"

// stsTokenRefreshMargin is how long before its expiry a cached token is refreshed.
stsTokenRefreshMargin = 1 * time.Minute
)

// stsWebIdentityTokenClient is the subset of the STS client used to fetch web identity tokens.
// It is an interface so the option can be unit tested without calling AWS.
type stsWebIdentityTokenClient interface {
GetWebIdentityToken(ctx context.Context, params *sts.GetWebIdentityTokenInput, optFns ...func(*sts.Options)) (*sts.GetWebIdentityTokenOutput, error)
}

// stsWebIdentityOption fetches a web identity JWT from AWS STS and sets it as a bearer token.
// The audience of the token is wifAudience (e.g.
// https://grafana-dev.com/v1/workload-identities/dev-eu-west-2:7161:alloy-ec2), and the header
// is set to:
//
// Authorization: Bearer <tenantID>:<JWT>
type stsWebIdentityOption struct {
client stsWebIdentityTokenClient
tenantID string
wifAudience string

mu sync.Mutex
cachedToken string
expiresAt time.Time
}

// newSTSWebIdentityOption builds an stsWebIdentityOption. If roleARN is non-empty the option
// assumes that role before requesting the token, mirroring the behaviour of Alloy's gcomawsauth.
func newSTSWebIdentityOption(ctx context.Context, tenantID, wifAudience, roleARN string) (*stsWebIdentityOption, error) {
awsCfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}

if roleARN != "" {
creds := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(awsCfg), roleARN)
awsCfg.Credentials = aws.NewCredentialsCache(creds)
}

return &stsWebIdentityOption{
client: sts.NewFromConfig(awsCfg),
tenantID: tenantID,
wifAudience: wifAudience,
}, nil
}

func (o *stsWebIdentityOption) Apply(ctx context.Context, req *http.Request) error {
token, err := o.token(ctx)
if err != nil {
return err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s:%s", o.tenantID, token))
return nil
}

// token returns a cached web identity token, fetching a fresh one from STS when none is cached
// or the cached one is close to expiry.
func (o *stsWebIdentityOption) token(ctx context.Context) (string, error) {
o.mu.Lock()
defer o.mu.Unlock()

if o.cachedToken != "" && time.Now().Before(o.expiresAt.Add(-stsTokenRefreshMargin)) {
return o.cachedToken, nil
}

alg := stsSigningAlg
output, err := o.client.GetWebIdentityToken(ctx, &sts.GetWebIdentityTokenInput{
Audience: []string{o.wifAudience},
SigningAlgorithm: &alg,
})
if err != nil {
return "", fmt.Errorf("failed to get JWT from AWS STS: %w", err)
}
if output.WebIdentityToken == nil {
return "", fmt.Errorf("AWS STS returned an empty web identity token")
}

o.cachedToken = *output.WebIdentityToken
if output.Expiration != nil {
o.expiresAt = *output.Expiration
} else {
o.expiresAt = time.Time{}
}

return o.cachedToken, nil
}
65 changes: 49 additions & 16 deletions pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@ const (
)

var (
writeAddress *url.URL
username, password, extraLabelsRaw, dropLabelsRaw, tenantID, bearerToken string
keepStream bool
batchSize int
pipelineTimeout time.Duration
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
dropLabels []model.LabelName
skipTLSVerify bool
printLogLine bool
relabelConfigs []*relabel.Config
writeAddress *url.URL
extraLabelsRaw, dropLabelsRaw, tenantID string
authOptions []AuthOption
keepStream bool
batchSize int
pipelineTimeout time.Duration
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
dropLabels []model.LabelName
skipTLSVerify bool
printLogLine bool
relabelConfigs []*relabel.Config
)

func setupArguments(ctx context.Context, secretFetcher secretFetcher) {
Expand Down Expand Up @@ -72,11 +73,11 @@ func setupArguments(ctx context.Context, secretFetcher secretFetcher) {
panic(err)
}

username, err = loadSensitiveEnv(ctx, secretFetcher, "USERNAME")
username, err := loadSensitiveEnv(ctx, secretFetcher, "USERNAME")
if err != nil {
panic(err)
}
password, err = loadSensitiveEnv(ctx, secretFetcher, "PASSWORD")
password, err := loadSensitiveEnv(ctx, secretFetcher, "PASSWORD")
if err != nil {
panic(err)
}
Expand All @@ -85,7 +86,7 @@ func setupArguments(ctx context.Context, secretFetcher secretFetcher) {
panic("both username and password must be set if either one is set")
}

bearerToken, err = loadSensitiveEnv(ctx, secretFetcher, "BEARER_TOKEN")
bearerToken, err := loadSensitiveEnv(ctx, secretFetcher, "BEARER_TOKEN")
if err != nil {
panic(err)
}
Expand All @@ -94,14 +95,45 @@ func setupArguments(ctx context.Context, secretFetcher secretFetcher) {
panic("both username and bearerToken are not allowed")
}

tenantID = os.Getenv("TENANT_ID")

// Workload identity federation: when WIF_AUDIENCE is set we fetch a short-lived
// web identity JWT from AWS STS and send it as `Authorization: Bearer <tenantID>:<JWT>`.
wifAudience := os.Getenv("WIF_AUDIENCE")
wifRoleARN := os.Getenv("WIF_ROLE_ARN")
if wifAudience != "" {
if username != "" || bearerToken != "" {
panic("WIF_AUDIENCE cannot be combined with username/password or bearer token auth")
}
if tenantID == "" {
panic("TENANT_ID must be set when WIF_AUDIENCE is used")
}
}

authOptions = nil
if tenantID != "" {
authOptions = append(authOptions, tenantIDOption{tenantID: tenantID})
}
if username != "" && password != "" {
authOptions = append(authOptions, basicAuthOption{username: username, password: password})
}
if bearerToken != "" {
authOptions = append(authOptions, bearerTokenOption{token: bearerToken})
}
if wifAudience != "" {
stsOption, err := newSTSWebIdentityOption(ctx, tenantID, wifAudience, wifRoleARN)
if err != nil {
panic(err)
}
authOptions = append(authOptions, stsOption)
}

skipTLS := os.Getenv("SKIP_TLS_VERIFY")
// Anything other than case-insensitive 'true' is treated as 'false'.
if strings.EqualFold(skipTLS, "true") {
skipTLSVerify = true
}

tenantID = os.Getenv("TENANT_ID")

keep := os.Getenv("KEEP_STREAM")
// Anything other than case-insensitive 'true' is treated as 'false'.
if strings.EqualFold(keep, "true") {
Expand Down Expand Up @@ -279,6 +311,7 @@ func handler(ctx context.Context, ev map[string]interface{}) error {
timeout: timeout,
skipTLSVerify: skipTLSVerify,
},
auth: authOptions,
}, log)

lokiStageConfigs, err := ParsePipelineConfigs(os.Getenv("LOKI_STAGE_CONFIGS"), *log, metrics)
Expand Down
14 changes: 4 additions & 10 deletions pkg/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,10 @@ func (c *promtailClient) send(ctx context.Context, buf []byte) (int, error) {
req.Header.Set("Content-Type", contentType)
req.Header.Set("User-Agent", userAgent)

if tenantID != "" {
req.Header.Set("X-Scope-OrgID", tenantID)
}

if username != "" && password != "" {
req.SetBasicAuth(username, password)
}

if bearerToken != "" {
req.Header.Set("Authorization", "Bearer "+bearerToken)
for _, opt := range c.config.auth {
if err := opt.Apply(ctx, req); err != nil {
return -1, err
}
}

resp, err := c.http.Do(req.WithContext(ctx))
Expand Down
Loading
Loading