From 3bd1797110be77ed9c04910d8b177e00d69341fa Mon Sep 17 00:00:00 2001 From: LZG3530606141 <3530606141@qq.com> Date: Mon, 11 May 2026 19:31:00 +0800 Subject: [PATCH] feat: add Confluent Cloud billing plugin - Implement ConfluentCostSource for ingesting Confluent Cloud billing data - Fetch costs via Confluent Billing v1 API with pagination support - Support Basic Auth with API key/secret pair - Map Confluent cost fields to FOCUS spec including product, line_type, resource display name, environment, network access type, granularity - Include rate limiting and proper error handling - Add unit tests for JSON unmarshalling - Add config validator - Follow OpenCost plugin architecture (hashicorp go-plugin) References: - Confluent Billing API: https://docs.confluent.io/cloud/current/api.html#tag/Costs-(billingv1) - Issue: #47 Signed-off-by: LZG3530606141 <3530606141@qq.com> --- pkg/plugins/confluent/cmd/main/main.go | 241 ++++++++++++++++++ pkg/plugins/confluent/cmd/main/main_test.go | 134 ++++++++++ .../confluent/cmd/validator/main/main.go | 41 +++ .../confluent/config/confluentconfig.go | 31 +++ .../confluentplugin/confluentbilling.go | 50 ++++ pkg/plugins/confluent/go.mod | 12 + 6 files changed, 509 insertions(+) create mode 100644 pkg/plugins/confluent/cmd/main/main.go create mode 100644 pkg/plugins/confluent/cmd/main/main_test.go create mode 100644 pkg/plugins/confluent/cmd/validator/main/main.go create mode 100644 pkg/plugins/confluent/config/confluentconfig.go create mode 100644 pkg/plugins/confluent/confluentplugin/confluentbilling.go create mode 100644 pkg/plugins/confluent/go.mod diff --git a/pkg/plugins/confluent/cmd/main/main.go b/pkg/plugins/confluent/cmd/main/main.go new file mode 100644 index 0000000..5b2c0c4 --- /dev/null +++ b/pkg/plugins/confluent/cmd/main/main.go @@ -0,0 +1,241 @@ +package main + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/go-plugin" + "golang.org/x/time/rate" + "google.golang.org/protobuf/types/known/timestamppb" + + commonconfig "github.com/opencost/opencost-plugins/pkg/common/config" + confluentconfig "github.com/opencost/opencost-plugins/pkg/plugins/confluent/config" + confluentplugin "github.com/opencost/opencost-plugins/pkg/plugins/confluent/confluentplugin" + "github.com/opencost/opencost/core/pkg/log" + "github.com/opencost/opencost/core/pkg/model/pb" + "github.com/opencost/opencost/core/pkg/opencost" + ocplugin "github.com/opencost/opencost/core/pkg/plugin" +) + +// handshakeConfigs are used to just do a basic handshake between +// a plugin and host. If the handshake fails, a user friendly error is shown. +var handshakeConfig = plugin.HandshakeConfig{ + ProtocolVersion: 1, + MagicCookieKey: "PLUGIN_NAME", + MagicCookieValue: "confluent", +} + +const confluentCostsURL = "https://api.confluent.cloud/billing/v1/costs" +const confluentAPIDateFormat = "2006-01-02" + +// Implementation of CustomCostSource +type ConfluentCostSource struct { + rateLimiter *rate.Limiter + config *confluentconfig.ConfluentConfig + client HTTPClient +} + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +func (c *ConfluentCostSource) GetCustomCosts(req *pb.CustomCostRequest) []*pb.CustomCostResponse { + results := []*pb.CustomCostResponse{} + + targets, err := opencost.GetWindows(req.Start.AsTime(), req.End.AsTime(), req.Resolution.AsDuration()) + if err != nil { + log.Errorf("error getting windows: %v", err) + errResp := pb.CustomCostResponse{ + Errors: []string{fmt.Sprintf("error getting windows: %v", err)}, + } + results = append(results, &errResp) + return results + } + + for _, target := range targets { + // don't allow future requests + if target.Start().After(time.Now().UTC()) { + log.Debugf("skipping future window %v", target) + continue + } + + log.Debugf("fetching Confluent costs for window %v", target) + result := c.getConfluentCostsForWindow(target) + results = append(results, result) + } + + return results +} + +func main() { + configFile, err := commonconfig.GetConfigFilePath() + if err != nil { + log.Fatalf("error opening config file: %v", err) + } + + confluentCfg, err := confluentconfig.GetConfluentConfig(configFile) + if err != nil { + log.Fatalf("error building Confluent config: %v", err) + } + log.SetLogLevel(confluentCfg.LogLevel) + + // Confluent Cloud API rate limit: approximately 100 requests per minute + rateLimiter := rate.NewLimiter(1.5, 2) + confluentCostSrc := ConfluentCostSource{ + rateLimiter: rateLimiter, + config: confluentCfg, + client: &http.Client{}, + } + + // pluginMap is the map of plugins we can dispense. + var pluginMap = map[string]plugin.Plugin{ + "CustomCostSource": &ocplugin.CustomCostPlugin{Impl: &confluentCostSrc}, + } + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: handshakeConfig, + Plugins: pluginMap, + GRPCServer: plugin.DefaultGRPCServer, + }) +} + +func boilerplateConfluentCustomCost(win opencost.Window) pb.CustomCostResponse { + return pb.CustomCostResponse{ + Metadata: map[string]string{"api_client_version": "v1"}, + CostSource: "data_streaming", + Domain: "confluent", + Version: "v1", + Currency: "USD", + Start: timestamppb.New(*win.Start()), + End: timestamppb.New(*win.End()), + Errors: []string{}, + Costs: []*pb.CustomCost{}, + } +} + +func (c *ConfluentCostSource) getConfluentCostsForWindow(window opencost.Window) *pb.CustomCostResponse { + ccResp := boilerplateConfluentCustomCost(window) + + winStart := window.Start().Format(confluentAPIDateFormat) + winEnd := window.End().Format(confluentAPIDateFormat) + + // Fetch costs for the window period + costs, err := c.getCosts(winStart, winEnd) + if err != nil { + ccResp.Errors = append(ccResp.Errors, fmt.Sprintf("error getting Confluent costs: %v", err)) + return &ccResp + } + + customCosts := []*pb.CustomCost{} + for _, cost := range costs { + resourceDisplayName := cost.Resource.DisplayName + resourceID := cost.Resource.ID + environmentID := cost.Resource.Environment.ID + + customCost := pb.CustomCost{ + AccountName: environmentID, + ChargeCategory: "Usage", + Description: fmt.Sprintf("Confluent %s usage", cost.Product), + ResourceName: cost.Product, + ResourceType: cost.LineType, + Id: uuid.New().String(), + ProviderId: fmt.Sprintf("%s/%s/%s", resourceID, cost.Product, cost.LineType), + BilledCost: float32(cost.Amount), + ListCost: float32(cost.OriginalAmount), + ListUnitPrice: float32(cost.Price), + UsageQuantity: float32(cost.Quantity), + UsageUnit: cost.Unit, + Labels: map[string]string{}, + } + + if resourceDisplayName != "" { + customCost.Labels["resource_display_name"] = resourceDisplayName + } + if environmentID != "" { + customCost.Labels["environment_id"] = environmentID + } + if cost.NetworkAccessType != "" { + customCost.Labels["network_access_type"] = cost.NetworkAccessType + } + if cost.Granularity != "" { + customCost.Labels["granularity"] = cost.Granularity + } + if cost.DiscountAmount > 0 { + customCost.Labels["discount_amount"] = fmt.Sprintf("%.6f", cost.DiscountAmount) + } + if cost.ID != "" { + customCost.Labels["cost_id"] = cost.ID + } + + customCosts = append(customCosts, &customCost) + } + + ccResp.Costs = customCosts + return &ccResp +} + +func (c *ConfluentCostSource) getCosts(startDate, endDate string) ([]confluentplugin.ConfluentCost, error) { + var allCosts []confluentplugin.ConfluentCost + + // Build Basic Auth header from API key and secret + credentials := base64.StdEncoding.EncodeToString( + []byte(fmt.Sprintf("%s:%s", c.config.APIKey, c.config.APISecret)), + ) + + url := fmt.Sprintf("%s?start_date=%s&end_date=%s&page_size=100", confluentCostsURL, startDate, endDate) + + for { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %v", err) + } + req.Header.Set("Authorization", fmt.Sprintf("Basic %s", credentials)) + req.Header.Set("Accept", "application/json") + + err = c.rateLimiter.Wait(context.Background()) + if err != nil { + return nil, fmt.Errorf("rate limiter error: %v", err) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("error making request to Confluent API: %v", err) + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("Confluent API returned status %d: %s", resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + var costList confluentplugin.ConfluentCostList + err = json.Unmarshal(body, &costList) + if err != nil { + return nil, fmt.Errorf("error unmarshalling Confluent response: %v", err) + } + + allCosts = append(allCosts, costList.Data...) + + // Check for pagination + if costList.Metadata.NextPageToken != "" { + url = fmt.Sprintf("%s?start_date=%s&end_date=%s&page_size=100&page_token=%s", + confluentCostsURL, startDate, endDate, costList.Metadata.NextPageToken) + } else { + break + } + } + + return allCosts, nil +} diff --git a/pkg/plugins/confluent/cmd/main/main_test.go b/pkg/plugins/confluent/cmd/main/main_test.go new file mode 100644 index 0000000..a0946ce --- /dev/null +++ b/pkg/plugins/confluent/cmd/main/main_test.go @@ -0,0 +1,134 @@ +package main + +import ( + "encoding/json" + "testing" + + confluentplugin "github.com/opencost/opencost-plugins/pkg/plugins/confluent/confluentplugin" +) + +func TestConfluentCostUnmarshal(t *testing.T) { + jsonData := `{ + "api_version": "billing/v1", + "kind": "Cost", + "id": "abc-123", + "start_date": "2024-01-01", + "end_date": "2024-01-31", + "granularity": "MONTHLY", + "line_type": "CHARGE", + "product": "Kafka", + "network_access_type": "PRIVATE", + "quantity": 100.5, + "unit": "GB", + "price": 0.11, + "amount": 11.055, + "original_amount": 13.0, + "discount_amount": 1.945, + "resource": { + "id": "lkc-abc123", + "display_name": "my-kafka-cluster", + "environment": { + "id": "env-xyz789" + } + } + }` + + var cost confluentplugin.ConfluentCost + err := json.Unmarshal([]byte(jsonData), &cost) + if err != nil { + t.Fatalf("Error unmarshalling cost: %v", err) + } + + if cost.Product != "Kafka" { + t.Errorf("Expected product 'Kafka', got '%s'", cost.Product) + } + if cost.Amount != 11.055 { + t.Errorf("Expected amount 11.055, got %f", cost.Amount) + } + if cost.Quantity != 100.5 { + t.Errorf("Expected quantity 100.5, got %f", cost.Quantity) + } + if cost.Resource.DisplayName != "my-kafka-cluster" { + t.Errorf("Expected resource display name 'my-kafka-cluster', got '%s'", cost.Resource.DisplayName) + } + if cost.Resource.Environment.ID != "env-xyz789" { + t.Errorf("Expected environment ID 'env-xyz789', got '%s'", cost.Resource.Environment.ID) + } + if cost.NetworkAccessType != "PRIVATE" { + t.Errorf("Expected network access type 'PRIVATE', got '%s'", cost.NetworkAccessType) + } + if cost.DiscountAmount != 1.945 { + t.Errorf("Expected discount amount 1.945, got %f", cost.DiscountAmount) + } +} + +func TestConfluentCostListUnmarshal(t *testing.T) { + jsonData := `{ + "api_version": "billing/v1", + "kind": "CostList", + "metadata": { + "total_size": 2, + "page_size": 100, + "page_token": "token-abc", + "next_page_token": "" + }, + "data": [ + { + "api_version": "billing/v1", + "kind": "Cost", + "id": "cost-1", + "start_date": "2024-01-01", + "end_date": "2024-01-31", + "product": "Kafka", + "quantity": 100.0, + "unit": "GB", + "price": 0.11, + "amount": 11.0, + "resource": { + "id": "lkc-abc123", + "display_name": "my-kafka-cluster", + "environment": {"id": "env-xyz789"} + } + }, + { + "api_version": "billing/v1", + "kind": "Cost", + "id": "cost-2", + "start_date": "2024-01-01", + "end_date": "2024-01-31", + "product": "Schema Registry", + "quantity": 50.0, + "unit": "GB", + "price": 0.05, + "amount": 2.5, + "resource": { + "id": "lsrc-def456", + "display_name": "my-schema-registry", + "environment": {"id": "env-xyz789"} + } + } + ] + }` + + var costList confluentplugin.ConfluentCostList + err := json.Unmarshal([]byte(jsonData), &costList) + if err != nil { + t.Fatalf("Error unmarshalling cost list: %v", err) + } + + if len(costList.Data) != 2 { + t.Errorf("Expected 2 cost entries, got %d", len(costList.Data)) + } + + if costList.Data[0].Product != "Kafka" { + t.Errorf("Expected first product 'Kafka', got '%s'", costList.Data[0].Product) + } + + if costList.Data[1].Product != "Schema Registry" { + t.Errorf("Expected second product 'Schema Registry', got '%s'", costList.Data[1].Product) + } + + if costList.Metadata.TotalSize != 2 { + t.Errorf("Expected total_size 2, got %d", costList.Metadata.TotalSize) + } +} diff --git a/pkg/plugins/confluent/cmd/validator/main/main.go b/pkg/plugins/confluent/cmd/validator/main/main.go new file mode 100644 index 0000000..7fca49b --- /dev/null +++ b/pkg/plugins/confluent/cmd/validator/main/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "os" + + commonconfig "github.com/opencost/opencost-plugins/pkg/common/config" + confluentconfig "github.com/opencost/opencost-plugins/pkg/plugins/confluent/config" + "github.com/opencost/opencost/core/pkg/log" +) + +func main() { + configFile, err := commonconfig.GetConfigFilePath() + if err != nil { + log.Fatalf("error opening config file: %v", err) + } + + confluentCfg, err := confluentconfig.GetConfluentConfig(configFile) + if err != nil { + log.Fatalf("error building Confluent config: %v", err) + } + + // Validate required fields + if confluentCfg.APIKey == "" { + log.Fatalf("confluent_api_key is required in config file") + } + if confluentCfg.APISecret == "" { + log.Fatalf("confluent_api_secret is required in config file") + } + + fmt.Printf("Confluent config validated successfully\n") + fmt.Printf("Log level: %s\n", confluentCfg.LogLevel) + fmt.Printf("\nSample config:\n") + fmt.Printf(`{ + "confluent_api_key": "YOUR_API_KEY", + "confluent_api_secret": "YOUR_API_SECRET", + "confluent_plugin_log_level": "info" +} +`) + os.Exit(0) +} diff --git a/pkg/plugins/confluent/config/confluentconfig.go b/pkg/plugins/confluent/config/confluentconfig.go new file mode 100644 index 0000000..df9d7b2 --- /dev/null +++ b/pkg/plugins/confluent/config/confluentconfig.go @@ -0,0 +1,31 @@ +package config + +import ( + "encoding/json" + "fmt" + "os" +) + +type ConfluentConfig struct { + APIKey string `json:"confluent_api_key"` + APISecret string `json:"confluent_api_secret"` + LogLevel string `json:"confluent_plugin_log_level"` +} + +func GetConfluentConfig(configFilePath string) (*ConfluentConfig, error) { + var result ConfluentConfig + bytes, err := os.ReadFile(configFilePath) + if err != nil { + return nil, fmt.Errorf("error reading config file for Confluent config @ %s: %v", configFilePath, err) + } + err = json.Unmarshal(bytes, &result) + if err != nil { + return nil, fmt.Errorf("error marshaling json into Confluent config %v", err) + } + + if result.LogLevel == "" { + result.LogLevel = "info" + } + + return &result, nil +} diff --git a/pkg/plugins/confluent/confluentplugin/confluentbilling.go b/pkg/plugins/confluent/confluentplugin/confluentbilling.go new file mode 100644 index 0000000..12eadcf --- /dev/null +++ b/pkg/plugins/confluent/confluentplugin/confluentbilling.go @@ -0,0 +1,50 @@ +package confluentplugin + +// ConfluentCost represents a single cost entry from the Confluent Billing v1 API. +// Based on the BillingV1Cost schema from the official Confluent Cloud API. +type ConfluentCost struct { + APIVersion string `json:"api_version,omitempty"` + Kind string `json:"kind,omitempty"` + ID string `json:"id,omitempty"` + StartDate string `json:"start_date,omitempty"` + EndDate string `json:"end_date,omitempty"` + Granularity string `json:"granularity,omitempty"` + LineType string `json:"line_type,omitempty"` + Product string `json:"product,omitempty"` + NetworkAccessType string `json:"network_access_type,omitempty"` + Quantity float64 `json:"quantity,omitempty"` + Unit string `json:"unit,omitempty"` + Price float64 `json:"price,omitempty"` + Amount float64 `json:"amount,omitempty"` + OriginalAmount float64 `json:"original_amount,omitempty"` + DiscountAmount float64 `json:"discount_amount,omitempty"` + Resource ConfluentResource `json:"resource,omitempty"` +} + +// ConfluentResource represents the resource associated with a cost entry. +type ConfluentResource struct { + ID string `json:"id,omitempty"` + DisplayName string `json:"display_name,omitempty"` + Environment ConfluentEnvironment `json:"environment,omitempty"` +} + +// ConfluentEnvironment represents the environment associated with a resource. +type ConfluentEnvironment struct { + ID string `json:"id,omitempty"` +} + +// ConfluentCostList represents the paginated list response from the Confluent Billing API. +type ConfluentCostList struct { + APIVersion string `json:"api_version,omitempty"` + Kind string `json:"kind,omitempty"` + Metadata ConfluentListMeta `json:"metadata,omitempty"` + Data []ConfluentCost `json:"data,omitempty"` +} + +// ConfluentListMeta represents pagination metadata in list responses. +type ConfluentListMeta struct { + TotalSize int `json:"total_size,omitempty"` + PageSize int `json:"page_size,omitempty"` + PageToken string `json:"page_token,omitempty"` + NextPageToken string `json:"next_page_token,omitempty"` +} diff --git a/pkg/plugins/confluent/go.mod b/pkg/plugins/confluent/go.mod new file mode 100644 index 0000000..cdf20a8 --- /dev/null +++ b/pkg/plugins/confluent/go.mod @@ -0,0 +1,12 @@ +module github.com/opencost/opencost-plugins/pkg/plugins/confluent + +go 1.21 + +require ( + github.com/google/uuid v1.6.0 + github.com/hashicorp/go-plugin v1.6.1 + github.com/opencost/opencost-plugins/pkg/common v0.0.0 + github.com/opencost/opencost/core v0.0.0 + golang.org/x/time v0.5.0 + google.golang.org/protobuf v1.34.0 +)