Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions bfe_balance/backend/bfe_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package backend

import (
"fmt"
"net"
"strconv"
"sync"
)

import (
"github.com/bfenetworks/bfe/bfe_config/bfe_cluster_conf/cluster_table_conf"
"github.com/bfenetworks/bfe/bfe_route/bfe_cluster"
)
Expand Down Expand Up @@ -54,6 +54,35 @@ func NewBfeBackend() *BfeBackend {
return backend
}

func (back *BfeBackend) InitSimpleByAddrinfo(subClusterName string, name string, addrinfo string) {
back.Name = name

// parse addrinfo to addr and port
host, portStr, err := net.SplitHostPort(addrinfo)
if err != nil {
back.Addr = addrinfo
back.Port = 0
} else {
back.Addr = host
if p, err2 := strconv.Atoi(portStr); err2 == nil {
back.Port = p
} else {
back.Port = 0
}
}
back.AddrInfo = addrinfo
back.SubCluster = subClusterName
}

func NewBfeBackendByAddrinfo(subClusterName string, name string, addrinfo string) *BfeBackend {
backend := new(BfeBackend)
backend.avail = true

backend.InitSimpleByAddrinfo(subClusterName, name, addrinfo)

return backend
}

// Init initializes BfeBackend with BackendConf
func (back *BfeBackend) Init(subCluster string, conf *cluster_table_conf.BackendConf) {
back.Name = *conf.Name
Expand Down
224 changes: 220 additions & 4 deletions bfe_balance/bal_gslb/bal_gslb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,27 @@ import (
"math/rand"
"net"
"sort"
"strings"
"sync"
"time"
)

import (
"github.com/baidu/go-lib/log"
"github.com/baidu/go-lib/web-monitor/metrics"
)

import (
bal_backend "github.com/bfenetworks/bfe/bfe_balance/backend"
"github.com/bfenetworks/bfe/bfe_balance/bal_slb"
"github.com/bfenetworks/bfe/bfe_basic"
"github.com/bfenetworks/bfe/bfe_config/bfe_cluster_conf/cluster_conf"
"github.com/bfenetworks/bfe/bfe_config/bfe_cluster_conf/cluster_table_conf"
"github.com/bfenetworks/bfe/bfe_config/bfe_cluster_conf/gslb_conf"
"github.com/bfenetworks/bfe/bfe_util/epp"
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)

const (
DefaultRetryMax = 3 // default max retries in assigned sub cluster
DefaultCrossRetryMax = 1 // default max retries in other sub cluster, if retries in assigned sub cluster fail
REQ_CTX_EPP = "epp_ctx"
)

type BalanceGslb struct {
Expand All @@ -59,6 +59,13 @@ type BalanceGslb struct {
crossRetry int // max retries in other sub cluster, if all retry within assigned sub cluster fail
hashConf cluster_conf.HashConf // gslb hash conf
BalanceMode string // balanceMode, WRR or WLC, defined in cluster_conf

// EPP related
eppClient epp.EppGrpcClient
eppAddrs []string
eppConnTimeout time.Duration
eppCallRetry uint32
eppConcurrency int
}

func NewBalanceGslb(name string) *BalanceGslb {
Expand Down Expand Up @@ -87,6 +94,20 @@ func (bal *BalanceGslb) SetGslbBasic(gslbBasic cluster_conf.GslbBasicConf) {
bal.BalanceMode = *gslbBasic.BalanceMode

bal.lock.Unlock()
// close EPP client if any
bal.closeEPP()

// init or close EPP client according to balance mode
if gslbBasic.BalanceMode != nil && strings.ToUpper(*gslbBasic.BalanceMode) == cluster_conf.BalanceModeEPP {
if gslbBasic.EPPAddr != nil && len(*gslbBasic.EPPAddr) > 0 {
if err := bal.initEPP(*gslbBasic.EPPAddr); err != nil {
log.Logger.Error("initEPP failed: %v", err)
}
}
} else {
// non-EPP mode, ensure EPP is closed
bal.closeEPP()
}
}

func (bal *BalanceGslb) SetSlowStart(backendConf cluster_conf.BackendBasic) {
Expand All @@ -99,6 +120,163 @@ func (bal *BalanceGslb) SetSlowStart(backendConf cluster_conf.BackendBasic) {
bal.lock.Unlock()
}

// initEPP initializes or refreshes EPP client with given addresses.
func (bal *BalanceGslb) initEPP(addrs []string) error {
if len(addrs) == 0 {
bal.closeEPP()
return nil
}

// if same as existing, do nothing
if len(bal.eppAddrs) == len(addrs) {
same := true
for i := range addrs {
if bal.eppAddrs[i] != addrs[i] {
same = false
break
}
}
if same {
return nil
}
}

// build client via grpc_pool wrapper
// use default small timeouts/concurrency; tune as needed
// client, err := epp.NewClient(addrs, 100*time.Second, 2, 2)
client, err := epp.NewSimpleGrpcClient(addrs[0], 100*time.Second)
if err != nil {
return err
}

// swap in new client
if bal.eppClient != nil {
bal.eppClient.Close()
}
bal.eppClient = client
bal.eppAddrs = append([]string(nil), addrs...)
return nil
}

// closeEPP closes and clears EPP client if exists
func (bal *BalanceGslb) closeEPP() {
if bal.eppClient != nil {
bal.eppClient.Close()
bal.eppClient = nil
}
bal.eppAddrs = nil
}

// chooseBackendFromEPP is a hook to call EPP service to get target backend address.
// Current implementation is a stub that returns empty result (meaning no decision).
// Implement actual gRPC call to EPP service here using bal.eppClient.Conn().
func (bal *BalanceGslb) chooseBackendFromEPP(req *bfe_basic.Request) (string, *epp.EppClient, error) {
if bal.eppClient == nil {
return "", nil, fmt.Errorf("no epp client")
}

conn := bal.eppClient.Conn()
if conn == nil {
return "", nil, fmt.Errorf("epp conn not ready")
}

client, err := epp.NewEppClient(conn)
if err != nil {
return "", nil, err
}

// build subset hint metadata if request provides backend subset (optional)
// For now, we only send minimal metadata and headers: host and path
// Build filter metadata: {"envoy.lb.subset_hint": {"x-gateway-destination-endpoint-subset": [..]}}
//filterMeta := map[string]*structpb.Struct{}
// no subset by default
//metadata := &corev3.Metadata{FilterMetadata: filterMeta}

// construct ProcessingRequest with request_headers
// HttpHeaders requires HeaderMap; construct minimal representation using attributes map
hasBody := true
if req.OutRequest.ContentLength == 0 {
hasBody = false
}

reqMsg := &extprocv3.ProcessingRequest{
Request: &extprocv3.ProcessingRequest_RequestHeaders{
RequestHeaders: epp.BuildEnvoyGRPCHeaders(req.OutRequest.Header, true, !hasBody),
},
//MetadataContext: metadata,
}

if err := client.Send(reqMsg); err != nil {
client.Close()
return "", nil, err
}

if hasBody {
var body []byte
bodyAccessor, _ := req.OutRequest.GetBodyAccessor()
if bodyAccessor != nil {
body, _ = bodyAccessor.GetBytes()
}

reqMsg := &extprocv3.ProcessingRequest{
Request: &extprocv3.ProcessingRequest_RequestBody{
RequestBody: &extprocv3.HttpBody{
Body: body,
//Body: httpReq.Body,
EndOfStream: true,
},
},
//MetadataContext: metadata,
}

if err := client.Send(reqMsg); err != nil {
client.Close()
return "", nil, err
}
}

// receive response
resp, err := client.Recv()
if err != nil {
client.Close()
return "", nil, err
}

var addrinfo string
// Try to inspect dynamic_metadata
if md := resp.GetDynamicMetadata(); md != nil {
if v, ok := md.Fields["envoy.lb"]; ok {
// try to get x-gateway-destination-endpoint
if m := v.GetStructValue(); m != nil {
if val, found := m.Fields["x-gateway-destination-endpoint"]; found {
if s := val.GetStringValue(); s != "" {
// may be comma separated list, pick first
parts := strings.Split(s, ",")
addrinfo = strings.TrimSpace(parts[0])
}
}
}
}
}

if hasBody && resp.Response != nil && resp.GetRequestHeaders() != nil {
// receive response for request body
resp, err = client.Recv()
// the response should be resp.GetRequestBody()
if err != nil {
client.Close()
return "", nil, err
}
}

if addrinfo != "" {
return addrinfo, client, nil
}

client.Close()
return "", nil, fmt.Errorf("no endpoint from epp")
}

// Init initializes gslb cluster with config
func (bal *BalanceGslb) Init(gslbConf gslb_conf.GslbClusterConf) error {
totalWeight := 0
Expand Down Expand Up @@ -312,6 +490,44 @@ func getHashKeyByHeader(req *bfe_basic.Request, header string) []byte {
return nil
}

func (bal *BalanceGslb) BalanceEpp(req *bfe_basic.Request) (*bal_backend.BfeBackend, error) {
var backend *bal_backend.BfeBackend
var err error

bal.lock.Lock()
defer bal.lock.Unlock()

// still in-cluster selection
if req.RetryTime > bal.retryMax {
// for epp only check in-cluster.
state.ErrBkRetryTooMany.Inc(1)
// Note: not modify req.ErrCode to just record last error
return nil, bfe_basic.ErrBkRetryTooMany
}

// If BalanceMode == EPP, try to get backend from EPP service first
addrinfo, eppClient, err := bal.chooseBackendFromEPP(req)
if err == nil && addrinfo != "" {
req.SetContext(REQ_CTX_EPP, eppClient)
// try to find backend in subclusters
for _, sub := range bal.subClusters {
if sub == nil {
continue
}
bk, berr := sub.backends.LookUpBackend(addrinfo)
if berr == nil && bk != nil {
req.Backend.SubclusterName = sub.Name
return bk, nil
}
}
// not found: log and make a temporary backend
log.Logger.Info("EPP returned addr %s not found in local backends", addrinfo)
backend = bal_backend.NewBfeBackendByAddrinfo("EPP_temp", addrinfo, addrinfo)
return backend, nil
}
return nil, fmt.Errorf("EPP no decision")
}

// Balance selects a backend for given request.
func (bal *BalanceGslb) Balance(req *bfe_basic.Request) (*bal_backend.BfeBackend, error) {
var backend *bal_backend.BfeBackend
Expand Down
14 changes: 14 additions & 0 deletions bfe_balance/bal_slb/bal_rr.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,17 @@ func GetHash(value []byte, base uint) int {

return int(hash % uint64(base))
}

// Look up backend with given addrInfo(ip:port)
func (brr *BalanceRR) LookUpBackend(addrInfo string) (*backend.BfeBackend, error) {
brr.Lock()
defer brr.Unlock()

for _, backendRR := range brr.backends {
if backendRR.backend.AddrInfo == addrInfo && backendRR.backend.Avail() && backendRR.weight > 0 {
return backendRR.backend, nil
}
}
/* never come here */
return nil, fmt.Errorf("rr_bal:LookUpBackend %s fail", addrInfo)
}
6 changes: 6 additions & 0 deletions bfe_config/bfe_cluster_conf/cluster_conf/cluster_conf_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
const (
BalanceModeWrr = "WRR" // weighted round robin
BalanceModeWlc = "WLC" // weighted least connection
BalanceModeEPP = "EPP" // balance by epp
)

const (
Expand Down Expand Up @@ -207,6 +208,7 @@ type GslbBasicConf struct {
HashConf *HashConf

BalanceMode *string // balanceMode, default WRR
EPPAddr *[]string // EPP address
}

// ClusterBasicConf is basic conf for cluster.
Expand Down Expand Up @@ -558,6 +560,10 @@ func GslbBasicConfCheck(conf *GslbBasicConf) error {
switch *conf.BalanceMode {
case BalanceModeWrr:
case BalanceModeWlc:
case BalanceModeEPP:
if conf.EPPAddr == nil || len(*conf.EPPAddr) == 0 {
return errors.New("EPPAddr is nil or empty")
}
default:
return fmt.Errorf("unsupported bal mode %s", *conf.BalanceMode)
}
Expand Down
Loading
Loading