Gravitational Wave Treasure Map cron functions - a Kafka-based listener system for processing:
- LIGO/Virgo/KAGRA gravitational wave alerts
- IceCube neutrino coincidence notices
Requirements: Python 3.11+
The listeners are real-time streaming processors that:
- Subscribe to NASA GCN Kafka streams
  - LIGO Listener: igwn.gwalert topic for gravitational wave detections
  - IceCube Listener: gcn.notices.icecube.lvk_nu_track_search for neutrino coincidences
- Process each alert as it arrives (typically within seconds of detection):
  - Parse alert JSON and extract metadata (event ID, classification, instruments)
  - Decode and analyze probability skymaps (FITS format)
  - Calculate sky localization statistics (90%/50% credible areas, average position)
  - Generate derived products:
    - Sky contours (GeoJSON for visualization)
    - MOC (Multi-Order Coverage) files
    - Satellite visibility maps (Fermi, LAT)
  - Query galaxy catalogs to identify potential host galaxies
- Store products to cloud storage (S3/Azure/OpenStack Swift):
  - Raw alert JSON
  - Processed FITS skymaps
  - Visualization-ready contours and maps
  - One event can produce 5-10 files as it evolves (Early Warning → Preliminary → Update)
- POST to GWTM API for public consumption by astronomers worldwide
Important: Listeners only process new alerts that arrive after they start. Historical alerts are not backfilled. If you start with empty storage, it will remain empty until the next gravitational wave or neutrino detection is announced.
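The 90%/50% credible areas listed among the sky localization statistics can be illustrated with a small standard-library sketch. It assumes a flat list of per-pixel probabilities on equal-area pixels (as in a flattened HEALPix map) that sum to 1. Real skymaps may be multi-order and the listeners' own implementation may differ; credible_area is a hypothetical name.

```python
import math

# Area of the whole sky in square degrees (4*pi steradians, about 41253).
FULL_SKY_SQ_DEG = 4 * math.pi * (180 / math.pi) ** 2


def credible_area(probabilities: list[float], level: float) -> float:
    """Area in square degrees of the smallest pixel set containing `level`.

    Assumes equal-area pixels and probabilities that sum to 1.
    """
    if not probabilities:
        raise ValueError("probabilities must not be empty")
    if not 0 < level <= 1:
        raise ValueError("level must be in (0, 1]")
    pixel_area = FULL_SKY_SQ_DEG / len(probabilities)
    cumulative = 0.0
    pixels_used = 0
    # Take the most probable pixels first until the requested level is reached.
    for probability in sorted(probabilities, reverse=True):
        cumulative += probability
        pixels_used += 1
        if cumulative >= level:
            break
    return pixels_used * pixel_area
```

Calling credible_area(probabilities, 0.9) and credible_area(probabilities, 0.5) gives the two areas; the 50% area is never larger than the 90% area.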
# Build and run both listeners
docker compose up
# Run specific listener
docker compose up ligo-listener
docker compose up icecube-listener

# Build, tag, and push the image manually
docker build -t gwtm_cron .
docker tag gwtm_cron:latest ghcr.io/thetreasuremap/gwtm_cron:latest
docker push ghcr.io/thetreasuremap/gwtm_cron:latest

Images are published to ghcr.io/thetreasuremap/gwtm_cron via GitHub Actions.
Listeners are deployed to Kubernetes via ArgoCD, managed from the gwtm-deploy repo and using the Helm chart in the gwtm repo (gwtm-helm/values-listeners-prod.yaml).
- Merge changes to master
- Tag the release:
  git tag v1.2.3
  git push origin v1.2.3
- GitHub Actions builds and pushes ghcr.io/thetreasuremap/gwtm_cron:1.2.3
- ArgoCD Image Updater detects the new semver tag and automatically updates the Helm parameter listeners.image.tag in the gwtm repo, writing back to gwtm-helm/.argocd-source-gwtm-listeners.yaml
- ArgoCD syncs the gwtm-listeners application and rolls out the new pods
No manual Helm or kubectl commands are needed for routine releases.
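The relationship between the git tag and the published image in the release steps (tag v1.2.3, image tag 1.2.3) can be checked before pushing with a small helper. This is a sketch under assumptions: the regular expression accepts only a "v" prefix followed by MAJOR.MINOR.PATCH, which may be stricter or looser than what the GitHub Actions workflow and ArgoCD Image Updater actually accept, and image_for_git_tag is a hypothetical name.

```python
import re

IMAGE_REPOSITORY = "ghcr.io/thetreasuremap/gwtm_cron"

# Assumed release tag format: "v" followed by MAJOR.MINOR.PATCH and nothing else.
_RELEASE_TAG = re.compile(r"^v(\d+)\.(\d+)\.(\d+)$")


def image_for_git_tag(git_tag: str) -> str:
    """Return the image reference expected for a release tag such as v1.2.3."""
    match = _RELEASE_TAG.match(git_tag)
    if match is None:
        raise ValueError(f"not a release tag of the form vMAJOR.MINOR.PATCH: {git_tag!r}")
    version = ".".join(match.groups())
    return f"{IMAGE_REPOSITORY}:{version}"
```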
The following environment variables are required for Docker/Kubernetes/Helm deployments:
- KAFKA_CLIENT_ID - GCN Kafka client ID for authentication
- KAFKA_CLIENT_SECRET - GCN Kafka client secret
- API_TOKEN - Authentication token for GWTM API
- API_BASE - Base URL for GWTM API (e.g., https://treasuremap.space/api/v0/)
Option 1: AWS S3
- AWS_ACCESS_KEY_ID - AWS access key
- AWS_SECRET_ACCESS_KEY - AWS secret key
- AWS_DEFAULT_REGION - AWS region (default: us-east-2)
- AWS_BUCKET - S3 bucket name (default: gwtreasuremap)
- STORAGE_BUCKET_SOURCE=s3 - Set to s3 for AWS storage
Option 2: Azure Blob Storage
- AZURE_ACCOUNT_NAME - Azure storage account name
- AZURE_ACCOUNT_KEY - Azure storage account key
- STORAGE_BUCKET_SOURCE=abfs - Set to abfs for Azure storage
Option 3: OpenStack Swift
- OS_AUTH_URL - OpenStack authentication endpoint (e.g., https://openstack.example.com:5000/v3)
- OS_USERNAME - OpenStack username
- OS_PASSWORD - OpenStack password
- OS_PROJECT_NAME - OpenStack project/tenant name
- OS_USER_DOMAIN_NAME - User domain name (default: Default)
- OS_PROJECT_DOMAIN_NAME - Project domain name (default: Default)
- OS_CONTAINER_NAME - Swift container name (default: gwtreasuremap)
- STORAGE_BUCKET_SOURCE=swift - Set to swift for OpenStack storage
- OBSERVING_RUN - Observing run identifier (default: O4)
- PATH_TO_GALAXY_CATALOG_CONFIG - Path to galaxy catalog config file (only needed for LIGO listener if generating galaxy lists)
- DRY_RUN - Set to true or 1 to run in dry-run mode (no API calls, no storage writes)
- WRITE_TO_STORAGE - Set to false or 0 to disable storage writes (default: true)
- VERBOSE - Set to false or 0 to disable verbose logging (default: true)
Note: Storage type (S3, Azure, Swift) is controlled by STORAGE_BUCKET_SOURCE, not by WRITE_TO_STORAGE.
- LOG_FORMAT - Set to json to enable structured JSON logging for Kubernetes (default: print statements)
- LOG_LEVEL - Set log level: DEBUG, INFO, WARNING, ERROR, CRITICAL (default: INFO)
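A deployment can be sanity-checked against the variables listed above before a listener starts. The sketch below is not the project's own configuration code. It assumes that the per-backend variables without a documented default are the mandatory ones, that an unset or unknown STORAGE_BUCKET_SOURCE should be reported as an error, and that flag values other than true/1/false/0 are rejected; env_flag and missing_variables are hypothetical names.

```python
import os
from collections.abc import Mapping

REQUIRED_ALWAYS = ("KAFKA_CLIENT_ID", "KAFKA_CLIENT_SECRET", "API_TOKEN", "API_BASE")

# Assumption: only variables without a documented default are mandatory.
REQUIRED_BY_BACKEND = {
    "s3": ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"),
    "abfs": ("AZURE_ACCOUNT_NAME", "AZURE_ACCOUNT_KEY"),
    "swift": ("OS_AUTH_URL", "OS_USERNAME", "OS_PASSWORD", "OS_PROJECT_NAME"),
}


def env_flag(env: Mapping[str, str], name: str, default: bool) -> bool:
    """Read a true/1 or false/0 flag, falling back to `default` when unset."""
    raw = env.get(name, "").strip().lower()
    if raw == "":
        return default
    if raw in ("1", "true"):
        return True
    if raw in ("0", "false"):
        return False
    raise ValueError(f"{name} must be true, 1, false, or 0 (got {raw!r})")


def missing_variables(env: Mapping[str, str]) -> list[str]:
    """Return the names of mandatory variables that are unset or empty."""
    backend = env.get("STORAGE_BUCKET_SOURCE", "")
    if backend not in REQUIRED_BY_BACKEND:
        raise ValueError("STORAGE_BUCKET_SOURCE must be one of: s3, abfs, swift")
    names = REQUIRED_ALWAYS + REQUIRED_BY_BACKEND[backend]
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    print("missing:", missing_variables(os.environ))
    # Assumption: DRY_RUN is off when unset; the documented defaults are used otherwise.
    print("dry run:", env_flag(os.environ, "DRY_RUN", False))
    print("write to storage:", env_flag(os.environ, "WRITE_TO_STORAGE", True))
```

Taking the environment as a mapping argument, rather than reading os.environ directly, keeps the checks easy to exercise with a plain dict.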
AWS S3 Example:
env:
  - name: KAFKA_CLIENT_ID
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: kafka-client-id
  - name: KAFKA_CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: kafka-client-secret
  - name: API_TOKEN
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: api-token
  - name: API_BASE
    value: "https://treasuremap.space/api/v0/"
  - name: STORAGE_BUCKET_SOURCE
    value: "s3"
  - name: AWS_ACCESS_KEY_ID
    valueFrom:
      secretKeyRef:
        name: aws-credentials
        key: access-key-id
  - name: AWS_SECRET_ACCESS_KEY
    valueFrom:
      secretKeyRef:
        name: aws-credentials
        key: secret-access-key
  - name: AWS_DEFAULT_REGION
    value: "us-east-2"
  - name: AWS_BUCKET
    value: "gwtreasuremap"
  - name: OBSERVING_RUN
    value: "O4"

OpenStack Swift Example:
env:
  - name: KAFKA_CLIENT_ID
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: kafka-client-id
  - name: KAFKA_CLIENT_SECRET
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: kafka-client-secret
  - name: API_TOKEN
    valueFrom:
      secretKeyRef:
        name: gwtm-secrets
        key: api-token
  - name: API_BASE
    value: "https://treasuremap.space/api/v0/"
  - name: STORAGE_BUCKET_SOURCE
    value: "swift"
  - name: OS_AUTH_URL
    value: "https://openstack.example.com:5000/v3"
  - name: OS_USERNAME
    valueFrom:
      secretKeyRef:
        name: openstack-credentials
        key: username
  - name: OS_PASSWORD
    valueFrom:
      secretKeyRef:
        name: openstack-credentials
        key: password
  - name: OS_PROJECT_NAME
    value: "gwtm-project"
  - name: OS_USER_DOMAIN_NAME
    value: "Default"
  - name: OS_PROJECT_DOMAIN_NAME
    value: "Default"
  - name: OS_CONTAINER_NAME
    value: "gwtreasuremap"
  - name: OBSERVING_RUN
    value: "O4"

The system runs two independent listener processes:
- LIGO Listener (docker/run_ligo_listener.py)
  - Subscribes to igwn.gwalert Kafka topic
  - Processes gravitational wave alerts
  - Generates skymaps, contours, and galaxy lists
  - Posts to GWTM API
- IceCube Listener (docker/run_icecube_listener.py)
  - Subscribes to gcn.notices.icecube.lvk_nu_track_search Kafka topic
  - Processes neutrino coincidence notices
  - Posts to GWTM API
Both listeners run continuously and process alerts in real-time as they arrive on the Kafka stream.
When migrating between storage backends (e.g., moving from AWS to OpenStack):
# Dry run to see what would be migrated (includes size estimation)
python scripts/migrate_storage.py --source s3 --dest swift --container fit --dry-run
# Actual migration with progress tracking
python scripts/migrate_storage.py --source s3 --dest swift --container fit
# Migrate test data
python scripts/migrate_storage.py --source s3 --dest swift --container test

Migration Features:
- Size Estimation: Calculates total data size before transfer (samples files if >100)
- Time Estimation: Shows ETA and transfer rate during migration
- Progress Tracking: Real-time progress with percentage complete
- Transfer Statistics: Reports total size, time, and average transfer rate
- Error Handling: Continues on errors and reports failed files at end
- Dry-run Mode: Preview migration without transferring data
Example Output:
Scanning source (s3)...
Found 1523 files. Calculating total size...
Total size: ~4.23 GB (1523 files)
--------------------------------------------------------------------------------
[1/1523 - 0.1%] fit/S230518h-Preliminary.fits.gz
Size: 2.45 MB, Time: 1.2s, Rate: 2.04 MB/s, ETA: 30.5m
[2/1523 - 0.1%] fit/S230518h-contours-smooth.json (145.23 KB) ✓
...
[1523/1523 - 100.0%] test/MS181101ab-retraction.json (8.12 KB) ✓
================================================================================
Migration complete!
Successful: 1523/1523
Total size: 4.23 GB
Total time: 28.3m
Avg rate: 2.56 MB/s
# Run local ingestion tests with sample alerts
python tests/listener_tests/test_local_ingest.py
python tests/icecube_tests/test_local_ingest.py