feat: implemented announce key reconciliation#143
Conversation
f46c40c to
6b83fc5
Compare
fabenan-f
left a comment
There was a problem hiding this comment.
I like the end-to-end-approach, just a few remarks from my side
| func setupOperator(ctx context.Context) (*sql.DB, *rpc.Server, *orbital.Operator) { | ||
| dsn := os.Getenv("AGENT_DATABASE_URL") | ||
| if dsn == "" { | ||
| log.Println("AGENT_DATABASE_URL not set, operator disabled") |
There was a problem hiding this comment.
I know this is only an example agent, but still I'd not allow an agent to run without an operator
| } | ||
| } | ||
|
|
||
| // awaitKeyExists polls the keys table until a key with the given ID and tenant exists. |
There was a problem hiding this comment.
We can make the tests even more integrational if we use the gRPC client for key assertions
| @@ -0,0 +1,42 @@ | |||
| package handler | |||
There was a problem hiding this comment.
Imho having handler.NewAnnounceKey and announcekey.NewHandler is confusing. I think we should consolidate these into a single handler package that contains both the job handler (manager side) and the request handler (operator side). This would be beneficial because:
- Both handlers share common data structures
- Their interconnection would be immediately visible
- It would eliminate the current naming ambiguity
| key.State = model.KeyStatePreActivation | ||
|
|
||
| if _, err := keyStore.CreateKey(ctx, store.CreateKeyQuery{Key: key}); err != nil { | ||
| resp.Fail(fmt.Sprintf("store key: %v", err)) |
There was a problem hiding this comment.
The default case should be to continue, only known terminal errors (like the one in the test case) should fail
| model.KeyStateDestroyed: {}, | ||
| model.KeyStateActive: {}, | ||
| model.KeyStateCompromised: {}, | ||
| model.KeyStateAnnounceFailed: {}, |
There was a problem hiding this comment.
If we introduce a new state here, we'll deviate from the official NIST lifecycle definition. I'm uncertain whether this is advisable from either a compliance or signaling standpoint
There was a problem hiding this comment.
Correct we should have a different processing state
As per some discussion we should have a field a new field called KeyProcessingState and existing KeyState should be renamed to KeyLifeCycleState
type Key struct {
ID string `json:"id"`
Name string `json:"name"`
TenantID string `json:"tenant_id"`
Kind KeyKind `json:"kind"`
ParentID *string `json:"parent_id"`
ManagedBy string `json:"managed_by"`
Labels Labels `json:"labels"`
KeyLifeCycleState KeyLifeCycleState `json:"key_lifecycle_state"`
KeyProcessingState KeyProcessingState `json:"key_processing_state"`
CreatedAt clock.UnixNano `json:"created_at"`
UpdatedAt clock.UnixNano `json:"updated_at"`
}
type KeyProcessingState struct {
Status string `json:"status"`
JobID string `json:"job_id,omitempty"`
}Here jobID shows which JobID is having the lock on , and this will be useful when we do a large key rotation.
but maybe we can start without and extend it later
Credit to @apatsap as well
| ) | ||
|
|
||
| type fakeKeyStore struct { | ||
| keys map[string]*model.Key |
There was a problem hiding this comment.
I'd rather use the real sql store implementation (unhappy paths can still be mocked with wrapper functions)
| ) | ||
| } | ||
|
|
||
| job := orbital.NewJob(announcekey.JobType, data).WithExternalID(key.ID) |
There was a problem hiding this comment.
Since key.ID is generated internally, a lost response could cause the client to retry, resulting in duplicate jobs performing the same action
jithinkunjachan
left a comment
There was a problem hiding this comment.
Nice job 👍🏽 , just few things
| parentID = &data.ParentID | ||
| } | ||
|
|
||
| key := model.NewKey(data.TenantID, data.Name, data.Kind, parentID, data.Target, data.Labels) |
There was a problem hiding this comment.
Not for this PR, but just a question how the tenants info will be propagated to agent DB as we have DB constraints. We might need to think about this topic later.
| model.KeyStateDestroyed: {}, | ||
| model.KeyStateActive: {}, | ||
| model.KeyStateCompromised: {}, | ||
| model.KeyStateAnnounceFailed: {}, |
There was a problem hiding this comment.
Correct we should have a different processing state
As per some discussion we should have a field a new field called KeyProcessingState and existing KeyState should be renamed to KeyLifeCycleState
type Key struct {
ID string `json:"id"`
Name string `json:"name"`
TenantID string `json:"tenant_id"`
Kind KeyKind `json:"kind"`
ParentID *string `json:"parent_id"`
ManagedBy string `json:"managed_by"`
Labels Labels `json:"labels"`
KeyLifeCycleState KeyLifeCycleState `json:"key_lifecycle_state"`
KeyProcessingState KeyProcessingState `json:"key_processing_state"`
CreatedAt clock.UnixNano `json:"created_at"`
UpdatedAt clock.UnixNano `json:"updated_at"`
}
type KeyProcessingState struct {
Status string `json:"status"`
JobID string `json:"job_id,omitempty"`
}Here jobID shows which JobID is having the lock on , and this will be useful when we do a large key rotation.
but maybe we can start without and extend it later
Credit to @apatsap as well
6b83fc5 to
7ebc5cd
Compare
Signed-off-by: Naman Balaji <namanb487@gmail.com>
Signed-off-by: Naman Balaji <namanb487@gmail.com>
7ebc5cd to
89d7a65
Compare
Signed-off-by: Naman Balaji <namanb487@gmail.com>
| defer agentDB.Close() | ||
|
|
||
| go func() { | ||
| if err := operator.ListenAndRespond(ctx); err != nil { |
There was a problem hiding this comment.
can we add a log.Info before ListenAndRespond
| return orbital.CancelJobConfirmer(fmt.Sprintf("invalid job data: %v", err)), nil | ||
| } | ||
|
|
||
| _, err := h.keyStore.GetKeyByID(ctx, data.KeyID, data.TenantID) |
There was a problem hiding this comment.
we need to check that the key state allows the target state using keylifecycle.ValidateTransition
There was a problem hiding this comment.
imo keylifecycle.ValidateTransition is more for checking state transitions, but in this case we just create a key and for the whole announce operation it will stay in a preactive state so there's effectively no transition taking place here.
| // TaskData is the payload exchanged between the root job handler and the | ||
| // agent task handler. It is JSON-encoded into the orbital Job/Task data | ||
| // field. | ||
| type TaskData struct { |
There was a problem hiding this comment.
For now its alright. But lets add a comment here that when we add the next handler that certain parts of the data (e.g. Target, Labels, TenantID) might need to be refactored into something like common.TaskInfo
| key.ID = data.KeyID | ||
| key.LifeCycleState = model.KeyLifeCyclePreActivation | ||
|
|
||
| err := keyStore.CreateKey(ctx, key) |
There was a problem hiding this comment.
we need to make sure parentID key exists and can be used to from a lifecycle perspective to announce this key
There was a problem hiding this comment.
Will be done as part of a subsequent PR
| if data.ParentID != "" { | ||
| parentID = &data.ParentID | ||
| } | ||
|
|
There was a problem hiding this comment.
we need to check that data.TenantID exists
There was a problem hiding this comment.
Will be done as part of a subsequent PR
| @@ -37,16 +61,97 @@ func (s *KeyService) AnnounceKey(ctx context.Context, req *AnnounceKeyRequest) ( | |||
| req.GetLabels(), | |||
| ) | |||
|
|
|||
There was a problem hiding this comment.
We need to check
- tenantID exists
- Parent exists within the same tenant.
- If Parent doesn't exist we need to make sure key is root jkey
- If Parent exists we need to check that parent:
- is in an allowed key lifecycle state
- that the keyKind of the new key can be attached to the parent key by the hierarchy definition
There was a problem hiding this comment.
For now just allowing active state and new key being generated from it's direct parent.
| ) | ||
|
|
||
| if err := s.keyStore.CreateKey(ctx, key); err != nil { | ||
| key, err := s.upsertKey(ctx, newKey) |
There was a problem hiding this comment.
for the double commit to work we should create the job first and then check in the jobConfirm func whether the key exists and can be activated
| } | ||
|
|
||
| // If a job is already linked, the caller is retrying — return as-is. | ||
| if key.KeyProcessingState.JobID != "" { |
There was a problem hiding this comment.
What if the previous job failed (i.e. it has a job id) and this call's intention is to retry the job
There was a problem hiding this comment.
We look up the key by name and if we find a key in failed state we create a new job, we use oldJobID concatenated with the keyID as an externalID. This is effectively our retry job and by using oldJobId we kind of dedup it.
Signed-off-by: Naman Balaji <namanb487@gmail.com>
0917ea9 to
9d68fe0
Compare
fabenan-f
left a comment
There was a problem hiding this comment.
The uniqueness constraint on key.Name can detect concurrent jobs when used as an external ID. This might let us remove some of the conditional logic we currently have in the double commit. But the current approach should work as well
| if err != nil { | ||
| if errors.Is(err, store.ErrKeyNotFound) { | ||
| // ConfirmJob is idempotent and orbital will eventually | ||
| // time out the job if the key never lands. |
There was a problem hiding this comment.
Jobs do not time out in the confirming phase but we can think of something in the future
| return nil, vErr | ||
| } | ||
|
|
||
| existing, lookupErr := s.keyStore.GetKeyByName(ctx, store.GetKeyByNameQuery{ |
There was a problem hiding this comment.
We could be more optimistic and try to create a key first and handle a potential existing key error later, but nothing that needs to be done now
No description provided.