Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions pkg/common/utils/resource/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,27 @@ const (
BROKER_PRESTOP = "/opt/apache-doris/broker_prestop.sh"

//keys for pod env variables
POD_NAME = "POD_NAME"
POD_IP = "POD_IP"
HOST_IP = "HOST_IP"
POD_NAMESPACE = "POD_NAMESPACE"
ADMIN_USER = "USER"
ADMIN_PASSWD = "PASSWD"
DORIS_ROOT_KEY = "DORIS_ROOT"
POD_NAME = "POD_NAME"
POD_IP = "POD_IP"
HOST_IP = "HOST_IP"
POD_NAMESPACE = "POD_NAMESPACE"
ADMIN_USER = "USER"
ADMIN_PASSWD = "PASSWD"
DORIS_ROOT_KEY = "DORIS_ROOT"
DNS_READY_TIMEOUT = "DNS_READY_TIMEOUT"
DNS_READY_INTERVAL = "DNS_READY_INTERVAL"

KRB5_MOUNT_PATH = "KRB5_MOUNT_PATH"
KRB5_CONFIG = "KRB5_CONFIG"
KEYTAB_MOUNT_PATH = "KEYTAB_MOUNT_PATH"
KEYTAB_FINAL_USED_PATH = "KEYTAB_FINAL_USED_PATH"

DEFAULT_ADMIN_USER = "root"
DEFAULT_ROOT_PATH = "/opt/apache-doris"
POD_INFO_PATH = "/etc/podinfo"
POD_INFO_VOLUME_NAME = "podinfo"
DEFAULT_ADMIN_USER = "root"
DEFAULT_ROOT_PATH = "/opt/apache-doris"
DEFAULT_DNS_READY_TIMEOUT = "120"
DEFAULT_DNS_READY_INTERVAL = "2"
POD_INFO_PATH = "/etc/podinfo"
POD_INFO_VOLUME_NAME = "podinfo"

NODE_TOPOLOGYKEY = "kubernetes.io/hostname"

Expand All @@ -90,6 +94,9 @@ const (
DISAGGREGATED_FE_MAIN_CONTAINER_NAME = "fe"
DISAGGREGATED_BE_MAIN_CONTAINER_NAME = "compute"
DISAGGREGATED_MS_MAIN_CONTAINER_NAME = "metaservice"

DEFAULT_FE_TERMINATION_GRACE_PERIOD_SECONDS int64 = 330
DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS int64 = 200
)

type ProbeType string
Expand All @@ -100,6 +107,39 @@ var (
Exec ProbeType = "exec"
)

func buildFQDNReadinessExecProbe(enableTLS string, config map[string]interface{}, port int32, path string, policy *v1.ReadinessProbePolicy) *corev1.Probe {
host := "$(hostname -f)"
var curlCmd string
if enableTLS == "true" {
caCert := GetString(config, TLS_CA_CERTIFICATE_PATH_KEY)
clientCert := GetString(config, TLS_CERTIFICATE_PATH_KEY)
clientKey := GetString(config, TLS_PRIVATE_KEY_PATH_KEY)
curlCmd = fmt.Sprintf(
"host=%s; (getent hosts \"$host\" >/dev/null 2>&1 || nslookup \"$host\" >/dev/null 2>&1) && curl --fail --silent --output /dev/null --cacert %s --cert %s --key %s https://$host:%d%s",
host, caCert, clientCert, clientKey, port, path,
)
} else {
curlCmd = fmt.Sprintf(
"host=%s; (getent hosts \"$host\" >/dev/null 2>&1 || nslookup \"$host\" >/dev/null 2>&1) && curl --fail --silent --output /dev/null http://$host:%d%s",
host, port, path,
)
}

probe := &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"bash", "-c", curlCmd},
},
},
}
if policy != nil {
applyReadinessProbePolicy(probe, policy.TimeoutSeconds, policy.FailureThreshold, policy.PeriodSeconds)
}
return probe
}

