agentic triggers - Gmail provider, OAuth, Pub/Sub #290
External events can now fire v3 inference. First provider: Gmail.
- New triggers_module with provider abstraction (GmailProvider impl)
- DB: agentic_triggers, agentic_trigger_credentials, agentic_trigger_events
with credential dedup per (provider, external_account_id)
- Two-step OAuth (POST /v1/triggers → consent URL → callback)
- Pub/Sub topic + push subscription created per trigger with OIDC token
signing; webhook verifies JWT against trigger.provider_config.oidc_audience
- Webhook /v1/triggers/{trigger_id}/{agentic_id}/invoke decodes the push,
short-circuits stale historyIds, enqueues Celery task
- Worker (process_trigger_event_task) fetches Gmail messages via history
list, applies subject regex + mime allowlist, dispatches via existing
AsyncAgenticExecutionService — audit rows in agentic_trigger_events with
per-message idempotency (UNIQUE trigger_id, provider_event_id)
- APScheduler renewer keeps Gmail watch alive (renews <24h before expiry)
- Retry endpoint reuses stored credential to re-run start_subscription
after transient failures
- KMS-encrypted tokens at rest; FloKmsService gains a separate symmetric
enc_key path (GCP requires CryptoKey resource, not CryptoKeyVersion)
- Subscription tuned: ack_deadline=60s, 1-day retention, exponential
retry backoff (10s–600s)
- Auth middleware skips OAuth callback + invoke routes (dynamic-pattern
matching for {trigger_id}/{agentic_id})
- Dockerfile updated for floware; new celery_worker Dockerfile
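The webhook bullet above says the invoke route decodes the push and short-circuits stale historyIds. A minimal sketch of that step, assuming the standard Pub/Sub push envelope (`message.data` is base64) and Gmail's notification payload (`emailAddress`, `historyId`). The function names, and the rule that "stale" means "not greater than the stored cursor", are illustrative assumptions rather than the PR's actual code:

```python
import base64
import json
from typing import Any, Dict, Optional


def decode_gmail_push(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """Decode the base64 `message.data` field of a Pub/Sub push envelope."""
    data = envelope['message']['data']
    # Pad defensively; urlsafe_b64decode accepts both base64 alphabets.
    padded = data + '=' * (-len(data) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def is_stale(incoming_history_id: int, stored_history_id: Optional[int]) -> bool:
    """Assumed rule: a push is stale if it does not move past the stored cursor."""
    return stored_history_id is not None and incoming_history_id <= stored_history_id


if __name__ == '__main__':
    body = json.dumps({'emailAddress': 'user@example.com', 'historyId': 1200})
    envelope = {'message': {'data': base64.urlsafe_b64encode(body.encode()).decode()}}
    payload = decode_gmail_push(envelope)
    print(is_stale(int(payload['historyId']), stored_history_id=1500))
```

Checking staleness in the webhook, before enqueueing, keeps redelivered or out-of-order pushes from creating Celery work that would fetch nothing new.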
New env vars (floware + celery_worker):
- GOOGLE_OAUTH_CLIENT_ID, GOOGLE_OAUTH_CLIENT_SECRET — OAuth client for
Gmail consent flow
- GOOGLE_OAUTH_REDIRECT_URI — must end in
/floware/v1/triggers/oauth/google/callback and match the Google Console
authorized redirect URI exactly
- GMAIL_PUSH_ENDPOINT_TEMPLATE — must contain literal {trigger_id} and
{agentic_id} placeholders; rendered per-trigger when the Pub/Sub
subscription is created
- GMAIL_PUBSUB_TOPIC_PREFIX (optional, default 'agentic-trigger')
- GMAIL_PUBSUB_OIDC_SA_EMAIL — dedicated service account Pub/Sub mints
OIDC tokens as; needs roles/iam.serviceAccountTokenCreator from the
Pub/Sub service agent, and roles/iam.serviceAccountUser from the
floware SA
- GCP_KMS_ENC_CRYPTO_KEY — symmetric key for OAuth-token encryption
(separate from the asymmetric GCP_KMS_CRYPTO_KEY used for sign/verify)
- Reuses GCP_PROJECT_ID for the Pub/Sub project (no separate var)
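The GMAIL_PUSH_ENDPOINT_TEMPLATE requirement above (literal `{trigger_id}` and `{agentic_id}` placeholders, rendered per trigger) could be checked and applied roughly as follows. This is a sketch: `render_push_endpoint` and the example host are hypothetical, and plain string replacement is used here instead of `str.format` so that any other braces in the URL are left alone.

```python
REQUIRED_PLACEHOLDERS = ('{trigger_id}', '{agentic_id}')


def render_push_endpoint(template: str, trigger_id: str, agentic_id: str) -> str:
    """Validate the template, then substitute both placeholders."""
    missing = [p for p in REQUIRED_PLACEHOLDERS if p not in template]
    if missing:
        raise ValueError(
            f'GMAIL_PUSH_ENDPOINT_TEMPLATE is missing placeholders: {missing}'
        )
    return template.replace('{trigger_id}', trigger_id).replace(
        '{agentic_id}', agentic_id
    )


if __name__ == '__main__':
    # Hypothetical host; the path shape follows the invoke route in the description.
    template = 'https://api.example.com/floware/v1/triggers/{trigger_id}/{agentic_id}/invoke'
    print(render_push_endpoint(template, 'trig-1', 'agent-9'))
```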
📝 Walkthrough
Adds a complete triggers subsystem for Gmail: DB migrations and ORM models, provider abstractions and a Gmail provider (OAuth + Pub/Sub), token encryption, services for CRUD/event processing/renewal, FastAPI endpoints, Celery task/worker wiring, and Floware startup and DI integration.
Changes: Agentic Trigger System with Gmail Integration
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
# revision identifiers, used by Alembic.
revision: str = '74c837a023f3'
down_revision: Union[str, None] = '3b5b1bf90e6c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
# state, and avoids gRPC retries on NotFound.
try:
    publisher.create_topic(request={'name': topic_path}, timeout=30)
except google_exceptions.AlreadyExists:
Actionable comments posted: 11
🧹 Nitpick comments (6)
wavefront/server/modules/triggers_module/triggers_module/providers/registry.py (1)
19-25: ⚡ Quick win
Use `raise ... from exc` to preserve exception chain.
The static analysis correctly identifies that the exception chain should be preserved for better debugging.
♻️ Suggested fix
 def get(self, provider_type: str) -> TriggerProvider:
     try:
         return self._providers[provider_type]
-    except KeyError:
+    except KeyError as exc:
         raise UnsupportedTriggerProvider(
             f'No trigger provider registered for type: {provider_type}'
-        )
+        ) from exc
wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py (1)
287-304: 💤 Low value
Consider adding a buffer before token expiry.
The access token cache check at line 290 uses `token_expires_at > now` with no buffer. A token that expires in 1 second will be considered valid but may expire mid-request. Consider adding a small buffer (e.g., 60 seconds) to avoid edge cases.
♻️ Add expiry buffer
+from datetime import timedelta
+
+EXPIRY_BUFFER = timedelta(seconds=60)
+
 async def _fresh_access_token(
     self, credential_id: UUID, provider: TriggerProvider
 ) -> tuple[str, str]:
     credential = await self._credentials.find_one(id=credential_id)
     if not credential:
         raise InvalidTriggerState(f'Credential {credential_id} not found')
     now = datetime.now(timezone.utc)
     if (
         credential.encrypted_access_token
         and credential.token_expires_at
-        and credential.token_expires_at > now
+        and credential.token_expires_at > now + EXPIRY_BUFFER
     ):
wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py (1)
349-376: ⚡ Quick win
Use a set for deduplication and consider pagination limits.
The `msg_id not in message_ids` check on line 369 is O(n) per message. Using a set for deduplication would improve performance. Additionally, the loop has no limit on pages fetched, which could be problematic for accounts with large history deltas.
♻️ Suggested improvement
 def _list_new_message_ids(
     self, service, email_address: str, start_history_id: int
 ) -> List[str]:
     message_ids: List[str] = []
+    seen_ids: set[str] = set()
     page_token: Optional[str] = None
+    max_pages = 50  # Safety limit
+    pages_fetched = 0
     while True:
+        if pages_fetched >= max_pages:
+            logger.warning(
+                f'Reached max pagination limit ({max_pages}) for history list'
+            )
+            break
         request_kwargs: Dict[str, Any] = {
             'userId': email_address,
             'startHistoryId': str(start_history_id),
             'historyTypes': ['messageAdded'],
             'labelId': 'INBOX',
         }
         if page_token:
             request_kwargs['pageToken'] = page_token
         response = service.users().history().list(**request_kwargs).execute()
+        pages_fetched += 1
         for history_entry in response.get('history', []):
             for added in history_entry.get('messagesAdded', []):
                 msg = added.get('message') or {}
                 msg_id = msg.get('id')
-                if msg_id and msg_id not in message_ids:
+                if msg_id and msg_id not in seen_ids:
+                    seen_ids.add(msg_id)
                     message_ids.append(msg_id)
         page_token = response.get('nextPageToken')
         if not page_token:
             break
     return message_ids
wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py (1)
97-99: 💤 Low value
Add error logging for refresh failures to match `exchange_code` pattern.
`exchange_code` logs detailed error context before re-raising on failure (lines 53-57), but `refresh_access_token` raises immediately without logging. Additionally, `payload['access_token']` will raise `KeyError` if the response is malformed, which may be confusing without context.
🔧 Suggested improvement
 def refresh_access_token(self, refresh_token: str) -> TokenBundle:
     response = requests.post(
         GOOGLE_TOKEN_URL,
         data={
             'refresh_token': refresh_token,
             'client_id': self.client_id,
             'client_secret': self.client_secret,
             'grant_type': 'refresh_token',
         },
         timeout=20,
     )
+    if not response.ok:
+        logger.error(
+            f'Google token refresh failed: status={response.status_code} '
+            f'body={response.text}'
+        )
     response.raise_for_status()
     payload = response.json()
-    access_token = payload['access_token']
+    access_token = payload.get('access_token')
+    if not access_token:
+        raise RuntimeError('Google did not return an access_token on refresh.')
wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py (1)
20-20: ⚡ Quick win
Remove debug/development commented-out code before merging.
Lines 20 and 33 contain commented-out bypass code (`# return plaintext` and `# return stored`). These should be removed to avoid confusion and accidental uncommenting in production.
♻️ Remove commented-out code
 def encrypt(self, plaintext: Optional[str]) -> Optional[str]:
     if plaintext is None:
         return None
-    # return plaintext #disabling use of kms for now
     ciphertext_bytes = self._kms.encrypt(plaintext)
 def decrypt(self, stored: Optional[str]) -> Optional[str]:
     if stored is None:
         return None
-    # return stored #disabling use of kms for now
     ciphertext_bytes = base64.b64decode(stored.encode('utf-8'))
Also applies to: 33-33
wavefront/server/modules/triggers_module/pyproject.toml (1)
20-20: ⚡ Quick win
Revisit the `beautifulsoup4` version cap in `wavefront/server/modules/triggers_module/pyproject.toml` (line 20).
The constraint `beautifulsoup4>=4.8.0,<4.9.dev0` effectively pins you to 4.8.x, while the latest stable release is `4.14.3`. Public sources checked didn’t turn up any Beautiful Soup 4.8.x–specific security advisory evidence, but this still creates a large maintenance gap; widen the upper bound (e.g., `<5`) unless you have a hard compatibility reason to stay on 4.8.x.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@wavefront/server/background_jobs/celery_worker/celery_worker/env.py`:
- Around line 42-50: The current env var declarations (GOOGLE_OAUTH_CLIENT_ID,
GOOGLE_OAUTH_CLIENT_SECRET, GOOGLE_OAUTH_REDIRECT_URI, GCP_PROJECT_ID,
GMAIL_PUBSUB_TOPIC_PREFIX, GMAIL_PUSH_ENDPOINT_TEMPLATE,
GMAIL_PUBSUB_OIDC_SA_EMAIL) default to empty strings which allows the worker to
start in an invalid state; update env loading to fail fast by either using
os.environ[...] for required vars or by validating after loading and raising a
clear RuntimeError if any required variable is missing/empty, and include the
var name in the error message so the startup fails immediately with actionable
feedback.
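One way to get the fail-fast behavior described above is to validate after loading and name every missing variable in a single error. A sketch, assuming the variables below are the required ones (GMAIL_PUBSUB_TOPIC_PREFIX is left out because the PR description marks it optional); `load_required_env` is a hypothetical helper, not part of the PR:

```python
import os
from typing import Dict, Mapping, Sequence

REQUIRED_TRIGGER_ENV = (
    'GOOGLE_OAUTH_CLIENT_ID',
    'GOOGLE_OAUTH_CLIENT_SECRET',
    'GOOGLE_OAUTH_REDIRECT_URI',
    'GCP_PROJECT_ID',
    'GMAIL_PUSH_ENDPOINT_TEMPLATE',
    'GMAIL_PUBSUB_OIDC_SA_EMAIL',
)


def load_required_env(
    names: Sequence[str] = REQUIRED_TRIGGER_ENV,
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, str]:
    """Return the named variables, or raise listing every missing/empty one."""
    missing = [name for name in names if not environ.get(name, '').strip()]
    if missing:
        raise RuntimeError(
            'Missing required environment variables: ' + ', '.join(missing)
        )
    return {name: environ[name] for name in names}
```

Reporting all missing names at once avoids a fix-one-restart-find-the-next loop during deployment.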
In
`@wavefront/server/background_jobs/celery_worker/celery_worker/tasks/trigger_event_task.py`:
- Around line 27-37: The code currently wraps UUID(trigger_id) inside the main
try/except which treats malformed UUID ValueError as transient and triggers
retries; change this by validating/parsing the trigger_id before the retryable
block: attempt UUID(trigger_id) in its own try/except ValueError, log a clear
error via logger.error/exception and return or raise a non-retryable exception
so self.request.retries/self.max_retries logic in the outer except is not
invoked; keep the existing processor.process(trigger_id=UUID(trigger_id),
raw_payload=raw_payload) call and outer retry behavior for genuine runtime
errors only.
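The separation described above, parsing the ID before entering the retryable block, might look like the following. `parse_trigger_id` is a hypothetical helper; the caller would return early on `None` instead of scheduling a Celery retry:

```python
import logging
from typing import Optional
from uuid import UUID

logger = logging.getLogger(__name__)


def parse_trigger_id(trigger_id: object) -> Optional[UUID]:
    """Return a UUID, or None for malformed input that no retry can fix."""
    try:
        return UUID(trigger_id)  # type: ignore[arg-type]
    except (ValueError, AttributeError, TypeError):
        # Malformed IDs are permanent failures: log once and drop.
        logger.error('Dropping trigger event with malformed trigger_id: %r', trigger_id)
        return None
```

Only the call to the processor then needs to sit inside the try/except that consults the task's retry counters.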
In `@wavefront/server/docker/celery_worker.Dockerfile`:
- Around line 7-12: The apt-get install invocation in the RUN line should
include --no-install-recommends to avoid installing unnecessary packages; update
the RUN line in celery_worker.Dockerfile (and the sibling Dockerfiles
workflow.Dockerfile, inference_app.Dockerfile, floware.Dockerfile,
floconsole.Dockerfile, call_processing.Dockerfile) so it reads apt-get install
-y --no-install-recommends \ followed by the same packages and keep the existing
rm -rf /var/lib/apt/lists/*; ensure you only add the flag and do not remove the
existing cleanup step.
In
`@wavefront/server/modules/triggers_module/triggers_module/controllers/trigger_controller.py`:
- Line 270: The invoke endpoint currently calls raw_payload = await
request.json() (and similar parsing around the block at lines 283-288) without
handling decoding errors, causing a 500 on invalid JSON; wrap the await
request.json() call(s) in a try/except that catches JSON decoding exceptions
(e.g., json.JSONDecodeError or ValueError depending on the framework) and return
a 400 Bad Request with a clear error message instead of allowing the generic
handler to run; update the invoke handler (the function that reads raw_payload)
to validate/parse within that try/except so both the initial raw_payload and the
subsequent parsing steps return 400 for invalid JSON.
- Around line 68-69: Query parameters success_redirect_url and
failure_redirect_url are accepted and used for redirects without validation,
allowing open redirects; add a central helper (e.g., is_valid_redirect_target or
validate_redirect_target) that checks the URL is internal or its host is in an
allowlist and use it to validate success_redirect_url and failure_redirect_url
before performing any redirect (reject/ignore the param or raise HTTPException
if invalid). Update all handlers that read these params (the same places where
success_redirect_url/failure_redirect_url are defined) to call the helper and
only redirect to validated targets; ensure the helper is reused across the
module so the same check is applied consistently to every occurrence.
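A sketch of the redirect check suggested above. The allowlisted host is hypothetical, and the policy (same-site relative paths, or HTTPS URLs whose host is allowlisted) is one reasonable reading of "internal or allowlisted", not the PR's implementation:

```python
from typing import Collection
from urllib.parse import urlsplit

# Hypothetical allowlist; a real deployment would load this from configuration.
ALLOWED_REDIRECT_HOSTS = frozenset({'console.example.com'})


def is_valid_redirect_target(
    url: str, allowed_hosts: Collection[str] = ALLOWED_REDIRECT_HOSTS
) -> bool:
    """Accept same-site relative paths or HTTPS URLs on an allowlisted host."""
    if not url:
        return False
    try:
        parts = urlsplit(url)
    except ValueError:
        return False
    if not parts.scheme and not parts.netloc:
        # Relative path: reject '//host' and '/\host', which browsers may
        # treat as pointing at another origin.
        return url.startswith('/') and not url.startswith(('//', '/\\'))
    return parts.scheme == 'https' and parts.hostname in allowed_hosts
```

A handler would call this on both `success_redirect_url` and `failure_redirect_url` and reject the request, or fall back to a default page, when it returns `False`.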
In
`@wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py`:
- Around line 226-244: The try/except around publisher.get_iam_policy and
publisher.set_iam_policy currently swallows errors with logger.warning, which
hides failures to grant Pub/Sub publish rights for Gmail; change the except
block to log at error level with full context (include topic_path, wants_role,
wants_member and the exception) and then re-raise the exception (or raise a
descriptive custom exception) so callers can detect and handle the failure
instead of continuing silently.
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py`:
- Around line 231-237: The code currently falls back to empty strings when
self._crypto.decrypt(...) returns falsy for credential.encrypted_access_token
and encrypted_refresh_token, which masks KMS/credential issues; update the logic
in TriggerEventProcessor (where self._crypto.decrypt is called) to validate
decrypted_access_token and decrypted_refresh_token immediately after decrypting
(no empty-string fallback) and raise a clear exception (including credential.id
or credential.external_account_id and which token is missing) if decryption
returns None/empty, and pass the validated decrypted_refresh_token to
provider.refresh_access_token; apply the same validation/raise behavior at the
other occurrence of self._crypto.decrypt for the refresh token.
- Around line 76-79: The code currently always calls await
self._advance_cursor(...) after dispatching, but exceptions from dispatch are
caught and converted into a 'failed' history row (see the dispatch/exception
handling block that writes a 'failed' row around lines 165-179), which means the
cursor advances past transient failures; change the control flow so
_advance_cursor is only called when dispatch completed successfully (or when the
created history row status is not 'failed'). Concretely, modify the method that
performs dispatch (the block that creates the 'failed' row) to return a success
flag or the created history row, and call self._advance_cursor(trigger,
provider, raw_payload) only when that flag/row indicates success (i.e., no
swallowed exception), keeping the existing exception-to-row handling intact but
preventing cursor advancement on swallowed errors.
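The cursor rule discussed above (advance only forward, and only when nothing in the batch failed) can be isolated as a pure function. This is an illustrative sketch; `next_cursor` and the `'dispatched'` status value are assumptions, while `'failed'` matches the status named in the comment:

```python
from typing import Dict, Iterable, Optional


def next_cursor(
    stored: Optional[int], incoming: int, results: Iterable[Dict[str, str]]
) -> Optional[int]:
    """Return the history cursor to persist after processing one push."""
    if any(result.get('status') == 'failed' for result in results):
        # Keep the old cursor so a later push re-fetches the failed range.
        return stored
    if stored is None:
        return incoming
    # Only move forward, never backward.
    return max(stored, incoming)


if __name__ == '__main__':
    print(next_cursor(100, 120, [{'status': 'dispatched'}]))
    print(next_cursor(100, 120, [{'status': 'failed'}]))
```

Holding the cursor back relies on the per-message idempotency key mentioned in the PR description, so messages that already succeeded are not dispatched twice when the range is re-read.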
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_push_receiver.py`:
- Around line 67-79: When handling Gmail triggers in trigger_push_receiver.py,
the current logic skips verification if (trigger.provider_config or
{}).get('oidc_audience') is missing, allowing unauthenticated requests; change
this to fail closed by returning an ignored/unauthorized response when
oidc_audience is falsy. Specifically, inside the branch guarded by
trigger.provider == 'gmail' (referencing oidc_audience, self._verifier.verify,
PubSubSignatureError, authorization_header, and trigger_id), add a check that if
oidc_audience is None/empty you log a warning and return
{'status':'ignored','reason':'missing_oidc_audience'} (or similar) instead of
proceeding to queue work. Ensure you keep the existing exception handling for
PubSubSignatureError unchanged.
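A fail-closed version of the check described above could be structured as below. The local `PubSubSignatureError` class and the `verify` callable stand in for the module's real exception and verifier, and the `'invalid_signature'` reason string is an assumption:

```python
from typing import Any, Callable, Dict, Optional


class PubSubSignatureError(Exception):
    """Stand-in for the module's signature verification error."""


def authorize_push(
    provider: str,
    provider_config: Optional[Dict[str, Any]],
    authorization_header: Optional[str],
    verify: Callable[[Optional[str], str], None],
) -> Optional[Dict[str, str]]:
    """Return an 'ignored' response to short-circuit, or None to proceed."""
    if provider != 'gmail':
        return None
    audience = (provider_config or {}).get('oidc_audience')
    if not audience:
        # Fail closed: without an audience the JWT cannot be checked at all.
        return {'status': 'ignored', 'reason': 'missing_oidc_audience'}
    try:
        verify(authorization_header, audience)
    except PubSubSignatureError:
        return {'status': 'ignored', 'reason': 'invalid_signature'}
    return None
```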
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py`:
- Around line 41-42: The code compares watch_expiration (parsed via
datetime.fromisoformat(raw)) to an aware UTC cutoff, but fromisoformat can yield
a naive datetime; update the parsing path that produces watch_expiration (the
variable derived from raw) to ensure it's converted to an aware UTC datetime
(e.g., after datetime.fromisoformat(raw) check if .tzinfo is None and if so set
tzinfo=datetime.timezone.utc or use astimezone(datetime.timezone.utc)), then use
that normalized value in the existing comparison (the if expiration is None or
expiration > cutoff: continue) and apply the same normalization for the other
comparison block around lines 80-83 so all watch_expiration comparisons use
timezone-aware UTC datetimes.
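The normalization described above, sketched as two small helpers. The names are hypothetical, naive values are assumed to already be in UTC, and the 24-hour window follows the renewer bullet in the PR description:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_watch_expiration(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 string into an aware UTC datetime, or None."""
    if not raw:
        return None
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Assumption: naive timestamps were stored in UTC.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def needs_renewal(
    raw: Optional[str], now: datetime, window: timedelta = timedelta(hours=24)
) -> bool:
    """True when the watch expires within `window` of `now` (aware UTC)."""
    expiration = parse_watch_expiration(raw)
    return expiration is not None and expiration <= now + window
```

Without the normalization, comparing a naive `fromisoformat` result with an aware cutoff raises `TypeError` rather than returning a wrong answer, so the renewer would fail on the first naive value it meets.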
In `@wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py`:
- Around line 69-71: GcpKMS.decrypt currently accepts bytes or str and UTF-8
encodes strings, which can corrupt base64 ciphertext; change the contract so
GcpKMS.decrypt(ciphertext: bytes) only accepts bytes (update the signature and
remove the str->encode branch in GcpKMS.decrypt), and update any callers (e.g.,
TokenCrypto.decrypt in token_crypto.py) to pass bytes; alternatively if you must
accept str, implement explicit base64 decoding+validation inside GcpKMS.decrypt
(decode with base64.b64decode and raise on invalid input) and update
FloKMS.decrypt / FloKmsService.decrypt types to reflect the chosen contract to
keep behavior consistent.
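If the bytes-only contract is chosen, the caller is responsible for turning the stored base64 string back into bytes before calling decrypt. A sketch of strict decoding with validation; `decode_ciphertext` is a hypothetical helper and not part of `flo_cloud`:

```python
import base64
import binascii


def decode_ciphertext(stored: str) -> bytes:
    """Strictly decode a base64 string into raw ciphertext bytes."""
    try:
        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them.
        return base64.b64decode(stored, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError('Stored ciphertext is not valid base64') from exc
```

The result can then be passed to a `decrypt(ciphertext: bytes)` method, which no longer needs to guess whether a `str` argument is encoded text or raw data.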
---
Nitpick comments:
In `@wavefront/server/modules/triggers_module/pyproject.toml`:
- Line 20: The dependency pin "beautifulsoup4>=4.8.0,<4.9.dev0" in
pyproject.toml unnecessarily restricts Beautiful Soup to 4.8.x; update that
constraint to allow newer stable releases (for example change the upper bound to
"<5" or a more permissive range like ">=4.8.0,<5") so the project can receive
bug fixes and security updates; locate the line containing the exact string
"beautifulsoup4>=4.8.0,<4.9.dev0" in the triggers_module pyproject.toml and
replace it with the new version range, run your dependency install/test flow to
verify compatibility.
In
`@wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py`:
- Around line 97-99: The refresh_access_token function should mirror
exchange_code's error-logging pattern: wrap the HTTP call/response parsing in a
try/except block, catch HTTPError and general exceptions around
response.raise_for_status() and response.json(), and log detailed context (e.g.,
response.status_code, response.text, and the parsed payload) via the same logger
before re-raising; also handle a missing 'access_token' (KeyError) by logging
the payload and response context and raising a clear exception. Ensure you
reference the response, payload, and access_token variables in the log and
re-raise the original exception after logging.
In
`@wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py`:
- Around line 349-376: The _list_new_message_ids function currently appends
message IDs into a list and checks membership with "msg_id not in message_ids"
which is O(n); change it to maintain a set (e.g., seen_ids) for O(1)
deduplication and only append to the message_ids list when the id is newly added
to the set; additionally, add a configurable page fetch limit / max_pages loop
guard in the pagination logic (or bail if a high threshold is reached) to
prevent unbounded history fetching when iterating service.users().history().list
in _list_new_message_ids.
In
`@wavefront/server/modules/triggers_module/triggers_module/providers/registry.py`:
- Around line 19-25: The get method in the registry swallows the original
KeyError, losing the exception chain; update the except block in
TriggerRegistry.get to capture the KeyError (e.g., except KeyError as exc:) and
re-raise UnsupportedTriggerProvider using "raise UnsupportedTriggerProvider(f'No
trigger provider registered for type: {provider_type}') from exc" so the
original exception context from self._providers is preserved for debugging.
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py`:
- Around line 287-304: The token validity check in trigger_crud_service (the
block using credential.token_expires_at, now, and returning
self._crypto.decrypt(credential.encrypted_access_token)) lacks a safety buffer
and may treat a nearly-expired token as valid; update the condition to require
token_expires_at > now + buffer_seconds (e.g., 60) before returning the cached
decrypted access token, so that the refresh path (provider.refresh_access_token
and the subsequent _credentials.find_one_and_update) runs when remaining
lifetime is below the buffer.
In
`@wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py`:
- Line 20: Remove the leftover debug bypass comments in the token_crypto module:
delete the commented-out lines "# return plaintext" and "# return stored" so
there is no hint of a disabled KMS/secure path; search inside token_crypto.py
for references to plaintext and stored (the debug returns) and remove those
commented return statements, leaving only the real encrypt/decrypt logic and
returning the proper encrypted/stored values from the existing functions.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 802d856d-543b-474d-86ea-74c5e8c978f4
⛔ Files ignored due to path filters (1)
- wavefront/server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (35)
- wavefront/server/apps/floware/floware/config.ini
- wavefront/server/apps/floware/floware/server.py
- wavefront/server/apps/floware/floware/services/scheduler_manager.py
- wavefront/server/apps/floware/pyproject.toml
- wavefront/server/background_jobs/celery_worker/celery_worker/celery_app.py
- wavefront/server/background_jobs/celery_worker/celery_worker/env.py
- wavefront/server/background_jobs/celery_worker/celery_worker/tasks/trigger_event_task.py
- wavefront/server/background_jobs/celery_worker/celery_worker/worker_setup.py
- wavefront/server/background_jobs/celery_worker/pyproject.toml
- wavefront/server/docker/celery_worker.Dockerfile
- wavefront/server/docker/floware.Dockerfile
- wavefront/server/modules/db_repo_module/db_repo_module/alembic/env.py
- wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_05_16_1401-74c837a023f3_add_agentic_triggers.py
- wavefront/server/modules/db_repo_module/db_repo_module/db_repo_container.py
- wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger.py
- wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_credential.py
- wavefront/server/modules/db_repo_module/db_repo_module/models/agentic_trigger_event.py
- wavefront/server/modules/triggers_module/pyproject.toml
- wavefront/server/modules/triggers_module/triggers_module/controllers/trigger_controller.py
- wavefront/server/modules/triggers_module/triggers_module/models/trigger_schemas.py
- wavefront/server/modules/triggers_module/triggers_module/providers/base.py
- wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_oauth.py
- wavefront/server/modules/triggers_module/triggers_module/providers/gmail/gmail_provider.py
- wavefront/server/modules/triggers_module/triggers_module/providers/gmail/pubsub_signature.py
- wavefront/server/modules/triggers_module/triggers_module/providers/registry.py
- wavefront/server/modules/triggers_module/triggers_module/services/trigger_crud_service.py
- wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py
- wavefront/server/modules/triggers_module/triggers_module/services/trigger_push_receiver.py
- wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py
- wavefront/server/modules/triggers_module/triggers_module/triggers_container.py
- wavefront/server/modules/triggers_module/triggers_module/utils/input_builder.py
- wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py
- wavefront/server/modules/user_management_module/user_management_module/authorization/require_auth.py
- wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
- wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py
💤 Files with no reviewable changes (1)
- wavefront/server/packages/flo_cloud/flo_cloud/gcp/gcs.py
RUN apt-get update && apt-get install -y \
    libpq-dev \
    gcc \
    libgl1 \
    libglib2.0-0 \
    && rm -rf /var/lib/apt/lists/*
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -nP --iglob '*Dockerfile*' 'apt-get\s+install\s+-y(?!\s+--no-install-recommends)'
Repository: rootflo/wavefront
Length of output: 613
Add --no-install-recommends to apt-get install to reduce image size/attack surface.
wavefront/server/docker/celery_worker.Dockerfile (and other Dockerfiles in the same folder: workflow.Dockerfile, inference_app.Dockerfile, floware.Dockerfile, floconsole.Dockerfile, call_processing.Dockerfile) use apt-get install -y without --no-install-recommends.
💡 Proposed fix
-RUN apt-get update && apt-get install -y \
+RUN apt-get update && apt-get install -y --no-install-recommends \
libpq-dev \
gcc \
libgl1 \
libglib2.0-0 \
 && rm -rf /var/lib/apt/lists/*
🧰 Tools
🪛 Trivy (0.69.3)
[error] 7-12: 'apt-get' missing '--no-install-recommends'
'--no-install-recommends' flag is missed: 'apt-get update && apt-get install -y libpq-dev gcc libgl1 libglib2.0-0 && rm -rf /var/lib/apt/lists/*'
Rule: DS-0029
(IaC/Dockerfile)
# Layer-3: advance the stored cursor so future pushes only fetch the
# range past this one. Only move forward, never backward.
await self._advance_cursor(trigger, provider, raw_payload)
Do not advance provider cursor after swallowed dispatch failures.
Line 165 swallows all dispatch exceptions into a 'failed' row, and Line 78 still advances history_id. That combination can permanently skip transiently failed messages.
Suggested direction
- for event in events:
- results.append(await self._handle_single_event(trigger, event))
+ had_failures = False
+ for event in events:
+ result = await self._handle_single_event(trigger, event)
+ results.append(result)
+ if result.get('status') == 'failed':
+ had_failures = True
- await self._advance_cursor(trigger, provider, raw_payload)
+ if not had_failures:
+ await self._advance_cursor(trigger, provider, raw_payload)
- except Exception as exc:
+ except Exception as exc:
logger.exception(
f'Failed to dispatch trigger event {event.provider_event_id}'
)
await self._events.find_one_and_update(
{'id': row.id},
status='failed',
error=str(exc)[:2000],
processed_at=datetime.now(timezone.utc),
)
- return {
- 'provider_event_id': event.provider_event_id,
- 'status': 'failed',
- 'error': str(exc),
- }
+ raise
Also applies to: 165-179
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_event_processor.py`
around lines 76 - 79, The code currently always calls await
self._advance_cursor(...) after dispatching, but exceptions from dispatch are
caught and converted into a 'failed' history row (see the dispatch/exception
handling block that writes a 'failed' row around lines 165-179), which means the
cursor advances past transient failures; change the control flow so
_advance_cursor is only called when dispatch completed successfully (or when the
created history row status is not 'failed'). Concretely, modify the method that
performs dispatch (the block that creates the 'failed' row) to return a success
flag or the created history row, and call self._advance_cursor(trigger,
provider, raw_payload) only when that flag/row indicates success (i.e., no
swallowed exception), keeping the existing exception-to-row handling intact but
preventing cursor advancement on swallowed errors.
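The control flow requested above can be sketched as follows. This is a minimal illustration, not the module's actual code: `CursorStore`, `process_batch`, and the event names are hypothetical stand-ins for the trigger's stored `history_id`, the processor loop, and Gmail message IDs.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CursorStore:
    """Hypothetical stand-in for the trigger's stored history cursor."""
    history_id: int = 0

    async def advance(self, new_history_id: int) -> None:
        # Only move forward, never backward.
        if new_history_id > self.history_id:
            self.history_id = new_history_id


async def process_batch(events, handle_event, cursor, new_history_id):
    """Handle every event, then advance the cursor only if none failed."""
    results = []
    for event in events:
        try:
            await handle_event(event)
            results.append({'event': event, 'status': 'dispatched'})
        except Exception as exc:
            # The failure is still recorded, mirroring the 'failed' audit row.
            results.append({'event': event, 'status': 'failed', 'error': str(exc)})
    if all(r['status'] != 'failed' for r in results):
        await cursor.advance(new_history_id)
    return results


async def demo():
    cursor = CursorStore(history_id=100)

    async def flaky(event):
        if event == 'msg-2':
            raise RuntimeError('transient dispatch error')

    async def healthy(event):
        return None

    first = await process_batch(['msg-1', 'msg-2'], flaky, cursor, 105)
    after_failure = cursor.history_id  # cursor must not have moved
    await process_batch(['msg-1', 'msg-2'], healthy, cursor, 105)
    return first, after_failure, cursor.history_id
```

Holding the cursor back means the next push re-fetches the same history range, so this approach leans on the per-message idempotency described in the PR (UNIQUE trigger_id, provider_event_id) to avoid dispatching already-processed messages twice.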
def decrypt(self, ciphertext: bytes | str) -> bytes:
    if isinstance(ciphertext, str):
        ciphertext = ciphertext.encode('utf-8')
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Find decrypt/encrypt callsites for GcpKMS/FloKMS =="
rg -n -C2 '\b(encrypt|decrypt)\s*\(' --type=py
echo
echo "== Locate token crypto utilities and inspect ciphertext encoding/decoding behavior =="
fd -i 'token_crypto.py' --exec sed -n '1,220p' {}
echo
echo "== Inspect trigger credential/event models for encrypted field types (Text vs LargeBinary) =="
fd -i 'agentic_trigger_credential.py' --exec sed -n '1,260p' {}
fd -i 'agentic_trigger_event.py' --exec sed -n '1,260p' {}
echo
echo "== Check for base64 usage around encrypted values =="
rg -n -C2 'base64|b64encode|b64decode|ciphertext|encrypted' --type=py
Repository: rootflo/wavefront
Length of output: 50374
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Inspect gcp kms decrypt implementation =="
sed -n '1,140p' wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py
echo
echo "== Inspect TokenCrypto decrypt pipeline (base64 storage -> kms decrypt bytes) =="
sed -n '1,120p' wavefront/server/modules/triggers_module/triggers_module/utils/token_crypto.py
echo
echo "== Find direct calls into FloKmsService/KMS decrypt (beyond TokenCrypto) =="
rg -n --hidden --no-ignore -S '\.decrypt\(' wavefront/server/modules wavefront/server/packages/flo_cloud | rg -n 'kms|FloKmsService|_kms|_crypto\.decrypt|decrypt\(ciphertext|ciphertext_bytes|encrypted_.*token' || true
echo
echo "== Specifically find callsites passing strings into KMS decrypt =="
rg -n --hidden --no-ignore -S '\.decrypt\(\s*[^,)]*stored[^,)]*\)' wavefront/server/modules || true
rg -n --hidden --no-ignore -S '\.decrypt\(\s*[^,)]*encrypted[^,)]*\)' wavefront/server/modules || true
rg -n --hidden --no-ignore -S 'kms.*decrypt\(' wavefront/server | head -n 50 || true
Repository: rootflo/wavefront
Length of output: 9240
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Inspect FloKmsService wrapper (flo_cloud/kms.py) =="
sed -n '1,140p' wavefront/server/packages/flo_cloud/flo_cloud/kms.py
echo
echo "== Inspect KMS interface types (_types/kms.py) =="
sed -n '1,120p' wavefront/server/packages/flo_cloud/flo_cloud/_types/kms.py
echo
echo "== Inspect AWS KMS encrypt/decrypt string/bytes handling =="
sed -n '1,120p' wavefront/server/packages/flo_cloud/flo_cloud/aws/kms.py
echo
echo "== Inspect Azure Key Vault encrypt/decrypt string/bytes handling =="
sed -n '1,140p' wavefront/server/packages/flo_cloud/flo_cloud/azure/key_vault.py
Repository: rootflo/wavefront
Length of output: 9228
Tighten ciphertext type contract for GcpKMS.decrypt (flo_cloud/gcp/kms.py:69-71).
In the token-at-rest path, stored ciphertext is base64 text in TokenCrypto.decrypt, which base64-decodes into ciphertext_bytes and then calls self._kms.decrypt(ciphertext_bytes), so the str -> utf-8 encode branch in GcpKMS.decrypt is not exercised for stored tokens. However, FloKMS.decrypt/FloKmsService.decrypt are typed as taking ciphertext: str, while callers pass bytes (see token_crypto.py), and if any caller ever passes the base64-encoded string directly into GcpKMS.decrypt, ciphertext.encode('utf-8') would produce invalid ciphertext. Align the interface to decrypt(ciphertext: bytes) (and/or accept str only as base64 with explicit decoding + validation).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@wavefront/server/packages/flo_cloud/flo_cloud/gcp/kms.py` around lines 69 -
71, GcpKMS.decrypt currently accepts bytes or str and UTF-8 encodes strings,
which can corrupt base64 ciphertext; change the contract so
GcpKMS.decrypt(ciphertext: bytes) only accepts bytes (update the signature and
remove the str->encode branch in GcpKMS.decrypt), and update any callers (e.g.,
TokenCrypto.decrypt in token_crypto.py) to pass bytes; alternatively if you must
accept str, implement explicit base64 decoding+validation inside GcpKMS.decrypt
(decode with base64.b64decode and raise on invalid input) and update
FloKMS.decrypt / FloKmsService.decrypt types to reflect the chosen contract to
keep behavior consistent.
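One way to read the bytes-only contract is sketched below. It is illustrative only: `BytesOnlyKms` and `decode_stored_ciphertext` are hypothetical names, and a single-byte XOR stands in for the real KMS call so the example runs without cloud credentials.

```python
import base64
import binascii


def decode_stored_ciphertext(stored: str) -> bytes:
    """Turn base64 text from storage into raw ciphertext bytes, or raise."""
    try:
        return base64.b64decode(stored, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError('stored ciphertext is not valid base64') from exc


class BytesOnlyKms:
    """Hypothetical KMS wrapper; XOR is a placeholder for the real KMS call."""

    def __init__(self, key: int = 0x5A) -> None:
        self._key = key

    def encrypt(self, plaintext: bytes) -> bytes:
        return bytes(b ^ self._key for b in plaintext)

    def decrypt(self, ciphertext: bytes) -> bytes:
        # Reject str outright instead of UTF-8 encoding it, which would
        # silently turn base64 text into invalid ciphertext.
        if not isinstance(ciphertext, (bytes, bytearray)):
            raise TypeError('ciphertext must be bytes; base64-decode stored text first')
        return bytes(b ^ self._key for b in ciphertext)


kms = BytesOnlyKms()
stored = base64.b64encode(kms.encrypt(b'refresh-token')).decode('ascii')
plaintext = kms.decrypt(decode_stored_ciphertext(stored))
```

With this shape, the base64 step lives in exactly one place (the storage layer), and a caller that passes the stored string straight to `decrypt` fails loudly with a `TypeError` rather than producing garbage.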
request['push_config'] = pubsub_v1.types.PushConfig(**push_config_kwargs)
try:
    subscriber.create_subscription(request=request, timeout=30)
except google_exceptions.AlreadyExists:

):
try:
    await trigger_crud_service.delete_trigger(trigger_id)
except TriggerNotFound:

task.cancel()
try:
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
except Exception as exc:
    logger.warning(f'Trigger subscription renewer failed: {exc}')

scheduler_manager.register_trigger_subscription_renewer(
This will renew the subscription multiple times in parallel if there are multiple pods running.
# Triggers — Gmail OAuth + Pub/Sub
GOOGLE_OAUTH_CLIENT_ID: str = _required_env('GOOGLE_OAUTH_CLIENT_ID')
I had assumed this was part of the DB schema for creating triggers? Are we using environment variables?
This should go in the DB and should be part of the configuration.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py (2)
56-63: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Apply the 1000-row cap after selecting expiring triggers.
Line 56 caps the query to the first 1000 active rows and only then checks watch_expiration. If the table grows past 1000 active triggers, an expiring watch outside that slice is never considered and can lapse indefinitely. Page through the active set, or order/filter on expiration before enforcing the limit.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py`
around lines 56 - 63, The current logic in TriggerSubscriptionRenewer (the method
that calls self._triggers.find and then iterates `for trigger in active`)
applies a hard limit of 1000 before checking watch expiration, which can miss
expiring triggers; change the query to either page through all active triggers
(iterate with offset/continuation until no more results) or add a filter/order
on the expiration field in the datastore query so the 1000 cap only applies
after selecting triggers whose provider_config watch_expiration is within the
renew window; update the code around self._triggers.find(...) and the loop that
calls _extract_expiration to implement pagination or an expiration-based filter
so no expiring trigger is skipped.
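The paging option could look roughly like the sketch below. It assumes a hypothetical `fetch_page(offset, limit)` coroutine in place of `self._triggers.find(...)` and plain dicts in place of trigger rows; the 24-hour window follows the renewal threshold stated in the PR description.

```python
import asyncio
from datetime import datetime, timedelta, timezone

PAGE_SIZE = 1000  # assumed page size, mirroring the cap mentioned in the review


async def find_expiring(fetch_page, now, renew_window=timedelta(hours=24)):
    """Page through all active triggers; keep those expiring within the window."""
    expiring, offset = [], 0
    while True:
        page = await fetch_page(offset=offset, limit=PAGE_SIZE)
        if not page:
            break
        for trigger in page:
            expiration = trigger.get('watch_expiration')
            if expiration is not None and expiration - now < renew_window:
                expiring.append(trigger)
        offset += len(page)
    return expiring


async def demo():
    now = datetime(2025, 1, 1, tzinfo=timezone.utc)
    # 2500 active triggers; only one, beyond the first 1000 rows, is near expiry.
    rows = [{'id': i, 'watch_expiration': now + timedelta(days=5)} for i in range(2500)]
    rows[2400]['watch_expiration'] = now + timedelta(hours=3)

    async def fetch_page(offset, limit):
        return rows[offset:offset + limit]

    return await find_expiring(fetch_page, now)
```

Offset paging only visits every row if the underlying query has a stable ordering (for example by primary key); pushing the expiration filter into the query itself avoids scanning the full active set on each run.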
66-73: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don’t let last_error persistence abort the whole renewal pass.
If the update on Line 70 fails, that exception escapes this except block and the remaining triggers are skipped. Wrap the bookkeeping write in its own try/except so one bad DB write does not stop the batch.
Suggested fix
 except Exception as exc:
     logger.warning(
         f'Failed to renew subscription for trigger {trigger.id}: {exc}'
     )
-    await self._triggers.find_one_and_update(
-        {'id': trigger.id},
-        last_error=f'renew failed: {exc}',
-    )
+    try:
+        await self._triggers.find_one_and_update(
+            {'id': trigger.id},
+            last_error=f'renew failed: {exc}',
+        )
+    except Exception as update_exc:
+        logger.warning(
+            'Failed to persist renewal error for trigger '
+            f'{trigger.id}: {update_exc}'
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py`
around lines 66 - 73, The except block that logs renewal failures currently also
performs the bookkeeping DB write via await
self._triggers.find_one_and_update(...), and if that update raises an exception
it escapes and aborts the whole renewal pass; wrap the await
self._triggers.find_one_and_update call in its own try/except so any DB write
error is caught and logged (e.g., logger.warning with the update error and
trigger id) without re-raising, ensuring the loop continues; locate this code in
trigger_subscription_renewer.py around the renewal/error handling (the block
that references trigger.id and self._triggers.find_one_and_update) and apply the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In
`@wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py`:
- Around line 56-63: The current logic in TriggerSubscriptionRenewer (the method
that calls self._triggers.find and then iterates `for trigger in active`)
applies a hard limit of 1000 before checking watch expiration, which can miss
expiring triggers; change the query to either page through all active triggers
(iterate with offset/continuation until no more results) or add a filter/order
on the expiration field in the datastore query so the 1000 cap only applies
after selecting triggers whose provider_config watch_expiration is within the
renew window; update the code around self._triggers.find(...) and the loop that
calls _extract_expiration to implement pagination or an expiration-based filter
so no expiring trigger is skipped.
- Around line 66-73: The except block that logs renewal failures currently also
performs the bookkeeping DB write via await
self._triggers.find_one_and_update(...), and if that update raises an exception
it escapes and aborts the whole renewal pass; wrap the await
self._triggers.find_one_and_update call in its own try/except so any DB write
error is caught and logged (e.g., logger.warning with the update error and
trigger id) without re-raising, ensuring the loop continues; locate this code in
trigger_subscription_renewer.py around the renewal/error handling (the block
that references trigger.id and self._triggers.find_one_and_update) and apply the
change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c23beaea-75bc-4fcd-bdfb-5b0d9f17eea0
📒 Files selected for processing (3)
- wavefront/server/apps/floware/floware/server.py
- wavefront/server/modules/triggers_module/triggers_module/services/trigger_subscription_renewer.py
- wavefront/server/modules/triggers_module/triggers_module/triggers_container.py
🚧 Files skipped from review as they are similar to previous changes (2)
- wavefront/server/apps/floware/floware/server.py
- wavefront/server/modules/triggers_module/triggers_module/triggers_container.py