Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -108,6 +109,17 @@ type ProcessManager struct {
ptpEventHandler *event.EventHandler
}

// findProcessesByName returns a list of processes with the given name
func (p *ProcessManager) findProcessesByName(name string) []*ptpProcess {
var procs []*ptpProcess
for _, proc := range p.process {
if proc != nil && proc.name == name {
procs = append(procs, proc)
}
}
return procs
}

// NewProcessManager is used by unit tests
func NewProcessManager() *ProcessManager {
processPTP := &ptpProcess{}
Expand Down Expand Up @@ -251,6 +263,7 @@ type ptpProcess struct {
dn *Daemon
cmdSetEnabledMutex sync.Mutex
offset float64
skipInitialStartup string
}

func (p *ptpProcess) Stopped() bool {
Expand Down Expand Up @@ -303,6 +316,9 @@ type Daemon struct {

// Allow vendors to include plugins
pluginManager plugin.PluginManager

delayedPhc2sys atomic.Bool
delayedPhc2sysMu sync.Mutex // protects skipInitialStartup on phc2sys processes
}

// New LinuxPTP is called by daemon to generate new linuxptp instance
Expand Down Expand Up @@ -467,9 +483,7 @@ func (dn *Daemon) applyNodePTPProfiles() error {
if p != nil {
p.eventCh = dn.processManager.eventChannel
// start ptp4l process early , it doesn't have
if p.depProcess == nil {
go p.cmdRun(dn.stdoutToSocket, &dn.pluginManager)
} else {
if p.depProcess != nil {
for _, d := range p.depProcess {
if d != nil {
time.Sleep(3 * time.Second)
Expand All @@ -491,11 +505,24 @@ func (dn *Daemon) applyNodePTPProfiles() error {
glog.Infof("enabling dep process %s with Max %d Min %d Holdover %d", d.Name(), p.ptpClockThreshold.MaxOffsetThreshold, p.ptpClockThreshold.MinOffsetThreshold, p.ptpClockThreshold.HoldOverTimeout)
}
}
go p.cmdRun(dn.stdoutToSocket, &dn.pluginManager)
}
if p.skipInitialStartup != "" {
glog.Infof("Delaying %s startup: %s", p.name, p.skipInitialStartup)
continue
}
go p.cmdRun(dn.stdoutToSocket, &dn.pluginManager)
dn.pluginManager.AfterRunPTPCommand(&p.nodeProfile, p.name)
}
}
// Arm the delayed-phc2sys flag now that the startup loop is complete.
// Keeping it false during the loop ensures HandleDelayedPhc2sysStartup
// cannot clear skipInitialStartup and race with the loop's skip check.
for _, p := range dn.processManager.process {
if p != nil && p.skipInitialStartup != "" {
dn.delayedPhc2sys.Store(true)
break
}
}
dn.pluginManager.PopulateHwConfig(dn.hwconfigs)
*dn.refreshNodePtpDevice = true
dn.readyTracker.setConfig(true)
Expand Down Expand Up @@ -792,6 +819,13 @@ func (dn *Daemon) applyNodePtpProfile(runID int, nodeProfile *ptpv1.PtpProfile)
// TODO addScheduling
dprocess.depProcess = append(dprocess.depProcess, pmcProcess)
}
} else if pProcess == phc2sysProcessName {
glog.Infof("Setting up phc2sys (%s)", clockType)
// Delay phc2sys startup until the clock source has synchronized.
dn.delayedPhc2sysMu.Lock()
dprocess.skipInitialStartup = "waiting for PHC synchronization before adjusting system time"
dn.delayedPhc2sysMu.Unlock()
glog.Infof("Delaying phc2sys startup: %s", dprocess.skipInitialStartup)
} else if pProcess == ts2phcProcessName { //& if the x plugin is enabled
if clockType == event.GM {
if output.gnss_serial_port == "" {
Expand Down Expand Up @@ -1302,6 +1336,46 @@ func (p *ptpProcess) MonitorEvent(offset float64, clockState string) {
// not implemented
}

// HandleDelayedPhc2sysStartup checks if phc2sys was delayed and if the current offset is within the 1s threshold, starts it.
func (dn *Daemon) HandleDelayedPhc2sysStartup(source string, offset float64, profileName *string) {
if profileName == nil {
return
}
if !dn.delayedPhc2sys.Load() {
return
}
if math.Abs(offset) < 1000000000 {
dn.delayedPhc2sysMu.Lock()
defer dn.delayedPhc2sysMu.Unlock()
if !dn.delayedPhc2sys.Load() { // re-check under lock
return
}
for _, proc := range dn.processManager.findProcessesByName(phc2sysProcessName) {
if proc.skipInitialStartup == "" || proc.nodeProfile.Name == nil {
continue
}
// Match if the reporting process is in the same profile as phc2sys,
// or in one of phc2sys's HA-linked profiles. The latter handles the
// case where phc2sys is in a dedicated profile with no ptp4l of its
// own (e.g. test-dual-nic-bc-ha with haProfiles=master1,master2).
_, linkedByHA := proc.haProfile[*profileName]
if *proc.nodeProfile.Name == *profileName || linkedByHA {
glog.Infof("%s offset is %f (sub-second); enabling %s", source, offset, proc.name)
proc.skipInitialStartup = ""
proc.cmdSetEnabled(true)
dn.pluginManager.AfterRunPTPCommand(&proc.nodeProfile, proc.name)
}
}
// Only clear the daemon-wide flag once no phc2sys processes remain delayed.
for _, proc := range dn.processManager.findProcessesByName(phc2sysProcessName) {
if proc.skipInitialStartup != "" {
return
}
}
dn.delayedPhc2sys.Store(false)
}
}

func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface string, state event.PTPState, extraValue map[event.ValueType]interface{}) {
var ptpState event.PTPState
ptpState = state
Expand All @@ -1313,6 +1387,9 @@ func (p *ptpProcess) ProcessTs2PhcEvents(ptpOffset float64, source string, iface
}

if source == ts2phcProcessName { // for ts2phc send it to event to create metrics and events
if p.dn != nil {
p.dn.HandleDelayedPhc2sysStartup(source, ptpOffset, p.nodeProfile.Name)
}
var values = make(map[event.ValueType]interface{})

values[event.OFFSET] = ptpOffsetInt64
Expand Down
Loading