func NewPodTemplateSpec(dcr *v1.DorisCluster, config map[string]interface{}, componentType v1.ComponentType) corev1.PodTemplateSpec {
spec := getBaseSpecFromCluster(dcr, componentType)
var volumes []corev1.Volume
Expand Down Expand Up @@ -439,7 +479,7 @@ func NewBaseMainContainer(dcr *v1.DorisCluster, config map[string]interface{}, c
skipInit = dcr.Spec.BeSpec.SkipDefaultSystemInit
case v1.Component_CN:
spec = dcr.Spec.CnSpec.BaseSpec
skipInit = dcr.Spec.BeSpec.SkipDefaultSystemInit
skipInit = dcr.Spec.CnSpec.SkipDefaultSystemInit
case v1.Component_Broker:
spec = dcr.Spec.BrokerSpec.BaseSpec
default:
Expand Down Expand Up @@ -550,27 +590,31 @@ func NewBaseMainContainer(dcr *v1.DorisCluster, config map[string]interface{}, c
// client certificates. Kubernetes HTTPGet probes cannot provide client certs, so we use
// an Exec probe with curl instead. This is compatible with both TLS and mTLS modes.
enableTLS := GetString(config, ENABLE_TLS_KEY)
if enableTLS == "true" && c.ReadinessProbe != nil && c.ReadinessProbe.HTTPGet != nil {
caCert := GetString(config, TLS_CA_CERTIFICATE_PATH_KEY)
clientCert := GetString(config, TLS_CERTIFICATE_PATH_KEY)
clientKey := GetString(config, TLS_PRIVATE_KEY_PATH_KEY)
curlCmd := fmt.Sprintf(
"curl --fail --silent --output /dev/null --cacert %s --cert %s --key %s https://localhost:%d%s",
caCert, clientCert, clientKey, readnessPort, health_api_path,
)
tlsProbe := &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"bash", "-c", curlCmd},
if c.ReadinessProbe != nil && c.ReadinessProbe.HTTPGet != nil {
if GetStartMode(config) == START_MODEL_FQDN && (componentType == v1.Component_FE || componentType == v1.Component_BE || componentType == v1.Component_CN) {
c.ReadinessProbe = buildFQDNReadinessExecProbe(enableTLS, config, readnessPort, health_api_path, spec.ReadinessProbePolicy)
} else if enableTLS == "true" {
caCert := GetString(config, TLS_CA_CERTIFICATE_PATH_KEY)
clientCert := GetString(config, TLS_CERTIFICATE_PATH_KEY)
clientKey := GetString(config, TLS_PRIVATE_KEY_PATH_KEY)
curlCmd := fmt.Sprintf(
"curl --fail --silent --output /dev/null --cacert %s --cert %s --key %s https://localhost:%d%s",
caCert, clientCert, clientKey, readnessPort, health_api_path,
)
tlsProbe := &corev1.Probe{
PeriodSeconds: 5,
FailureThreshold: 3,
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"bash", "-c", curlCmd},
},
},
},
}
if spec.ReadinessProbePolicy != nil {
applyReadinessProbePolicy(tlsProbe, spec.ReadinessProbePolicy.TimeoutSeconds, spec.ReadinessProbePolicy.FailureThreshold, spec.ReadinessProbePolicy.PeriodSeconds)
}
if spec.ReadinessProbePolicy != nil {
applyReadinessProbePolicy(tlsProbe, spec.ReadinessProbePolicy.TimeoutSeconds, spec.ReadinessProbePolicy.FailureThreshold, spec.ReadinessProbePolicy.PeriodSeconds)
}
c.ReadinessProbe = tlsProbe
}
c.ReadinessProbe = tlsProbe
}

c.Lifecycle = lifeCycle(prestopScript)
Expand Down Expand Up @@ -694,6 +738,14 @@ func buildEnvFromPod() []corev1.EnvVar {
Name: config_env_name,
Value: config_env_path,
},
{
Name: DNS_READY_TIMEOUT,
Value: DEFAULT_DNS_READY_TIMEOUT,
},
{
Name: DNS_READY_INTERVAL,
Value: DEFAULT_DNS_READY_INTERVAL,
},
}
}

