Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,28 @@ import (
"context"
"flag"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/shirou/gopsutil/v4/process"
"infini.sh/framework/core/task"
"infini.sh/framework/core/wrapper/taskset"
"infini.sh/framework/modules/configs/client"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/debug"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/shirou/gopsutil/v4/process"
"infini.sh/framework/core/task"
"infini.sh/framework/core/wrapper/taskset"
"infini.sh/framework/modules/configs/client"

log "github.com/cihub/seelog"
"github.com/kardianos/service"
"infini.sh/framework/core/config"
"infini.sh/framework/core/daemon"
"infini.sh/framework/core/env"
"infini.sh/framework/core/errors"
"infini.sh/framework/core/global"
"infini.sh/framework/core/keystore"
"infini.sh/framework/core/log"
_ "infini.sh/framework/core/logging"
"infini.sh/framework/core/logging/logger"
"infini.sh/framework/core/module"
Expand Down Expand Up @@ -85,6 +85,20 @@ type App struct {
svcFlag string
}

func getServiceWorkingDirectory() string {
executablePath, err := os.Executable()
if err == nil {
// Services are often launched from a manager-controlled cwd. Use the executable directory so
// relative data/log/config paths resolve the same way for service installs and manual runs.
return filepath.Dir(executablePath)
}
workdir, err := os.Getwd()
if err != nil {
panic(err)
}
return workdir
}

const (
env_SILENT_GREETINGS = "SILENT_GREETINGS"
env_SERVICE_NAME = "SERVICE_NAME"
Expand Down Expand Up @@ -575,10 +589,7 @@ func (app *App) Run() {
svcOptions["SuccessExitStatus"] = "1 2 8 SIGKILL"
svcOptions["LimitNOFILE"] = 1024000

workdir, err := os.Getwd()
if err != nil {
panic(err)
}
workdir := getServiceWorkingDirectory()

serviceName := app.environment.GetAppLowercaseName()
if v, ok := os.LookupEnv(env_SERVICE_NAME); ok {
Expand Down
43 changes: 43 additions & 0 deletions app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Framework is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
// - Website: infinilabs.com
// - Email: hello@infini.ltd
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package framework

import (
"os"
"path/filepath"
"testing"
)

func TestGetServiceWorkingDirectoryUsesExecutableDir(t *testing.T) {
executablePath, err := os.Executable()
if err != nil {
t.Fatalf("failed to get executable path: %v", err)
}

got := getServiceWorkingDirectory()
want := filepath.Dir(executablePath)
if got != want {
t.Fatalf("expected service working directory %q, got %q", want, got)
}
}
73 changes: 37 additions & 36 deletions core/config/fs_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,36 @@ func loadConfigFile(file string) *Config {
return nil
}

func dispatchConfigChangeEvent(ev fsnotify.Event, watcherCallbacks []CallbackFunc) {
for _, v := range watcherCallbacks {
v(ev.Name, ev.Op)
}

cfg := loadConfigFile(ev.Name)
if cfg != nil {
for _, k := range sectionCallbackOrder {
callbacks, ok := sectionCallbacks[k]
if !ok || !cfg.HasField(k) {
continue
}
currentCfg, err := cfg.Child(k, -1)
if err != nil {
log.Error(err)
continue
}
previousCfg, _ := latestConfig[k]
for _, f := range callbacks {
f(previousCfg, currentCfg)
}
latestConfig[k] = currentCfg
}
}

for _, v := range configCallbacks {
v(ev)
}
}

var validExtensions = []string{".yml", ".yaml", ".tpl"}

func SetValidExtension(v []string) {
Expand Down Expand Up @@ -153,40 +183,7 @@ func AddPathToWatch(path string, callback CallbackFunc) {
time.Sleep(2 * time.Second)
log.Trace("2 seconds out, on:", ev.String())

// AddPathToWatch

for _, v := range watcher.callbacks {
v(ev.Name, ev.Op)
}

// NotifyOnConfigChange

for _, v := range configCallbacks {
v(ev)
}

// NotifyOnConfigSectionChange

cfg := loadConfigFile(ev.Name)
if cfg == nil {
continue
}

for k, v := range sectionCallbacks {
if cfg.HasField(k) {
currentCfg, err := cfg.Child(k, -1)
if err != nil {
log.Error(err)
continue
}
// diff config
previousCfg, _ := latestConfig[k]
for _, f := range v {
f(previousCfg, currentCfg)
}
latestConfig[k] = currentCfg
}
}
dispatchConfigChangeEvent(ev, watcher.callbacks)
}
}()
})
Expand Down Expand Up @@ -255,11 +252,13 @@ func StopWatchers() {
}

var sectionCallbacks = map[string][]func(pCfg, cCfg *Config){}
var sectionCallbackOrder = []string{}
var configCallbacks = []func(fsnotify.Event){}
var cfgLocker = sync.RWMutex{}

// NotifyOnConfigSectionChange will trigger callback when any configuration file change detected and
// configKey present in the changed file
// configKey present in the changed file. Section callbacks run before generic NotifyOnConfigChange
// callbacks so section-scoped state can be refreshed before dependent consumers reload.
func NotifyOnConfigSectionChange(configKey string, f func(pCfg, cCfg *Config)) {
cfgLocker.Lock()
defer cfgLocker.Unlock()
Expand All @@ -268,12 +267,14 @@ func NotifyOnConfigSectionChange(configKey string, f func(pCfg, cCfg *Config)) {
if !ok {
v = []func(pCfg, cCfg *Config){}
sectionCallbacks[configKey] = v
sectionCallbackOrder = append(sectionCallbackOrder, configKey)
}
v = append(v, f)
sectionCallbacks[configKey] = v
}

// NotifyOnConfigChange will trigger callback when any configuration file change detected
// NotifyOnConfigChange will trigger callback when any configuration file change detected, after any
// matching NotifyOnConfigSectionChange callbacks for the same event have run.
func NotifyOnConfigChange(f func(fsnotify.Event)) {
cfgLocker.Lock()
defer cfgLocker.Unlock()
Expand Down
120 changes: 120 additions & 0 deletions core/config/fs_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (C) INFINI Labs & INFINI LIMITED.
//
// The INFINI Framework is offered under the GNU Affero General Public License v3.0
// and as commercial software.
//
// For commercial licensing, contact us at:
// - Website: infinilabs.com
// - Email: hello@infini.ltd
//
// Open Source licensed under AGPL V3:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package config

import (
"os"
"path/filepath"
"testing"

"github.com/fsnotify/fsnotify"
)

func TestDispatchConfigChangeEventRunsSectionCallbacksBeforeGenericCallbacks(t *testing.T) {
dir := t.TempDir()
file := filepath.Join(dir, "generated_metrics_tasks.yml")
content := []byte("elasticsearch:\n - id: \"cluster-1\"\n name: \"cluster-1\"\n enabled: true\n endpoint: \"http://127.0.0.1:9200\"\n")
if err := os.WriteFile(file, content, 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}

previousSections := sectionCallbacks
previousOrder := sectionCallbackOrder
previousConfigs := configCallbacks
previousLatest := latestConfig
sectionCallbacks = map[string][]func(pCfg, cCfg *Config){}
sectionCallbackOrder = nil
configCallbacks = nil
latestConfig = map[string]*Config{}
t.Cleanup(func() {
sectionCallbacks = previousSections
sectionCallbackOrder = previousOrder
configCallbacks = previousConfigs
latestConfig = previousLatest
})

var order []string
NotifyOnConfigSectionChange("elasticsearch", func(pCfg, cCfg *Config) {
order = append(order, "section")
})
NotifyOnConfigChange(func(ev fsnotify.Event) {
order = append(order, "generic")
})

dispatchConfigChangeEvent(fsnotify.Event{Name: file, Op: fsnotify.Write}, nil)

if len(order) != 2 {
t.Fatalf("expected 2 callbacks, got %d (%v)", len(order), order)
}
if order[0] != "section" || order[1] != "generic" {
t.Fatalf("expected section callback before generic callback, got %v", order)
}
}

func TestDispatchConfigChangeEventRunsSectionCallbacksInRegistrationOrder(t *testing.T) {
dir := t.TempDir()
file := filepath.Join(dir, "gateway.yml")
content := []byte("flow:\n - name: flow-1\nrouter:\n - name: router-1\nentry:\n - name: entry-1\n")
if err := os.WriteFile(file, content, 0o644); err != nil {
t.Fatalf("write config file: %v", err)
}

previousSections := sectionCallbacks
previousOrder := sectionCallbackOrder
previousConfigs := configCallbacks
previousLatest := latestConfig
sectionCallbacks = map[string][]func(pCfg, cCfg *Config){}
sectionCallbackOrder = nil
configCallbacks = nil
latestConfig = map[string]*Config{}
t.Cleanup(func() {
sectionCallbacks = previousSections
sectionCallbackOrder = previousOrder
configCallbacks = previousConfigs
latestConfig = previousLatest
})

var order []string
NotifyOnConfigSectionChange("flow", func(pCfg, cCfg *Config) {
order = append(order, "flow")
})
NotifyOnConfigSectionChange("router", func(pCfg, cCfg *Config) {
order = append(order, "router")
})
NotifyOnConfigSectionChange("entry", func(pCfg, cCfg *Config) {
order = append(order, "entry")
})

dispatchConfigChangeEvent(fsnotify.Event{Name: file, Op: fsnotify.Write}, nil)

expected := []string{"flow", "router", "entry"}
if len(order) != len(expected) {
t.Fatalf("expected %d callbacks, got %d (%v)", len(expected), len(order), order)
}
for i, want := range expected {
if order[i] != want {
t.Fatalf("expected callback order %v, got %v", expected, order)
}
}
}
Loading
Loading