From b28cbb1aabda32725c81d36d8f32f915f9e88ca9 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Tue, 26 May 2026 21:32:37 +0530 Subject: [PATCH 1/4] feat(floware): change to OAuth flow from DWD for cron job --- .../server/apps/floware/floware/config.ini | 5 +- .../services/email_service.py | 61 ++++++++----------- .../user_management_module/user_container.py | 5 +- 3 files changed, 33 insertions(+), 38 deletions(-) diff --git a/wavefront/server/apps/floware/floware/config.ini b/wavefront/server/apps/floware/floware/config.ini index 82e35721..10f3d32d 100644 --- a/wavefront/server/apps/floware/floware/config.ini +++ b/wavefront/server/apps/floware/floware/config.ini @@ -127,9 +127,10 @@ authority=${AUTHORITY} webhook_url=${WEBHOOK_URL} [gmail] -service_account_file=${GMAIL_SERVICE_ACCOUNT_FILE} +client_id=${GMAIL_CLIENT_ID} +client_secret=${GMAIL_CLIENT_SECRET} +refresh_token=${GMAIL_REFRESH_TOKEN} email_sender=${GMAIL_SENDER_EMAILID} -delegate_user=${GMAIL_DELEGATE_USER} [scheduler] daily_alert_cron=${DAILY_ALERT_CRON:0 5 * * *} diff --git a/wavefront/server/modules/user_management_module/user_management_module/services/email_service.py b/wavefront/server/modules/user_management_module/user_management_module/services/email_service.py index 4430a2f6..d8555753 100644 --- a/wavefront/server/modules/user_management_module/user_management_module/services/email_service.py +++ b/wavefront/server/modules/user_management_module/user_management_module/services/email_service.py @@ -6,12 +6,12 @@ import msal import requests import base64 -import json from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email import encoders -from google.oauth2 import service_account +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials from googleapiclient.discovery import build @@ -133,39 +133,39 @@ def send_email( class GmailEmailService(EmailService): - def __init__(self, service_account_b64, email_sender, delegate_user): - self.email_sender = email_sender or delegate_user - self.delegate_user = delegate_user + def __init__(self, client_id, client_secret, refresh_token, email_sender): + self.client_id = client_id + self.client_secret = client_secret + self.refresh_token = refresh_token + self.email_sender = email_sender self.scopes = ['https://www.googleapis.com/auth/gmail.send'] - if not delegate_user: - raise Exception('Delegate user required for gmail') - - try: - decoded_json = base64.b64decode(service_account_b64).decode('utf-8') - self.service_account_info = json.loads(decoded_json) - except Exception as e: - raise Exception(f'Invalid Gmail service account configuration: {str(e)}') + if not all([client_id, client_secret, refresh_token, email_sender]): + raise Exception( + 'Gmail OAuth requires client_id, client_secret, refresh_token, and email_sender' + ) - def get_access_token(self): - credentials = service_account.Credentials.from_service_account_info( - self.service_account_info, scopes=self.scopes + def get_credentials(self) -> Credentials: + creds = Credentials( + token=None, + refresh_token=self.refresh_token, + token_uri='https://oauth2.googleapis.com/token', + client_id=self.client_id, + client_secret=self.client_secret, + scopes=self.scopes, ) + creds.refresh(Request()) + return creds - credentials = credentials.with_subject(self.delegate_user) + def get_access_token(self): + return self.get_credentials() - return credentials + def _get_gmail_service(self): + return build('gmail', 'v1', credentials=self.get_credentials()) def send_forget_password_email(self, forget_url_link: str, email: str) -> bool: try: - credentials = self.get_access_token() - if not credentials: - logger.error('failed to obtain gmail access token') - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail='Failed to authenticate while sending the email.', - ) - service = build('gmail', 'v1', credentials=credentials) + service = self._get_gmail_service() message = MIMEMultipart() message['to'] = email @@ -206,14 +206,7 @@ def send_email( attachments: list[dict[str, Any]] | None = None, ) -> bool: try: - credentials = self.get_access_token() - if not credentials: - logger.error('failed to obtain gmail access token') - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail='Failed to authenticate while sending the email.', - ) - service = build('gmail', 'v1', credentials=credentials) + service = self._get_gmail_service() message = MIMEMultipart() message['to'] = email_id message['from'] = f'Rootflo Notifications <{self.email_sender}>' diff --git a/wavefront/server/modules/user_management_module/user_management_module/user_container.py b/wavefront/server/modules/user_management_module/user_management_module/user_container.py index 72496d54..2303f8b3 100644 --- a/wavefront/server/modules/user_management_module/user_management_module/user_container.py +++ b/wavefront/server/modules/user_management_module/user_management_module/user_container.py @@ -69,9 +69,10 @@ class UserContainer(containers.DeclarativeContainer): ), gmail=providers.Singleton( GmailEmailService, - service_account_b64=config.gmail.service_account_file, + client_id=config.gmail.client_id, + client_secret=config.gmail.client_secret, + refresh_token=config.gmail.refresh_token, email_sender=config.gmail.email_sender, - delegate_user=config.gmail.delegate_user, ), ) From 967da80f9afb867ef45589aaf158f349d46802a9 Mon Sep 17 00:00:00 2001 From: vishnu r kumar Date: Fri, 29 May 2026 21:12:12 +0530 Subject: [PATCH 2/4] feat: make scheduled queries respect rls policy --- .../datasources/ScheduleEmailAlertDialog.tsx | 280 ++++++++++++---- wavefront/client/src/types/scheduled-job.ts | 2 +- .../floware/di/application_container.py | 7 + .../server/apps/floware/floware/server.py | 3 + .../floware/services/scheduled_job_service.py | 302 ++++++++++++------ 5 files changed, 442 insertions(+), 152 deletions(-) diff --git a/wavefront/client/src/pages/apps/[appId]/datasources/ScheduleEmailAlertDialog.tsx b/wavefront/client/src/pages/apps/[appId]/datasources/ScheduleEmailAlertDialog.tsx index aefb1cb8..9c311140 100644 --- a/wavefront/client/src/pages/apps/[appId]/datasources/ScheduleEmailAlertDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/datasources/ScheduleEmailAlertDialog.tsx @@ -7,12 +7,26 @@ import { DialogHeader, DialogTitle, } from '@app/components/ui/dialog'; +import { Badge } from '@app/components/ui/badge'; +import { + Command, + CommandEmpty, + CommandGroup, + CommandInput, + CommandItem, + CommandList, +} from '@app/components/ui/command'; import { Input } from '@app/components/ui/input'; +import { Popover, PopoverContent, PopoverTrigger } from '@app/components/ui/popover'; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@app/components/ui/select'; import { Textarea } from '@app/components/ui/textarea'; +import { useGetUsers } from '@app/hooks'; +import { cn } from '@app/lib/utils'; import { useNotifyStore } from '@app/store'; +import { IUser } from '@app/types/user'; import { ScheduledJob } from '@app/types/scheduled-job'; import floConsoleService from '@app/api'; +import { Check, ChevronDown, X } from 'lucide-react'; import { useEffect, useMemo, useState } from 'react'; interface ScheduleEmailAlertDialogProps { @@ -22,6 +36,28 @@ interface ScheduleEmailAlertDialogProps { onOpenChange: (open: boolean) => void; } +const normalizeUserId = (id: string) => id.trim().toLowerCase(); + +const formatUserLabel = (user: IUser) => `${user.first_name} ${user.last_name} (${user.email})`; + +const parseRecipientUserIdsFromPayload = (payload: Record, users: IUser[]): string[] => { + const rawIds = payload.recipient_user_ids; + const ids: string[] = []; + if (Array.isArray(rawIds)) { + for (const item of rawIds) { + const id = String(item).trim(); + if (id) ids.push(id); + } + } else if (typeof rawIds === 'string' && rawIds.trim()) { + ids.push(rawIds.trim()); + } + + return ids.map((id) => { + const match = users.find((u) => normalizeUserId(u.id) === normalizeUserId(id)); + return match ? match.id : id; + }); +}; + const ScheduleEmailAlertDialog: React.FC = ({ isOpen, datasourceId, @@ -33,7 +69,77 @@ const ScheduleEmailAlertDialog: React.FC = ({ const [loadingJobs, setLoadingJobs] = useState(false); const [cronExpr, setCronExpr] = useState('0 9 * * *'); const [timezone, setTimezone] = useState('Asia/Kolkata'); - const [recipientsText, setRecipientsText] = useState(''); + const [selectedRecipientUserIds, setSelectedRecipientUserIds] = useState([]); + const [recipientsSelectOpen, setRecipientsSelectOpen] = useState(false); + const { data: users = [], isLoading: usersLoading } = useGetUsers(); + + const selectedRecipientUsers = useMemo(() => { + return selectedRecipientUserIds.map((id) => { + const user = users.find((u) => normalizeUserId(u.id) === normalizeUserId(id)); + if (user) return user; + return { + id, + email: id, + first_name: 'Unknown', + last_name: 'user', + role: '', + } satisfies IUser; + }); + }, [selectedRecipientUserIds, users]); + + const isRecipientSelected = (userId: string) => + selectedRecipientUserIds.some((id) => normalizeUserId(id) === normalizeUserId(userId)); + + const toggleRecipientUser = (userId: string) => { + setSelectedRecipientUserIds((prev) => + isRecipientSelected(userId) + ? prev.filter((id) => normalizeUserId(id) !== normalizeUserId(userId)) + : [...prev, userId] + ); + }; + + const removeRecipientUser = (userId: string) => { + setSelectedRecipientUserIds((prev) => prev.filter((id) => normalizeUserId(id) !== normalizeUserId(userId))); + }; + + const applyJobToForm = (job: ScheduledJob) => { + setEditingJobId(job.id); + setCronExpr(job.cron_expr || '0 9 * * *'); + setTimezone(job.timezone || 'Asia/Kolkata'); + setMaxRetries(String(job.max_retries ?? 3)); + const payload = (job.payload || {}) as Record; + setSelectedRecipientUserIds(parseRecipientUserIdsFromPayload(payload, users)); + setSubject(typeof payload.subject === 'string' ? payload.subject : ''); + const paramsValue = payload.params; + const dateRangeValue = payload.date_range; + if ( + dateRangeValue === 'last_day' || + dateRangeValue === 'last_hour' || + dateRangeValue === 'last_7_days' || + dateRangeValue === 'last_30_days' + ) { + setDateRange(dateRangeValue); + } else { + setDateRange('none'); + } + setStartDateParamKey(typeof payload.start_date_param === 'string' ? payload.start_date_param : 'start_date'); + setEndDateParamKey(typeof payload.end_date_param === 'string' ? payload.end_date_param : 'end_date'); + if (paramsValue && typeof paramsValue === 'object' && !Array.isArray(paramsValue)) { + setQueryParamsJson(JSON.stringify(paramsValue, null, 2)); + } else { + setQueryParamsJson(''); + } + setError(''); + }; + + const getJobRecipientLabels = (job: ScheduledJob): string[] => { + const payload = (job.payload || {}) as Record; + const ids = parseRecipientUserIdsFromPayload(payload, users); + return ids.map((id) => { + const user = users.find((u) => normalizeUserId(u.id) === normalizeUserId(id)); + return user ? formatUserLabel(user) : id; + }); + }; const [subject, setSubject] = useState(''); const [queryParamsJson, setQueryParamsJson] = useState(''); const [dateRange, setDateRange] = useState<'none' | 'last_day' | 'last_hour' | 'last_7_days' | 'last_30_days'>( @@ -46,19 +152,10 @@ const ScheduleEmailAlertDialog: React.FC = ({ const [saving, setSaving] = useState(false); const [error, setError] = useState(''); - const recipients = useMemo( - () => - recipientsText - .split(',') - .map((email) => email.trim()) - .filter(Boolean), - [recipientsText] - ); - const resetForm = () => { setCronExpr('0 9 * * *'); setTimezone('Asia/Kolkata'); - setRecipientsText(''); + setSelectedRecipientUserIds([]); setSubject(''); setQueryParamsJson(''); setDateRange('none'); @@ -96,6 +193,19 @@ const ScheduleEmailAlertDialog: React.FC = ({ // eslint-disable-next-line react-hooks/exhaustive-deps }, [isOpen, datasourceId, queryId]); + useEffect(() => { + if (!editingJobId || usersLoading) return; + const job = jobs.find((j) => j.id === editingJobId); + if (!job) return; + setSelectedRecipientUserIds((prev) => { + if (prev.some((id) => users.some((u) => normalizeUserId(u.id) === normalizeUserId(id)))) { + return prev; + } + const parsedIds = parseRecipientUserIdsFromPayload((job.payload || {}) as Record, users); + return parsedIds.length > 0 ? parsedIds : prev; + }); + }, [editingJobId, jobs, users, usersLoading]); + const handleOpenChange = (open: boolean) => { if (!open && !saving) { resetForm(); @@ -113,8 +223,8 @@ const ScheduleEmailAlertDialog: React.FC = ({ setError('Timezone is required'); return; } - if (recipients.length === 0) { - setError('At least one recipient email is required'); + if (selectedRecipientUserIds.length === 0) { + setError('At least one recipient user is required'); return; } if (!Number.isInteger(retries) || retries < 0 || retries > 10) { @@ -147,7 +257,7 @@ const ScheduleEmailAlertDialog: React.FC = ({ payload: { datasource_id: datasourceId, query_id: queryId, - recipients, + recipient_user_ids: selectedRecipientUserIds, subject: subject.trim() || undefined, date_range: dateRange === 'none' ? undefined : dateRange, start_date_param: dateRange === 'none' ? undefined : startDateParamKey.trim() || 'start_date', @@ -156,6 +266,7 @@ const ScheduleEmailAlertDialog: React.FC = ({ }, }); notifySuccess('Schedule updated successfully'); + await fetchJobs(); } else { await floConsoleService.scheduledJobService.createScheduledJob({ job_type: 'email_dynamic_query', @@ -165,7 +276,7 @@ const ScheduleEmailAlertDialog: React.FC = ({ payload: { datasource_id: datasourceId, query_id: queryId, - recipients, + recipient_user_ids: selectedRecipientUserIds, subject: subject.trim() || undefined, date_range: dateRange === 'none' ? undefined : dateRange, start_date_param: dateRange === 'none' ? undefined : startDateParamKey.trim() || 'start_date', @@ -174,9 +285,9 @@ const ScheduleEmailAlertDialog: React.FC = ({ }, }); notifySuccess('Email alert scheduled successfully'); + resetForm(); + await fetchJobs(); } - resetForm(); - await fetchJobs(); } catch { setError('Unable to create schedule. Please verify the details and try again.'); } finally { @@ -185,33 +296,7 @@ const ScheduleEmailAlertDialog: React.FC = ({ }; const handleEdit = (job: ScheduledJob) => { - setEditingJobId(job.id); - setCronExpr(job.cron_expr || '0 9 * * *'); - setTimezone(job.timezone || 'Asia/Kolkata'); - setMaxRetries(String(job.max_retries ?? 3)); - const payload = (job.payload || {}) as Record; - const recipients = Array.isArray(payload.recipients) ? payload.recipients : []; - setRecipientsText(recipients.map((item) => String(item)).join(', ')); - setSubject(typeof payload.subject === 'string' ? payload.subject : ''); - const paramsValue = payload.params; - const dateRangeValue = payload.date_range; - if ( - dateRangeValue === 'last_day' || - dateRangeValue === 'last_hour' || - dateRangeValue === 'last_7_days' || - dateRangeValue === 'last_30_days' - ) { - setDateRange(dateRangeValue); - } else { - setDateRange('none'); - } - setStartDateParamKey(typeof payload.start_date_param === 'string' ? payload.start_date_param : 'start_date'); - setEndDateParamKey(typeof payload.end_date_param === 'string' ? payload.end_date_param : 'end_date'); - if (paramsValue && typeof paramsValue === 'object' && !Array.isArray(paramsValue)) { - setQueryParamsJson(JSON.stringify(paramsValue, null, 2)); - } else { - setQueryParamsJson(''); - } + applyJobToForm(job); }; const handleDelete = async (jobId: string) => { @@ -232,7 +317,7 @@ const ScheduleEmailAlertDialog: React.FC = ({ return ( - + Schedule Email Alert Create a scheduled query email for this dynamic query. @@ -261,7 +346,10 @@ const ScheduleEmailAlertDialog: React.FC = ({ {jobs.map((job) => (

@@ -274,6 +362,15 @@ const ScheduleEmailAlertDialog: React.FC = ({ Next Run:{' '} {job.next_run_at ? new Date(job.next_run_at).toLocaleString() : '-'}

+

+ Recipients:{' '} + {usersLoading + ? 'Loading...' + : (() => { + const labels = getJobRecipientLabels(job); + return labels.length > 0 ? labels.join(', ') : 'None'; + })()} +

-
+

Datasource ID

@@ -304,9 +401,13 @@ const ScheduleEmailAlertDialog: React.FC = ({

Query ID

+
+

Subject (optional)

+ setSubject(e.target.value)} placeholder="Daily report" /> +
-
+

Cron expression

setCronExpr(e.target.value)} placeholder="0 9 * * *" /> @@ -315,27 +416,82 @@ const ScheduleEmailAlertDialog: React.FC = ({

Timezone

setTimezone(e.target.value)} placeholder="Asia/Kolkata" />
-
- -

Max retries

setMaxRetries(e.target.value)} placeholder="3" />
-
-

Subject (optional)

- setSubject(e.target.value)} placeholder="Daily report" /> -
-

Recipients (comma-separated emails)

-