Expand Down Expand Up @@ -1033,8 +1085,8 @@ func ReadinessProbe(port int32, path string, commands []string, pt ProbeType, po
// StartupProbe returns a startup probe.
func startupProbe(port, timeout int32, path string, commands []string, pt ProbeType) *corev1.Probe {
var failurethreshold int32
if timeout < 300 {
timeout = 300
if timeout < 360 {
timeout = 360
}

failurethreshold = timeout / 5
Expand Down Expand Up @@ -1119,6 +1171,16 @@ func LifeCycleWithPreStopScript(lc *corev1.Lifecycle, preStopScript string) *cor
return lc
}

func AddTerminationGracePeriodSeconds(tplSpec *corev1.PodTemplateSpec, config map[string]interface{}, defaultSeconds int64) {
seconds := GetTerminationGracePeriodSeconds(config)
if seconds <= 0 {
seconds = defaultSeconds
}
if seconds > 0 {
tplSpec.Spec.TerminationGracePeriodSeconds = &seconds
}
}

// getProbe describe a health check.
func getProbe(port int32, path string, commands []string, pt ProbeType) corev1.ProbeHandler {
switch pt {
Expand Down
22 changes: 22 additions & 0 deletions pkg/common/utils/resource/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ func Test_LifeCycleWithPreStopScript(t *testing.T) {
}
}

func Test_AddTerminationGracePeriodSeconds_Default(t *testing.T) {
pts := &corev1.PodTemplateSpec{}
AddTerminationGracePeriodSeconds(pts, map[string]interface{}{}, DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS)
if pts.Spec.TerminationGracePeriodSeconds == nil {
t.Fatalf("expected terminationGracePeriodSeconds")
}
if *pts.Spec.TerminationGracePeriodSeconds != DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS {
t.Errorf("expected default terminationGracePeriodSeconds=%d, got %d", DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS, *pts.Spec.TerminationGracePeriodSeconds)
}
}

func Test_AddTerminationGracePeriodSeconds_ConfigOverride(t *testing.T) {
pts := &corev1.PodTemplateSpec{}
AddTerminationGracePeriodSeconds(pts, map[string]interface{}{GRACE_SHUTDOWN_WAIT_SECONDS: "60"}, DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS)
if pts.Spec.TerminationGracePeriodSeconds == nil {
t.Fatalf("expected terminationGracePeriodSeconds")
}
if *pts.Spec.TerminationGracePeriodSeconds != 60 {
t.Errorf("expected configured terminationGracePeriodSeconds=60, got %d", *pts.Spec.TerminationGracePeriodSeconds)
}
}

func Test_BuildDisaggregatedProbe(t *testing.T) {
c := &corev1.Container{}
cs := &dv1.CommonSpec{
Expand Down
8 changes: 1 addition & 7 deletions pkg/controller/sub_controller/be/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,7 @@ func (be *Controller) beContainer(dcr *v1.DorisCluster) corev1.Container {
return c
}

// Only configure the TerminationGracePeriodSeconds when grace_shutdown_wait_seconds configured in be.conf
func (be *Controller) addTerminationGracePeriodSeconds(dcr *v1.DorisCluster, tplSpec *corev1.PodTemplateSpec) {
config, _ := be.GetConfig(context.Background(), &dcr.Spec.BeSpec.ConfigMapInfo, dcr.Namespace, v1.Component_BE)
seconds := resource.GetTerminationGracePeriodSeconds(config)
if seconds > 0 {
tplSpec.Spec.TerminationGracePeriodSeconds = &seconds
return
}
return
resource.AddTerminationGracePeriodSeconds(tplSpec, config, resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS)
}
9 changes: 8 additions & 1 deletion pkg/controller/sub_controller/be/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package be
import (
"encoding/json"
v1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils/resource"
"testing"
)

Expand Down Expand Up @@ -57,7 +58,13 @@ func Test_buildBEPodTemplateSpec(t *testing.T) {
}

be := &Controller{}
be.buildBEPodTemplateSpec(dcr, map[string]interface{}{})
pts := be.buildBEPodTemplateSpec(dcr, map[string]interface{}{})
if pts.Spec.TerminationGracePeriodSeconds == nil {
t.Fatalf("expected BE terminationGracePeriodSeconds")
}
if *pts.Spec.TerminationGracePeriodSeconds != resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS {
t.Errorf("expected BE terminationGracePeriodSeconds=%d, got %d", resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS, *pts.Spec.TerminationGracePeriodSeconds)
}
}

func Test_buildBEPodTemplateSpecWithFEAffinity(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/sub_controller/cn/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

func (cn *Controller) buildCnPodTemplateSpec(dcr *v1.DorisCluster, config map[string]interface{}) corev1.PodTemplateSpec {
podTemplateSpec := resource.NewPodTemplateSpec(dcr, config, v1.Component_CN)
resource.AddTerminationGracePeriodSeconds(&podTemplateSpec, config, resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS)
var containers []corev1.Container
containers = append(containers, podTemplateSpec.Spec.Containers...)
cnContainer := cn.cnContainer(dcr)
Expand Down
60 changes: 60 additions & 0 deletions pkg/controller/sub_controller/cn/pod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package cn

import (
"encoding/json"
"testing"

v1 "github.com/apache/doris-operator/api/doris/v1"
"github.com/apache/doris-operator/pkg/common/utils/resource"
)

func Test_buildCnPodTemplateSpec(t *testing.T) {
dcrJsonStr := `{
"apiVersion": "doris.selectdb.com/v1",
"kind": "DorisCluster",
"metadata": {
"name": "doriscluster-sample",
"namespace": "default"
},
"spec": {
"cnSpec": {
"image": "selectdb/doris.be-ubuntu:2.1.6",
"replicas": 3
},
"feSpec": {
"image": "selectdb/doris.fe-ubuntu:2.1.6",
"replicas": 3
}
}
}`

dcr := &v1.DorisCluster{}
if err := json.Unmarshal([]byte(dcrJsonStr), dcr); err != nil {
t.Errorf("the buildCnPodTemplateSpec unmarshal failed, err=%s", err.Error())
}

cn := &Controller{}
pts := cn.buildCnPodTemplateSpec(dcr, map[string]interface{}{})
if pts.Spec.TerminationGracePeriodSeconds == nil {
t.Fatalf("expected CN terminationGracePeriodSeconds")
}
if *pts.Spec.TerminationGracePeriodSeconds != resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS {
t.Errorf("expected CN terminationGracePeriodSeconds=%d, got %d", resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS, *pts.Spec.TerminationGracePeriodSeconds)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (dcgs *DisaggregatedComputeGroupsController) NewStatefulset(ddc *dv1.DorisD

func (dcgs *DisaggregatedComputeGroupsController) NewPodTemplateSpec(ddc *dv1.DorisDisaggregatedCluster, selector map[string]string, cvs map[string]interface{}, cg *dv1.ComputeGroup) corev1.PodTemplateSpec {
pts := resource.NewPodTemplateSpecWithCommonSpec(cg.SkipDefaultSystemInit, &cg.CommonSpec, dv1.DisaggregatedBE)
resource.AddTerminationGracePeriodSeconds(&pts, cvs, resource.DEFAULT_BE_TERMINATION_GRACE_PERIOD_SECONDS)
//pod template metadata.
func() {
l := (resource.Labels)(selector)
Expand Down Expand Up @@ -143,7 +144,7 @@ func (dcgs *DisaggregatedComputeGroupsController) NewCGContainer(ddc *dv1.DorisD
c.Ports = resource.GetDisaggregatedContainerPorts(cvs, dv1.DisaggregatedBE)
c.Env = cg.CommonSpec.EnvVars
c.Env = append(c.Env, resource.GetPodDefaultEnv()...)
c.Env = append(c.Env, dcgs.newSpecificEnvs(ddc, cg)...)
c.Env = append(c.Env, dcgs.newSpecificEnvs(ddc, cg, cvs)...)

if cg.SkipDefaultSystemInit {
// Only works when the doris version is higher than 2.1.8 or 3.0.4
Expand Down Expand Up @@ -182,7 +183,7 @@ func (dcgs *DisaggregatedComputeGroupsController) NewCGContainer(ddc *dv1.DorisD
}

// add specific envs for be, the env will used by be_disaggregated_entrypoint script.
func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) []corev1.EnvVar {
func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup, cvs map[string]interface{}) []corev1.EnvVar {
var cgEnvs []corev1.EnvVar
stsName := ddc.GetCGStatefulsetName(cg)

Expand All @@ -195,6 +196,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newSpecificEnvs(ddc *dv1.Doris
cgEnvs = append(cgEnvs,
corev1.EnvVar{Name: resource.STATEFULSET_NAME, Value: stsName},
corev1.EnvVar{Name: resource.COMPUTE_GROUP_NAME, Value: ddc.GetCGName(cg)},
corev1.EnvVar{Name: "HOST_TYPE", Value: resource.GetStartMode(cvs)},
corev1.EnvVar{Name: resource.ENV_FE_ADDR, Value: feAddr},
corev1.EnvVar{Name: resource.ENV_FE_PORT, Value: fqpStr})

Expand Down
Loading
Loading