diff --git a/.gitignore b/.gitignore
index 0f984e7..889e9b9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,10 @@
# dynamic package version
pcs/_version.py
+# Docker build dependencies
+
+docker/rfsoc_controller/ccatkidlib
+
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
diff --git a/Dockerfile b/Dockerfile
index e4ebbfa..2e72728 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,7 +7,8 @@ FROM simonsobs/ocs:v0.11.3-19-gd729e04
# Install addition network related packages for ACU interface agent
RUN apt-get update -y && apt-get install -y iputils-ping \
- curl
+ curl \
+ rsync
# Copy in and install requirements
COPY requirements.txt /app/pcs/requirements.txt
diff --git a/docker/rfsoc_controller/Dockerfile b/docker/rfsoc_controller/Dockerfile
new file mode 100644
index 0000000..2f310d7
--- /dev/null
+++ b/docker/rfsoc_controller/Dockerfile
@@ -0,0 +1,13 @@
+FROM pcs
+
+# Will eventually want to pip install ccatkidlib from github once made public
+WORKDIR /app/pcs/
+RUN git clone https://github.com/ccatobs/ccatkidlib.git
+
+RUN python -m pip install -e ./ccatkidlib && \
+ python -m pip install -r ./ccatkidlib/requirements.txt && \
+ python -m pip install numpy==2.0.2
+
+WORKDIR /
+
+ENTRYPOINT ["dumb-init", "ocs-agent-cli"]
diff --git a/docs/agents/coldload_scpipsu.rst b/docs/agents/coldload_scpipsu.rst
new file mode 100644
index 0000000..c1da277
--- /dev/null
+++ b/docs/agents/coldload_scpipsu.rst
@@ -0,0 +1,114 @@
+.. highlight:: rst
+
+.. _coldload_scpipsu:
+
+========
+Coldload
+========
+
+The Coldload agent controls and monitors the temperature of a coldload. The coldload therommeter is read out using a Lakeshore 240/372/etc.
+through the associated Lakeshore agent. The temperature of the coldload is controlled by varying the current supplied through a Standard Commands for Programmable Instruments (SCPI) power supply unit.
+Communication with the power supply unit can be done through direct Ethernet connection (if availabile) or can be mediated through GPIB (e.g, using a Prologix Interface).
+
+.. argparse::
+ :filename: ../pcs/agents/coldload_scpipsu/agent.py
+ :func: add_agent_args
+ :prog: python3 agent.py
+
+Configuration File Examples
+---------------------------
+
+Below are configuration examples for the SO-OCS site config file
+and docker-compose file for running the
+Agent in a docker container.
+
+OCS Site Config
+```````````````
+
+To run the Coldload agent, a RaritanAgent block must be added
+to the site config file. Here is an example configuration block with
+all available arguments::
+
+ {'agent-class': 'ColdloadAgent_ScpiPsu',
+ 'instance-id': 'power-psu-coldload',
+ 'manage': 'docker',
+ 'arguments': [
+ ['--ip-address', '10.10.10.50'],
+ ['--gpib-slot', '15'],
+ ['--psu-channel', 1],
+ ['--lakeshore', ['cryo-ls240-lsa291f', 'Channel_4']],
+ ['--max-current', 0.6]
+ ]},
+
+The ``--ip-address`` argument should be changed to the IP address of the BK Precision power supply on the network.
+The ``--gpib-slot`` argument should be changed to the GPIB port if using a Prologix Interface for communication.
+The ``--port`` argument should be added if communicating through Ethernet directly.
+The ``--psu-channel`` argument should be changed to the power supply channel connected to the coldload.
+The ``--lakeshore`` argument should be a list with two elements: The instance-id of the Lakeshore agent and the coldload thermometer channel.
+The ``-max-current`` argument specifies the maximum current that will be supplied by the power supply. The default is shown above.
+
+Docker Compose
+``````````````
+The Coldload agent should be configured to run in a Docker container. An
+example docker compose service configuration is shown here::
+
+ ocs-coldload:
+ image: ghcr.io/ccatobs/pcs:latest
+ <<: *log-options
+ hostname: ocs-docker
+ network_mode: "host"
+ environment:
+ - INSTANCE_ID=power-psu-coldload
+ - SITE_HUB=ws://192.168.24.55:8001/ws
+ - SITE_HTTP=http://192.168.24.55:8001/call
+ volumes:
+ - ${OCS_CONFIG_DIR}:/config:ro
+
+Description
+-----------
+
+A "coldload" is an approximate blackbody that is used cryogenically as a calibration source. The coldload used for Mod-Cam is an aluminum plate coated with epoxy.
+The temperature of the coldload is controlled by varying the current supplied by a BK Precision 9130B power supply through resistors mounted on the coldload. The
+temperature is read out using a Lakeshore LS240. The Coldload agent subclasses the Simon's Observatory SOCS `ScpiPsu agent `_ for control over the power supply
+but limits control to only the power supply channel connected to the coldload. Additionally, the Coldload agent subscribes to the Lakeshore agent feed monitoring the coldload temperature.
+The Coldload agent's main functionality is controlling the power supply (turning the channel on/off and getting/setting the voltage/current), but subscribing to the temperature feed also allows
+getting the temperature of the coldload (get_temp()) and setting the temperature of the coldload using a PID controller (set_temp()). When setting the temperature of the coldload, the PID values
+(error, integral of error, and derivative of error) as well as the current are continously published to the 'pid_output' OCS feed for monitoring. Finally, the Coldload agent exposes the serial read(), write()
+commands to allow for greater control over the power supply unit.
+
+Example Clients
+---------------
+
+Below is an example client to control outlets::
+
+ from ocs.ocs_client import OCSClient
+ client = OCSClient('power-psu-coldload')
+
+ # Get channel output
+ client.get_output()
+
+ # Set channel output
+ client.set_output(state='on')
+ client.set_output(state='off')
+
+ # Get/set voltage
+ client.get_voltage()
+ client.set_voltage(volts=15) # Volts
+
+ # Get/set current
+ client.get_current()
+ client.set_current(current=0.1) # Amps
+
+ # Get/set temperature
+ client.get_temp()
+ client.set_temp(temp=65, max_current=1, pid=[2.25e-3, 5.1e-7, 0.71]) # Kelvin
+
+ # Read/write serial commands
+ client.write(msg='*idn?')
+ client.read()
+
+Agent API
+---------
+
+.. autoclass:: pcs.agents.coldload_scpipsu.agent.ColdloadAgent_ScpiPsu
+ :members:
\ No newline at end of file
diff --git a/pcs/agents/beam_mapper/__init__.py b/pcs/agents/beam_mapper/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/beam_mapper/agent.py b/pcs/agents/beam_mapper/agent.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/coldload_scpipsu/__init__.py b/pcs/agents/coldload_scpipsu/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/coldload_scpipsu/agent.py b/pcs/agents/coldload_scpipsu/agent.py
new file mode 100644
index 0000000..0c709ff
--- /dev/null
+++ b/pcs/agents/coldload_scpipsu/agent.py
@@ -0,0 +1,316 @@
+import argparse
+import time
+import os
+
+import txaio
+from ocs import ocs_agent, site_config
+from socs.agents.scpi_psu.agent import ScpiPsuAgent
+
+from pcs.drivers.coldload import Coldload
+
+
+class ColdloadAgent_ScpiPsu(ScpiPsuAgent):
+ def __init__(self, agent, ip_address, gpib_slot=None, port=None, lakeshore=None, psu_channel=None, max_current=None):
+ # Initialize ScpiPsuAgent
+ super().__init__(agent, ip_address, gpib_slot=gpib_slot, port=port)
+
+ # Define additional attributes
+ self.psu_channel = psu_channel
+ self.temp_control = False
+ self.max_current = max_current
+
+ # Create coldload object
+ self.cl = Coldload(lakeshore[0], lakeshore[1])
+ self.err_i = 0.0 # Store integral error of set_temp PID in case control loop is interrupted
+
+ self.log = agent.log
+ # Register OCS feed to log PID temperature control parameters
+ self.agent.register_feed('pid_output',
+ record=True,
+ agg_params={'frame_length':10*60},
+ buffer_time=5)
+
+ #==================#
+ # Coldload Methods #
+ #==================#
+
+ def get_temp(self, session, params):
+ '''get_temp()
+
+ **Task** - Get the current coldload temperature.
+
+ '''
+ with self.lock.acquire_timeout(timeout=5, job='get_temp') as acquired:
+ if not acquired:
+ self.log.error(f'Lock could not be acquired because it is held by {self.lock.job}.')
+ return False, f"Lock could not be acquired because it is held by {self.lock.job}."
+
+ temp = self.cl.get_temp()
+ data = {'timestamp': time.time(),
+ 'block_name': 'coldload',
+ 'data': {'temp': temp}}
+ session.data = data
+
+ return temp is not None, temp
+
+ @ocs_agent.param('temp', type=float, check=lambda x: 60 <= x <= 200)
+ @ocs_agent.param('sample_int', type=float, default = 0.5)
+ @ocs_agent.param('avg_int', type=float, default = 7.5)
+ @ocs_agent.param('thresholds', type=list, default=[0.01, 0.1, 1, 5])
+ @ocs_agent.param('lock_int', type=float, default=0.1)
+ @ocs_agent.param('timeout', type=float, default=180)
+ @ocs_agent.param('max_current', type=float, default=None)
+ @ocs_agent.param('init_pid', type=list, default=[1e-3, 1.75e-7, 0.8])
+ @ocs_agent.param('stable_pid', type=list, default=[1e-3, 1.75e-7, 0.8])
+ @ocs_agent.param('ID_threshold', type=float, default=0.1)
+ @ocs_agent.param('reset_int', type=bool, default=True)
+ @ocs_agent.param('reset_current', type=bool, default=False)
+ def set_temp(self, session, params):
+ """
+ **Process** - Set the temperature of the coldload using a proportional integral derivative (PID) controller.
+ The PID controller uses the coldload temperature as the process variable and the current squared as the control variable.
+
+ Parameters:
+ temp (float): Temperature to set coldload to
+ sample_int (float): Interval at which to sample coldload temperature
+ avg_int (float): Interval over which to average coldload temperatures (averaged temperature used as PID process variable). Also sets timescale for PID control
+ thresholds (List(float)): Error thresholds at which to modify avg_int. avg_int will be used for errors greater than the largest threshold and then multiplied by 2 for each threshold passed.
+ lock_int (float): Interval at which to release lock
+ timeout (float): Time in minutes after which to exit PID loop (0 for indefinite)
+ max_current (float): Maximum current limit
+ pid (List[float]): Proportional, integral, and derivative control coefficients
+ int_threshold (float): Error threshold hold after which the integral term will start contributing to the PID control.
+ """
+
+ temp = params.pop('temp')
+
+ lock_int = params.pop('lock_int')
+ if params['max_current'] is None: params['max_current'] = self.max_current
+
+ with self.lock.acquire_timeout(timeout=1, job='set_temp') as acquired:
+ if not acquired:
+ self.log.error(f"Lock could not be acquired because it is held by {self.lock.job}.")
+ return False, f"Lock could not be acquired because it is held by {self.lock.job}."
+
+
+ last_release = time.time()
+ curr_args = [self.psu_channel]
+ params['yield_dict'] = True
+ params['err_i'] = 0.0 if params['reset_int'] else self.err_i
+ pid_control = self.cl.set_temp(temp, self.psu.get_curr, self.psu.set_curr, *curr_args, **params)
+ self.temp_control = True
+ while self.temp_control:
+ # Perform PID control loop and get PID error values
+ try:
+ pids = next(pid_control)
+ # Create data dictionary and publish to pid_output feed and session.data
+ if pids is not None:
+ data = {'timestamp': time.time(),
+ 'block_name': 'coldload',
+ 'data': pids}
+ self.agent.publish_to_feed('pid_output', data)
+ session.data = data
+ self.err_i = pids['err_i']
+
+ # Release and reacquire the lock
+ if time.time() - last_release > lock_int:
+ last_release = time.time()
+ if not self.lock.release_and_acquire(timeout=120):
+ self.log.error(f'Could not re-acquire lock now held by {self.lock.job}.')
+ return False, 'Could not re-acquire lock.'
+ # Catch exception raised if set_temp timeout is reached
+ except StopIteration:
+ self.temp_control = False
+ if params['reset_current']: self.psu.set_curr(self.psu_channel, 0.0)
+ return True, 'set_temp executed successfully.'
+
+ def stop_set_temp(self, session, params):
+ """stop_set_temp()
+
+ **Process** - Stop the process setting the coldload temperature. Called when running set_temp.stop()
+
+ """
+ if self.temp_control:
+ self.temp_control = False
+ return True, 'Stopping setting coldload temperature...'
+ else:
+ return False, 'Not currently setting coldload temperature.'
+
+ #===============================#
+ # Overload ScpiPsuAgent Methods #
+ #===============================#
+
+ def get_output(self, session, params):
+ """get_output()
+
+ **Task** - Get whether the channel connected to the coldload is on or off.
+
+ """
+ params['channel'] = self.psu_channel
+ return super().get_output(session, params=params)
+
+ def get_voltage(self, session, params):
+ """get_voltage()
+
+ **Task** - Get the voltage of the coldload.
+
+ """
+ params['channel'] = self.psu_channel
+ return super().get_voltage(session, params=params)
+
+ def get_current(self, session, params):
+ """get_current()
+
+ **Task** - Get the current of the coldload.
+
+ """
+ params['channel'] = self.psu_channel
+ return super().get_current(session, params=params)
+
+ @ocs_agent.param('state', type=bool)
+ def set_output(self, session, params):
+ """set_output(state)
+
+ **Task** - Turn the channel connected to the coldload on or off.
+
+ Parameters:
+ state (bool): True for on, False for off.
+ """
+
+ params['channel'] = self.psu_channel
+ return super().set_output(session, params=params)
+
+ @ocs_agent.param('volts', type=float, check=lambda x: 0 <= x <= 30)
+ def set_voltage(self, session, params):
+ """set_voltage(volts)
+
+ **Task** - Set the voltage of the coldload.
+
+ Parameters:
+ volts (float): Voltage to set.
+ """
+
+ params['channel'] = self.psu_channel
+ return super().set_voltage(session, params=params)
+
+ @ocs_agent.param('current', type=float)
+ def set_current(self, session, params):
+ """set_current(current)
+
+ **Task** - Set the current of the coldload.
+
+ Parameters:
+ current (float): Current to set.
+ """
+
+ # Override set_current method to use power supply channel connected to coldload and to limit the max current.
+ params['channel'] = self.psu_channel
+ params['current'] = max(min(params['current'], self.max_current), 0)
+ return super().set_current(session, params=params)
+
+ #============================================#
+ # Read/Write Serial Commands to Power Supply #
+ #============================================#
+
+ def read(self, session, params):
+ """read()
+
+ **Task** - Read message from power supply
+
+ """
+
+ with self.lock.acquire_timeout(timeout=5, job='read') as acquired:
+ if not acquired:
+ self.log.error(f'Lock could not be acquired because it is held by {self.lock.job}.')
+ return False, f"Lock could not be acquired because it is held by {self.lock.job}."
+
+ resp = self.psu.read()
+ data = {'timestamp': time.time(),
+ 'block_name': 'power_supply',
+ 'data': {'read': resp}}
+ session.data = data
+ return True, resp
+
+ @ocs_agent.param('msg', type=str, default='')
+ def write(self, session, params):
+ """write(msg)
+
+ **Task** - Write serial command to power supply.
+
+ Parameters:
+ msg (str): Serial command
+ """
+ with self.lock.acquire_timeout(timeout=5, job='write') as acquired:
+ if not acquired:
+ self.log.error(f'Lock could not be acquired because it is held by {self.lock.job}.')
+ return False, f"Lock could not be acquired because it is held by {self.lock.job}."
+
+ msg = params['msg']
+ if not msg:
+ return False, f"Invalid message: {msg}"
+ else:
+ self.psu.write(msg)
+ return True, "Wrote message to power supply."
+
+#===========#
+# Functions #
+#===========#
+
+def make_parser(parser=None):
+ """Build the argument parser for the Agent. Allows sphinx to automatically
+ build documentation based on this function.
+
+ """
+ # From simonsobs/socs/socs/agents/scpi_psu/agent.py with additions
+
+ if parser is None:
+ parser = argparse.ArgumentParser()
+
+ # Add options specific to this agent.
+ pgroup = parser.add_argument_group('Agent Options')
+ pgroup.add_argument('--ip-address')
+ pgroup.add_argument('--gpib-slot')
+ pgroup.add_argument('--port')
+ pgroup.add_argument('--psu-channel', type=int, help='The power supply channel connected to the coldload.')
+ pgroup.add_argument('--lakeshore', nargs=2, type=str, help='Instance ID of lakeshore agent and thermometer channel of the coldload.', metavar = ('Lakeshore Agent Instance ID', 'Lakeshore Thermometer Channel of Coldload'))
+ pgroup.add_argument('--mode', type=str, default='init',
+ choices=['init', 'acq'])
+ pgroup.add_argument('--max-current', type=float, default=1, help='Maximum current limit in Amperes.')
+ return parser
+
+def main(args=None):
+ # From simonsobs/socs/socs/agents/scpi_psu/agent.py with modifications
+
+ # Start logging
+ txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
+
+ parser = make_parser()
+ args = site_config.parse_args(agent_class='ColdloadAgent',
+ parser=parser,
+ args=args)
+
+ init_params = {'auto_acquire': args.mode == 'acq'}
+ agent, runner = ocs_agent.init_site_agent(args)
+
+ c = ColdloadAgent_ScpiPsu(agent, args.ip_address, gpib_slot=args.gpib_slot, port=args.port, psu_channel = args.psu_channel, lakeshore = args.lakeshore, max_current=args.max_current)
+
+ agent.register_task('init', c.init, startup=init_params)
+ agent.register_task('set_voltage', c.set_voltage)
+ agent.register_task('set_current', c.set_current)
+ agent.register_task('set_output', c.set_output)
+
+ agent.register_task('get_voltage', c.get_voltage)
+ agent.register_task('get_current', c.get_current)
+ agent.register_task('get_temp', c.get_temp)
+ agent.register_task('get_output', c.get_output)
+
+ agent.register_task('read', c.read)
+ agent.register_task('write', c.write)
+
+ agent.register_process('monitor_output', c.monitor_output, c.stop_monitoring)
+ agent.register_process('set_temp', c.set_temp, c.stop_set_temp)
+
+ runner.run(agent, auto_reconnect=True)
+
+if __name__ == '__main__':
+ main()
diff --git a/pcs/agents/fts_aerotech/__init__.py b/pcs/agents/fts_aerotech/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/fts_aerotech/agent.py b/pcs/agents/fts_aerotech/agent.py
new file mode 100644
index 0000000..ae15683
--- /dev/null
+++ b/pcs/agents/fts_aerotech/agent.py
@@ -0,0 +1,361 @@
+
+import argparse
+import os
+import socket
+import time
+
+import txaio
+import yaml
+from ocs import ocs_agent, site_config
+from ocs.ocs_twisted import Pacemaker, TimeoutLock
+from twisted.internet import reactor
+
+# Modified version of the SOCS FTS Aerotech agent
+# Updated to log current position of central mirror instead of the target position
+
+class FTSAerotechStage:
+ """
+ Class for connecting to the FTS mirror controller
+
+ Args:
+ ip_address: IP address where controller is running
+ port: Port the controller is listening on
+ timeout: communication timeout
+ speed: speed in mm/s, defaults to 25 mm/s if None
+ translate: Argument which translates the aerotech stage readout into
+ a standardized value.
+ limits: 2-tuple of the max and min FTS central mirror positions to
+ prevent the stage from moving out of bounds via software
+ limits.
+
+ """
+
+ def __init__(self, ip_address, port, translate=None, limits=None,
+ timeout=10, speed=25):
+ self.ip_address = ip_address
+ self.port = int(port)
+
+ self.comm = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.comm.connect((self.ip_address, self.port))
+ self.comm.settimeout(timeout)
+
+ self.send('ENABLE X\n') # Send enable command
+ data = self.comm.recv(1024) # Collect and print response
+
+ if data == b'%\n':
+ self.initialized = True
+ else:
+ self.initialized = False
+
+ self.pos = None
+ self.speed = speed
+ self.speed_code = 'F%i' % self.speed
+
+ self.translate = translate
+ self.limits = limits
+
+ def send(self, msg):
+ self.comm.sendall(bytes(msg, 'utf-8'))
+
+ def read(self):
+ # Controller blocks reply until motion is complete; so if
+ # you're putting a timeout in here make sure it's smart/long
+ # enough.
+ return self.comm.recv(1024)
+
+ def home(self):
+ self.send('HOME X\n')
+ time.sleep(0.1)
+ # block until homing is complete
+ return self.read()
+
+ def get_position(self):
+ self.send('PFBK X\n')
+ time.sleep(0.1)
+ out = self.read()
+ try:
+ M, B = self.translate
+ self.pos = (float(out[1:]) - B) / M
+ return True, self.pos
+ except BaseException:
+ return False, None
+
+ def move_to(self, position):
+ limits = self.limits
+ if position < limits[0] or position > limits[1]:
+ return False, 'Move out of bounds!'
+ M, B = self.translate
+ stage_pos = position * M + B
+ cmd = ('MOVEABS X%.2f %s\n' % (stage_pos, self.speed_code))
+ self.send(cmd)
+ out = None
+ while out is None:
+ try:
+ # controller should block reply until move complete.
+ time.sleep(0.1)
+ out = self.read()
+ except TimeoutError:
+ continue
+ return True, 'Move Complete'
+
+ def close(self):
+ self.comm.close()
+
+
+class FTSAerotechAgent:
+ """
+ Agent for connecting to the FTS mirror control.
+
+ Args:
+ ip_addr: IP address of Motion Controller
+ port: Port of Motion Controller
+ mode: 'acq': Start data acquisition on initialize
+ samp: default sampling frequency in Hz
+ config_file: File which contains FTS-specific translate, limits, and
+ speed arguments.
+
+ """
+
+ def __init__(self, agent, ip_addr, port, config_file, mode=None, samp=2):
+
+ self.ip_addr = ip_addr
+ self.port = int(port)
+
+ self.stage = None
+ self.initialized = False
+ self.take_data = False
+
+ self.agent = agent
+ self.log = agent.log
+ self.lock = TimeoutLock()
+
+ if mode == 'acq':
+ self.auto_acq = True
+ else:
+ self.auto_acq = False
+ self.sampling_frequency = float(samp)
+
+ # register the position feeds
+ agg_params = {
+ 'frame_length': 10 * 60, # [sec]
+ }
+
+ self.agent.register_feed('position',
+ record=True,
+ agg_params=agg_params,
+ buffer_time=0)
+
+ # Load dictionary of specific mirror paramters, since some parameters
+ # like limits and translate vary over different FTSes This is loaded
+ # from a yaml file, which is assumed to be in the $OCS_CONFIG_DIR
+ # directory.
+ if config_file is None:
+ raise Exception(
+ "No config file specified for the FTS mirror config")
+ else:
+ config_file_path = os.path.join(os.environ['OCS_CONFIG_DIR'],
+ config_file)
+ with open(config_file_path) as stream:
+ self.mirror_configs = yaml.safe_load(stream)
+ if self.mirror_configs is None:
+ raise Exception("No mirror configs in config file.")
+ self.log.info(
+ f"Loaded mirror configs from file {config_file_path}")
+ self.translate = self.mirror_configs.pop('translate', None)
+ self.limits = self.mirror_configs.pop('limits', None)
+ # The other mirror configs (speed, timeout) are optional and
+ # have defaults so we leave them as the dictionary.
+ if self.translate is None or self.limits is None:
+ raise Exception("translate and limits must be included "
+ "in the mirror configuration keys")
+
+ @ocs_agent.param('_')
+ def init_stage(self, session, params=None):
+ """init_stage()
+
+ **Task** - Perform first time setup for communication with FTS stage.
+
+ """
+ if self.stage is not None and self.initialized:
+ return True, 'Stages already Initialized'
+
+ self.log.debug("Trying to acquire lock")
+ with self.lock.acquire_timeout(timeout=0, job='init') as acquired:
+ # Locking mechanism stops code from proceeding if no lock acquired
+ if not acquired:
+ self.log.warn("Could not start init because {} is already"
+ "running".format(self.lock.job))
+ return False, "Could not acquire lock."
+ # Run the function you want to run
+ self.log.debug("Lock Acquired Connecting to Stages")
+ try:
+ self.stage = FTSAerotechStage(
+ self.ip_addr, self.port, self.translate, self.limits,
+ **self.mirror_configs)
+ except Exception as e:
+ self.log.error(f"Error while connecting to FTS: {e}")
+ reactor.callFromThread(reactor.stop)
+ return False, "FTS Stage Initialization Failed"
+ # This part is for the record and to allow future calls to proceed,
+ # so does not require the lock
+ self.initialized = True
+ if self.auto_acq:
+ self.agent.start('acq', params={'sampling_frequency': self.sampling_frequency})
+ return True, 'Stage Initialized.'
+
+ @ocs_agent.param('_')
+ def home(self, session, params=None):
+ """home()
+
+ **Task** - Home the stage to its negative limit.
+
+ """
+
+ with self.lock.acquire_timeout(timeout=5, job='home') as acquired:
+ if not acquired:
+ self.log.warn("Could not start home because lock held by"
+ f"{self.lock.job}")
+ return False, "Could not get lock"
+ try:
+ self.stage.home()
+ except Exception as e:
+ self.log.error(f"Homing Failed: {e}")
+ return False, "Homing Failed"
+ return True, "Homing Complete"
+
+ @ocs_agent.param('position', type=float, check=lambda x: -74.8 <= x <= 74.8)
+ def move_to(self, session, params=None):
+ """move_to(position)
+
+ **Task** - Move to absolute position relative to stage center (in mm).
+
+ Parameters:
+ position (float): Position in mm, must be between -74.8 and 74.8.
+
+ """
+ with self.lock.acquire_timeout(timeout=5, job='move') as acquired:
+ if not acquired:
+ self.log.warn("Could not start move because lock held by"
+ f"{self.lock.job}")
+ return False, "Could not get lock"
+ return self.stage.move_to(params.get('position'))
+
+ return False, "Move did not complete correctly?"
+
+ @ocs_agent.param('sampling_frequency', type=float, default=2)
+ def acq(self, session, params=None):
+ """acq(sampling_frequency=2)
+
+ Parameters:
+ sampling_frequency (float): Sampling rate in Hz. Defaults to 2 Hz.
+
+ Notes:
+ The most recent position data is stored in session.data in the
+ format::
+
+ >>> response.session['data']
+ {"position": {"pos" : mirror position}}
+
+ """
+ f_sample = params.get('sampling_frequency', self.sampling_frequency)
+ pm = Pacemaker(f_sample, quantize=True)
+
+ if not self.initialized or self.stage is None:
+ raise Exception("Connection to Stages not initialized")
+
+ with self.lock.acquire_timeout(timeout=0, job='acq') as acquired:
+ if not acquired:
+ self.log.warn(f"Could not start acq because {self.lock.job} "
+ "is already running")
+ return False, "Could not acquire lock."
+
+ self.log.info("Starting Data Acquisition for FTS Mirror at"
+ f"{f_sample} Hz")
+ self.take_data = True
+ last_release = time.time()
+ session.data = {"position": 0}
+
+ while self.take_data:
+ if time.time() - last_release > 0.1:
+ if not self.lock.release_and_acquire(timeout=120):
+ self.log.warn("Could not re-acquire lock now held by"
+ f"{self.lock.job}.")
+ return False, "could not re-acquire lock"
+ last_release = time.time()
+ pm.sleep()
+
+ data = {
+ 'timestamp': time.time(),
+ 'block_name': 'position',
+ 'data': {}}
+ success, pos = self.stage.get_position()
+ if not success:
+ self.log.info("stage.get_position call failed")
+ else:
+ data['data']['pos'] = pos
+ session.data["position"]=pos
+ self.agent.publish_to_feed('position', data)
+
+ return True, 'Acquisition exited cleanly.'
+
+ def _stop_acq(self, session, params=None):
+ """
+ params:
+ dict: {}
+ """
+ if self.take_data:
+ self.take_data = False
+ return True, 'requested to stop taking data.'
+ else:
+ return False, 'acq is not currently running.'
+
+
+def make_parser(parser=None):
+ """Build the argument parser for the Agent. Allows sphinx to automatically
+ build documentation based on this function.
+ """
+ if parser is None:
+ parser = argparse.ArgumentParser()
+
+ # Add options specific to this agent.
+ pgroup = parser.add_argument_group('Agent Options')
+ pgroup.add_argument('--ip-address')
+ pgroup.add_argument('--port')
+ pgroup.add_argument('--config-file')
+ pgroup.add_argument('--mode')
+ pgroup.add_argument('--sampling_frequency')
+ return parser
+
+
+def main(args=None):
+ # For logging
+ txaio.use_twisted()
+ txaio.make_logger()
+
+ # Start logging
+ txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
+
+ parser = make_parser()
+
+ # Interpret options in the context of site_config.
+ args = site_config.parse_args(agent_class='FTSAerotechAgent',
+ parser=parser,
+ args=args)
+
+ agent, runner = ocs_agent.init_site_agent(args)
+
+ fts_agent = FTSAerotechAgent(agent, args.ip_address, args.port,
+ args.config_file, args.mode,
+ args.sampling_frequency)
+
+ agent.register_task('init_stage', fts_agent.init_stage)
+ agent.register_task('move_to', fts_agent.move_to)
+ agent.register_task('home', fts_agent.home)
+
+ agent.register_process('acq', fts_agent.acq, fts_agent._stop_acq)
+
+ runner.run(agent, auto_reconnect=True)
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/pcs/agents/primecam_bias/__init__.py b/pcs/agents/primecam_bias/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/primecam_bias/agent.py b/pcs/agents/primecam_bias/agent.py
new file mode 100644
index 0000000..f31aff5
--- /dev/null
+++ b/pcs/agents/primecam_bias/agent.py
@@ -0,0 +1,446 @@
+import argparse
+import time
+import os
+
+import txaio
+from ocs.ocs_twisted import TimeoutLock
+from ocs import ocs_agent, site_config
+
+from pcs.drivers.bias_crate import BiasCrate
+
+class PrimecamBiasAgent():
+ def __init__(self, agent,
+ host = None,
+ port = None,
+ pub_chan = None,
+ sub_chan = None,
+ timeout = None,
+ max_current = None,
+ max_voltage = None):
+ self.agent = agent
+ self.log = agent.log
+ self.lock = TimeoutLock()
+
+ self.host, self.port = host, port
+ self.pub_chan, self.sub_chan = pub_chan, sub_chan
+ self.timeout = timeout
+ self.max_current, self.max_voltage = max_current, max_voltage
+
+ self.agent.register_feed('bias_crate_output',
+ record=True,
+ agg_params={'frame_length':10*60},
+ buffer_time=5)
+
+ @ocs_agent.param('auto_acquire', default=False, type=bool)
+ @ocs_agent.param('poll_int', type=float, default=10)
+ @ocs_agent.param('lock_int', type=float, default=0.1)
+ def init_crate(self, session, params):
+ with self.lock.acquire_timeout(timeout=10, job='init_crate') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ self.bias_crate = BiasCrate(self.host, self.port, self.pub_chan, self.sub_chan, timeout = self.timeout)
+ self.bias_cards = self.bias_crate.get_bias_cards()['cards']
+ self.log.info(f'Created Prime-Cam bias crate control object. Available bias cards are: {self.bias_cards}')
+
+ if params.pop('auto_acquire'):
+ self.agent.start('monitor_channel', params)
+ return True, f'Active Bias Cards: {self.bias_cards}'
+
+ # Bias crate monitoring
+ # =====================
+ @ocs_agent.param('poll_int', type=float, default=10)
+ @ocs_agent.param('lock_int', type=float, default=0.1)
+ @ocs_agent.param('com_to', default=None)
+ def monitor_channel(self, session, params):
+ """monitor_channel(wait=1, com_to=['1.1']
+
+ **Process** - Continuously monitor bias crate output current and voltage of the specified channel(s)
+
+ Parameters:
+ wait (float, optional): Time to wait between measurements [seconds].
+ """
+ with self.lock.acquire_timeout(timeout=10, job='monitor_channel') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str):
+ com_tos = [com_tos]
+ elif com_tos is None:
+ com_tos = [f'{card}.{i+1}' for card in self.bias_cards for i in range(8)]
+
+ last_poll = time.time()
+ last_release = last_poll
+ poll_int, lock_int = params['poll_int'], params['lock_int']
+ self.monitor = True
+ while self.monitor:
+ if time.time() - last_poll > poll_int:
+ last_poll = time.time()
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.get_status(card, channel)
+ if resp['status'] == 'success':
+ status = resp
+ # Automatically disable output if current or voltage exceed max values specified
+ if status['current'] > self.max_current or status['vbus'] > self.max_voltage:
+ self.agent.start('disable_output', {'com_to': com_to})
+ self.log.warn(f'Disabling output of Card {card} Channel {channel} since it exceeds specified voltage and/or current limit!')
+ else:
+ self.log.error(f"Failed to monitor status for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ status = {'card': card, 'channel': channel, 'vbus': None, 'vshunt': None, 'current': None, 'outputEnabled': None, 'wiper': None}
+ resps[com_to] = status
+
+ # Convert outputEnabled boolean to integer for easier visualization in Grafana
+ if (curr_output := status['outputEnabled']) is not None: status['outputEnabled'] = int(curr_output)
+ status = {f'{k}_{card}_{channel}': v for k, v in status.items()}
+ data = {'timestamp': time.time(),
+ 'block_name': f'channel_{card}_{channel}',
+ 'data': status}
+ self.agent.publish_to_feed('bias_crate_output', data)
+
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+
+ if time.time() - last_release > lock_int:
+ last_release = time.time()
+ if not self.lock.release_and_acquire(timeout=30*60):
+ self.log.error(f'Could not re-acquire lock; now held by {self.lock.job}.')
+ return False, 'Could not re-acquire lock.'
+ time.sleep(lock_int)
+ return True, "Finished monitoring bias channels"
+
+ def stop_monitoring(self, session, params):
+ self.monitor = False
+ return True, "Stopped bias crate monitoring."
+
+ # Bias Crate commands
+ # ===================
+ def get_bias_cards(self, session, params):
+ '''get_bias_cards()
+
+ **Task** - Get the available bias cards installed in the Prime-Cam bias crate.
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='get_bias_cards') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ resp = self.bias_crate.get_bias_cards()
+ if (success := resp['status'] == 'success'):
+ self.bias_cards = resp['cards']
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': {'cards': self.bias_cards}}
+ session.data = data
+ else:
+ self.log.error(f"Failed to get bias cards with Exception: {resp['msg']}")
+ return success, f'Active Bias Cards: {self.bias_cards}'
+
+ @ocs_agent.param('com_to', default=None)
+ def enable_output(self, session, params):
+ '''enable_output(com_to)
+
+ **Task** - Enable output of the specified bias card channel
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='enable_output') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.enable_output(card, channel)
+ if (success := resp['status'] == 'success'):
+ output_enabled = resp['outputEnabled']
+ else:
+ self.log.error(f"Failed to enable output for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ output_enabled = None
+ resps[com_to] = {'outputEnabled': output_enabled}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Enabled output for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ def disable_output(self, session, params):
+ '''disable_output(com_to)
+
+ **Task** - Disable output of the specified bias card channel
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='disable_output') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.disable_output(card, channel)
+ if (success := resp['status'] == 'success'):
+ output_enabled = resp['outputEnabled']
+ else:
+ self.log.error(f"Failed to disable output for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ output_enabled = None
+ resps[com_to] = {'outputEnabled': output_enabled}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Disabled output for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ def get_status(self, session, params):
+ '''get_status(com_to)
+
+ **Task** - Get status of the specified bias card channel
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='get_status') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.get_status(card, channel)
+ if (success := resp['status'] == 'success'):
+ status = resp
+ else:
+ self.log.error(f"Failed to get status for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ status = {'card': card, 'channel': channel, 'vbus': None, 'vshunt': None, 'current': None, 'outputEnabled': None, 'wiper': None}
+ resps[com_to] = status
+
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Got status for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ @ocs_agent.param('voltage')
+ def seek_voltage(self, session, params):
+ '''seek_voltage(com_to, voltage)
+
+ **Task** - Seek voltage for the specified bias card channel
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='seek_voltage') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+ if not isinstance((voltages := params['voltage']), list): voltages = [voltages]*len(com_tos)
+
+ if not len(voltages) == len(com_tos):
+ err = 'Voltages specified do not match the number of channels specified.'
+ self.log.error(err)
+ return False, err
+
+ resps = {}
+ for com_to, voltage in zip(com_tos, voltages):
+ card, channel = map(int, com_to.split('.'))
+ if voltage > self.max_voltage:
+ self.log.warn(f'Cannot set voltage to {voltage} V as it exceeds the maximum voltage {self.max_voltage} V specified; setting to {self.max_voltage} V instead.')
+ voltage = self.max_voltage
+
+ resp = self.bias_crate.seek_voltage(card, channel, voltage)
+ if (success := resp['status'] == 'success'):
+ chan_voltage = resp['vbus']
+ else:
+ self.log.error(f"Failed to seek voltage for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ chan_voltage = None
+ resps[com_to] = {'vbus': chan_voltage}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Set voltage for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ @ocs_agent.param('current')
+ def seek_current(self, session, params):
+ '''seek_current(com_to, current)
+
+ **Task** - Seek current(s) for the specified bias card channel(s)
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='seek_current') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+ if not isinstance((currents := params['current']), list): currents = [currents]*len(com_tos)
+
+ if not len(currents) == len(com_tos):
+ err = 'Currents specified do not match the number of channels specified.'
+ self.log.error(err)
+ return False, err
+
+ resps = {}
+ for com_to, current in zip(com_tos, currents):
+ card, channel = map(int, com_to.split('.'))
+ if current > self.max_current:
+ self.log.warn(f'Cannot set current to {current} A as it exceeds the maximum current {self.max_current} A specified; setting to {self.max_current} A instead.')
+ current = self.max_current
+
+ resp = self.bias_crate.seek_current(card, channel, current)
+ if (success := resp['status'] == 'success'):
+ chan_current = resp['current']
+ else:
+ self.log.error(f"Failed to seek current for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ chan_current = None
+ resps[com_to] = {'current': chan_current}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Set current for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ def enable_testload(self, session, params):
+ '''enable_testload(com_to)
+
+ **Task** - Enable test load of the specified bias card channel
+
+ '''
+ with self.lock.acquire_timeout(timeout=10, job='enable_testload') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.enable_testload(card, channel)
+ if (success := resp['status'] == 'success'):
+ chan_vshunt = resp['vshunt']
+ else:
+ self.log.error(f"Failed to enable testload for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ chan_vshunt = None
+ resps[com_to] = {'vshunt': chan_vshunt}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Enabled testload for channels: {com_tos}'
+
+ @ocs_agent.param('com_to', default=None)
+ def disable_testload(self, session, params):
+ '''disable_testload(com_to)
+
+ **Task** - Disable test load of the specified bias card channel
+
+ '''
+ card, channel = map(int, params['com_to'].split('.'))
+
+ with self.lock.acquire_timeout(timeout=10, job='disable_testload') as acquired:
+ if not acquired:
+ err = f'Lock could not be acquired because it is held by {self.lock.job}.'
+ self.log.error(err)
+ return False, err
+
+ if isinstance((com_tos := params['com_to']), str): com_tos = [com_tos]
+
+ resps = {}
+ for com_to in com_tos:
+ card, channel = map(int, com_to.split('.'))
+ resp = self.bias_crate.disable_testload(card, channel)
+ if (success := resp['status'] == 'success'):
+ chan_vshunt = resp['vshunt']
+ else:
+ self.log.error(f"Failed to disable testload for Card {card} Channel {channel} with Exception: {resp['msg']}")
+ chan_vshunt = None
+ resps[com_to] = {'vshunt': chan_vshunt}
+ data = {'timestamp': time.time(),
+ 'block_name': 'bias_crate',
+ 'data': resps}
+ session.data = data
+ return success, f'Disabled testload for channels: {com_tos}'
+
+def make_parser(parser=None):
+ """Build the argument parser for the Agent. Allows Sphinx to automatically
+ build documentation based on this function.
+
+ """
+ if parser is None: parser = argparse.ArgumentParser()
+
+ # Add options specific to this agent.
+ pgroup = parser.add_argument_group('Agent Options')
+ pgroup.add_argument('--redis-host', type=str, help='Host name of the Redis server communicating with the Prime-Cam bias crate.')
+ pgroup.add_argument('--redis-port', type=int, help='Port that Redis server communicating with the Prime-Cam bias crate is running on.')
+ pgroup.add_argument('--redis-pub-chan', type=str, help='Redis channel to publish commands to.')
+ pgroup.add_argument('--redis-sub-chan', type=str, help='Redis channel to listen to for responses to commands.')
+ pgroup.add_argument('--redis-timeout', type=float, help='Time in seconds to wait for a response from a command sent to the Prime-Cam bias crate.')
+ pgroup.add_argument('--mode', type=str, default='acq', choices=['init', 'acq'])
+ pgroup.add_argument('--poll-interval', type=float, default=10, help='Time in seconds to poll bias card channel status.')
+ pgroup.add_argument('--lock-interval', type=float, default=0.1, help='Time in seconds to release channel monitor lock.')
+ pgroup.add_argument('--max-current', type=float, default=0.01, help='Maximum current limit in Amperes.')
+ pgroup.add_argument('--max-voltage', type=float, default=2, help='Maximum voltage limit in Volts.')
+ return parser
+
+def main(args=None):
+ txaio.start_logging(level=os.environ.get("LOGLEVEL", "info"))
+
+ parser = make_parser()
+ args = site_config.parse_args(agent_class='PrimecamBiasAgent',
+ parser=parser,
+ args=args)
+ init_params = {'auto_acquire': args.mode == 'acq',
+ 'poll_int': args.poll_interval,
+ 'lock_int': args.lock_interval}
+ agent, runner = ocs_agent.init_site_agent(args)
+
+ b = PrimecamBiasAgent(agent, host = args.redis_host,
+ port = args.redis_port,
+ pub_chan = args.redis_pub_chan,
+ sub_chan = args.redis_sub_chan,
+ timeout = args.redis_timeout,
+ max_current = args.max_current,
+ max_voltage = args.max_voltage,)
+
+ agent.register_task('init_crate', b.init_crate, startup=init_params)
+ agent.register_task('get_bias_cards', b.get_bias_cards)
+ agent.register_task('enable_output', b.enable_output)
+ agent.register_task('disable_output', b.disable_output)
+ agent.register_task('get_status', b.get_status)
+ agent.register_task('seek_voltage', b.seek_voltage)
+ agent.register_task('seek_current', b.seek_current)
+ agent.register_task('enable_testload', b.enable_testload)
+ agent.register_task('disable_testload', b.disable_testload)
+
+ agent.register_process('monitor_channel', b.monitor_channel, b.stop_monitoring)
+ runner.run(agent, auto_reconnect=True)
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
diff --git a/pcs/agents/rfsoc_controller/__init__.py b/pcs/agents/rfsoc_controller/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pcs/agents/rfsoc_controller/agent.py b/pcs/agents/rfsoc_controller/agent.py
new file mode 100644
index 0000000..b910c4e
--- /dev/null
+++ b/pcs/agents/rfsoc_controller/agent.py
@@ -0,0 +1,727 @@
+import os
+import sys
+
+import time
+import argparse
+from pathlib import Path
+from functools import wraps
+from ocs import ocs_agent, site_config
+from ocs.ocs_twisted import TimeoutLock
+
+# Import Twisted Modules for ccatkidlib python scripts
+from autobahn.twisted.util import sleep as dsleep
+from twisted.internet import protocol, reactor
+from twisted.internet.defer import Deferred, inlineCallbacks
+from twisted.python.failure import Failure
+from typing import Optional
+
+# ccatkidlib Imports
+from ccatkidlib.rfsoc.rfsoc_daq import R
+import ccatkidlib.io as io
+
+class CCATKIDlibScriptProtocol(protocol.ProcessProtocol):
+ def __init__(self, script, log=None):
+ self.script = Path(script)
+ self.log = log
+ self.end_status: Optional[Failure] = None
+
+ def connectionMade(self):
+ """Called when process is started"""
+ self.transport.closeStdin()
+
+ def outReceived(self, data):
+ """Called whenever data is received through stdout"""
+ if self.log:
+ self.log.info(f"{self.script.name} | {data.strip().decode('utf-8')}")
+
+ def errReceived(self, data):
+ """Called whenever data is received through stderr"""
+ self.log.error(data)
+
+ def processExited(self, status: Failure):
+ """Called when process has exited."""
+
+ exit_code = status.value.exitCode
+ if self.log:
+ self.log.info(f"{self.script.name} | Process exited code {exit_code}.")
+
+ self.deferred.callback(exit_code)
+
+class RFSoController:
+ """
+ PCS Agent for controlling Radio Frequency Systems on a Chip (RFSoCs) through
+ ccatkidlib scripts and methods.
+
+ Modelled after SOCS PysmurfController with modifications.
+ """
+
+ def __init__(
+ self,
+ agent,
+ module: str = None,
+ session_timeout: float = 60 * 60,
+ monitor_interval: float = 60,
+ log_level="INFO",
+ ):
+ """
+ Constructor for RfsocController.
+ Initializes agent and starts new measurement session.
+
+ Parameters:
+ agent (ocs.ocs_agent): OCS agent instance
+ config (str): Path to rfsoc-controller config relative to OCS_CONFIG_DIR
+ module (str): Which instrument module to control with the rfsoc-controller
+ Notes:
+ Arguments for constructor passed through OCS config file (e.g. default.yaml in OCS_CONFIG_DIR)
+ """
+
+ # Create OCS agent and get log
+ self.agent = agent
+ self.ocs_session = None
+ self.log = agent.log
+ self.module = module
+
+ self.lock = TimeoutLock() # Create lock
+ self._new_session(init_boards=True) # Start new ccatkidlib session
+
+ self.last_call = time.time()
+ self.monitor_session, self.session_stale = True, False
+ self.session_timeout, self.monitor_interval = session_timeout, monitor_interval
+
+ self.prot = None
+
+ # =================#
+ # Control Methods #
+ # =================#
+
+ @staticmethod
+ def _get_control(func):
+ """
+ Decorator for use with OCS tasks/processes of ccatkidlib methods.
+ Creates the RFSoC control object with correct system state and passes it to decorated task/process.
+ Updates system state after task/process finishes execution.
+
+ Parameters:
+ func (func): OCS task/process of ccatkidlib method to decorate
+ """
+
+ @wraps(func)
+ def _wrapper(self, session, params):
+ RC = (
+ R(
+ init_boards=False,
+ init_drones=False,
+ sess_id=self.session,
+ measurement_name=self.measurement_name,
+ measurement_desc=self.measurement_desc,
+ curr_date=self.curr_date,
+ )
+ if not self.session_stale
+ else self._new_session(init_boards=False)
+ )
+ self.last_call, self.session_stale = time.time(), False
+
+ RC.NCLOs = self.NCLOs
+ RC.drive_attens = self.drive_attens
+ RC.sense_attens = self.sense_attens
+
+ RC.set_NCLO(setup=False)
+ RC.set_atten(setup=False)
+
+ params["R"] = RC
+ rtn = func(self, session, params)
+
+ self._update_control(RC)
+
+ return rtn
+
+ return _wrapper
+
+ @ocs_agent.param("init_boards", type=bool, default=False)
+ def new_session(self, session, params):
+ """new_session(init_boards=False)
+
+ **Task** - Start a new measurement session
+
+ Parameters:
+ init_boards (bool, optional): Whether to reinitialize the RFSoC boards
+ """
+ with self.lock.acquire_timeout(5, job="new_session") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+
+ RC = self._new_session(init_boards=params["init_boards"])
+ return True, f"Succesfully created new session: {self.session}"
+
+ def _new_session(self, init_boards):
+ """
+ Internal method for starting a new measurement session.
+
+ Parameters:
+ init_boards (bool): Whether to reinitialize the RFSoC boards
+ """
+ RC = R(
+ init_boards=init_boards, init_drones=True
+ ) # Instantiate RFSoC control object with full board and drone setup
+ self._update_control(RC)
+ self.log.info(f"Succesfully created new session: {self.session}")
+ return RC
+
+ @_get_control
+ def get_session(self, session, params):
+ """get_session()
+
+ **Task** - Get the ccatkidlib sess_id of the current measurement session.
+
+ """
+ with self.lock.acquire_timeout(5, job="get_session") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+
+ RC = params.pop("R")
+ data = {"curr_time": time.time()}
+ self._publish_data(data, RC, params, session)
+ return True, self.session
+
+ def _update_control(self, RC):
+ """
+ Internal method for updating the system state based on the state of the given RFSoC control object.
+
+ Parameters:
+ RC (ccatkidlib.rfsoc.rfsoc_daq.R): RFSoC control object
+ """
+
+ # Create/update attributes to save system state of control object across recreations
+ # ----------------------------------------------------------------------------------
+ # Get the session ID, name, and description of measurement
+ self.session = RC.sess_id
+ self.curr_date = RC.curr_date
+ self.measurement_name = RC.measurement_name
+ self.measurement_desc = RC.measurement_desc
+
+ # Get the current NCLOs and attenuations
+ self.NCLOs = RC.NCLOs
+ self.drive_attens = RC.drive_attens
+ self.sense_attens = RC.sense_attens
+
+ # ===============#
+ # Setup Methods #
+ # ===============#
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("drive", type=(int, list), default=None)
+ @ocs_agent.param("sense", type=(int, list), default=None)
+ def set_atten(self, session, params):
+ """ """
+ with self.lock.acquire_timeout(5, job="atten") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ drive, sense = RC.set_atten(**params)
+ data = {"drive": drive, "sense": sense}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully set drive/sense attenuations."
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("NCLO", type=(int, list), default=None)
+ def set_NCLO(self, session, params):
+ """ """
+ with self.lock.acquire_timeout(5, job="NCLO") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ NCLO = RC.set_NCLO(**params)
+ data = {"NCLO": NCLO}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully set NCLO frequencies."
+
+ # ================#
+ # Script Methods #
+ # ================#
+
+ @inlineCallbacks
+ def _run_script(self, session, script, args):
+ """
+ Internal method for running a ccatkidlib RFSoC control script using the Twisted reactor.
+ Modelled after _run_script method of SOCS PysmurfController
+
+ Parameters:
+ session (ocs.ocs_agent.OpSession): OpSession object of run task
+ script (str): Path of ccatkidlib python script to run
+ args (list[str], optional): Additional arguments to pass to script
+ """
+
+ with self.lock.acquire_timeout(5, job=script) as acquired:
+ if not acquired:
+ self.log.error(
+ f"The requested script cannot be run because the lock is held by {self.lock.job}"
+ )
+ return (
+ False,
+ f"The requested script cannot be run because lock is held by {self.lock.job}",
+ )
+ self.ocs_session = session
+ try:
+ self.prot = CCATKIDlibScriptProtocol(script, log=self.log)
+ self.prot.deferred = Deferred()
+ python_exec = sys.executable
+
+ cmd = [python_exec, "-u", script] + list(map(str, args))
+
+ self.log.info(f"Running Script: {' '.join(cmd)}")
+
+ reactor.spawnProcess(self.prot, python_exec, cmd, env=os.environ)
+
+ exit_code = yield self.prot.deferred
+
+ return exit_code == 0, f"Script has finished with exit code {exit_code}"
+
+ finally:
+ # Sleep to allow any remaining messages to be put into the
+ # session var
+ yield dsleep(1.0)
+ self.ocs_session = None
+
+ @inlineCallbacks
+ @ocs_agent.param("script", type=str)
+ @ocs_agent.param("args", type=list, default=[])
+ @ocs_agent.param("new_session", type=bool, default=True)
+ def run(self, session, params):
+ """run(script, args=None)
+
+ **Task** - Run a ccatkidlib RFSoC control script
+
+ Parameters:
+ script (str): Path of ccatkidlib python script to run
+ args (list[str], optional): Additional arguments to pass to script
+
+ Examples:
+ Example for running a test script with a client::
+ client.run(script='/app/pcs/ccatkidlib/scripts/controller/test.py', args=[])
+ Notes:
+ Script path must be that within the docker container.
+ For example, if ccatkidlib is mounted to /app/pcs/ccatkidlib within the container,
+ the path to run a script in the scripts directory would be /app/pcs/ccatkidlib/scripts/.py
+
+ """
+ status, msg = yield self._run_script(
+ session, params["script"], params.get("args", [])
+ )
+ # Set stored NCLO and attenuations to None since their state may have changed during script execution
+ self.NCLOs = None
+ self.drive_attens = None
+ self.sense_attens = None
+
+ if params["new_session"]:
+ self._new_session(init_boards=False)
+
+ return status, msg
+
+ def abort(self, session, params=None):
+ """abort()
+
+ **Task** - Aborts the actively running script.
+
+ """
+ self.prot.transport.signalProcess("KILL")
+ return True, "Aborting process"
+
+ # ==================#
+ # Main DAQ Methods #
+ # ==================#
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("time", type=float, check=lambda x: x > 0)
+ @ocs_agent.param("write_comb", type=bool, default=None)
+ @ocs_agent.param("tone_freqs", type=list, default=None)
+ @ocs_agent.param("tone_powers", type=list, default=None)
+ @ocs_agent.param("tone_phis", type=list, default=None)
+ def take_timestream(self, session, params):
+ with self.lock.acquire_timeout(5, job="timestream") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ t_sec = params.pop("time")
+ stream_files = RC.take_timestream(t_sec, **params)
+ data = {"data_files": list(map(str, stream_files))}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished taking timestream."
+
+ # ===============#
+ # Sweep Methods #
+ # ===============#
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("write_comb", type=bool, default=True)
+ @ocs_agent.param("sweep_steps", type=(float, list), default=None)
+ def take_vna_sweep(self, session, params):
+ """take_vna_sweep(com_to=None, write_comb=True, sweep_steps=None, parallel_boards=None, parallel_drones=None)
+
+ **Task** - Take a VNA sweep
+
+ Parameters:
+ com_to (list[str], optional): List of drones to take VNA sweep
+ write_comb (bool, optional): Whether to write a new VNA comb (default: True)
+ sweep_steps (int | list[int], optional): Number of points each tone should sweep (default: sweep_steps in drone_config)
+ parallel_boards (int, optional): Number of boards to run in parallel (default: parallel_boards in system_config)
+ parallel_drones (int, optional): Number of drones to run in parallel (default: parallel_drones in system_config)
+
+ Examples:
+ Take VNA sweep with all drones of board 1 and drone 1 of board 2 in parallel::
+ client.take_vna_sweep(com_to=['1', '2.1'], sweep_steps=500, parallel_boards=2, parallel_drones=4)
+
+ Notes:
+ Example session data:
+ >>> response.session['data']
+ PUT EXAMPLE HERE
+ """
+ with self.lock.acquire_timeout(5, job="vna_sweep") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ vna_files = RC.take_vna_sweep(**params)
+ data = {"data_files": list(map(str, vna_files))}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished taking VNA sweep."
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("chan_bw", type=(float, list), default=None)
+ @ocs_agent.param("sweep_steps", type=(int, list), default=None)
+ @ocs_agent.param("write_comb", type=bool, default=None)
+ @ocs_agent.param("tone_freqs", type=list, default=None)
+ @ocs_agent.param("tone_powers", type=list, default=None)
+ @ocs_agent.param("tone_phis", type=list, default=None)
+ def take_target_sweep(self, session, params):
+ """take_target_sweep(com_to=None, write_comb=True, sweep_steps=None, parallel_boards=None, parallel_drones=None)
+
+ **Task** - Take a target sweep
+
+ Parameters:
+ com_to (list[str], optional): List of drones to take VNA sweep
+ write_comb (bool, optional): Whether to write a new VNA comb (default: True)
+ sweep_steps (int | list[int], optional): Number of points each tone should sweep (default: sweep_steps in drone_config)
+ parallel_boards (int, optional): Number of boards to run in parallel (default: parallel_boards in system_config)
+ parallel_drones (int, optional): Number of drones to run in parallel (default: parallel_drones in system_config)
+
+ Examples:
+ Take target sweep with all drones of board 1 and drone 1 of board 2 in parallel::
+ client.take_target_sweep(com_to=['1', '2.1'], sweep_steps=500, parallel_boards=2, parallel_drones=4)
+
+ Notes:
+ Example session data:
+ >>> response.session['data']
+ PUT EXAMPLE HERE
+ """
+ with self.lock.acquire_timeout(5, job="target_sweep") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ targ_files = RC.take_target_sweep(**params)
+ data = {"data_files": list(map(str, targ_files))}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished taking target sweep."
+
+ @_get_control
+ @ocs_agent.param("com_to", type=list, default=None)
+ @ocs_agent.param("write_comb", type=bool, default=None)
+ @ocs_agent.param("new_sweep", type=bool, default=True)
+ @ocs_agent.param("sweep_steps", type=(float, list[float]), default=None)
+ def find_detectors(self, session, params):
+ with self.lock.acquire_timeout(5, job="find_detectors") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ found_nums, vna_files = RC.find_detectors(**params)
+ vna_files = list(map(str, vna_files))
+ data = {"found_nums": found_nums, "data_files": vna_files}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished finding detectors from VNA sweep."
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("method", type=str, default="grad")
+ @ocs_agent.param("new_sweep", type=bool, default=True)
+ @ocs_agent.param("chan_bw", type=(float, list), default=None)
+ @ocs_agent.param("sweep_steps", type=(float, list), default=None)
+ @ocs_agent.param("write_comb", type=bool, default=None)
+ @ocs_agent.param("tone_freqs", type=list, default=None)
+ @ocs_agent.param("tone_powers", type=list, default=None)
+ @ocs_agent.param("tone_phis", type=list, default=None)
+ def tune_tone_placement(self, session, params):
+ with self.lock.acquire_timeout(5, job="tune_tone_placement") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ targ_files = RC.tune_tone_placement(**params)
+ data = {"data_files": list(map(str, targ_files))}
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished tuning detector tone frequencies."
+
+ @_get_control
+ @ocs_agent.param("com_to", type=(str, list), default=None)
+ @ocs_agent.param("method", type=str, default="stream")
+ @ocs_agent.param(
+ "atten_bounds",
+ type=list,
+ default=None,
+ check=lambda x: all(len(xx) == 2 for xx in x),
+ )
+ @ocs_agent.param("num_atten", type=int, default=None, check=lambda x: x > 1)
+ @ocs_agent.param("chan_bw", type=(float, list), default=None)
+ @ocs_agent.param("sweep_steps", type=(float, list), default=None)
+ @ocs_agent.param("write_comb", type=bool, default=None)
+ @ocs_agent.param("tone_freqs", type=list, default=None)
+ @ocs_agent.param("tone_powers", type=list, default=None)
+ @ocs_agent.param("tone_phis", type=list, default=None)
+ @ocs_agent.param("stream_time", type=float, default=0, check=lambda x: x >= 0)
+ def tune_tone_power(self, session, params):
+ """ """
+ with self.lock.acquire_timeout(5, job="tune_tone_power") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+ params = self._filter_params(params)
+ RC = params.pop("R")
+ targ_files, stream_files = RC.tune_tone_power(**params)
+ data = {
+ "data_files": {
+ "targ": list(map(str, targ_files)),
+ "stream": list(map(str, stream_files)),
+ }
+ }
+ self._publish_data(data, RC, params, session)
+ return True, "Successfully finished tuning detector tone powers."
+
+ # ===========#
+ # Processses #
+ # ========== #
+ def monitor_session(self, session, params):
+ """ """
+
+ with self.lock.acquire_timeout(5, job="monitor_session") as acquired:
+ if not acquired:
+ self.log.error(
+ f"Could not acquire lock because it is held by {self.lock.job}."
+ )
+ return (
+ False,
+ f"Could not acquire lock because it is held by {self.lock.job}.",
+ )
+
+ while self.monitor_session:
+ if time.time() - self.last_call >= self.session_timeout:
+ self.session_stale = True # Mark that the current session has become stale and a new session should be created upon next command call
+
+ if not self.lock.release_and_acquire(timeout=0):
+ self.log.error(
+ f"Could not re-acquire lock now held by {self.lock.job}."
+ )
+ return False, "Could not re-acquire lock."
+ time.sleep(self.monitor_interval)
+ return True, "Session monitoring exceuted successfully."
+
+ def stop_monitoring(self, session, params):
+ """ """
+ if self.monitor_session:
+ self.monitor_session = False
+ return True, "Stopping session monitoring..."
+ else:
+ return False, "Not currently monitoring session."
+
+ # ================#
+ # Helper Methods #
+ # ================#
+ def _filter_params(self, params):
+ """
+ Internal function for filtering out keys with None value from params dictionary
+ so that ccatkidlib defaults are used
+
+ Parameters:
+ params (dict[any]): params dictionary to filter
+ """
+ return {k: v for k, v in params.items() if v is not None}
+
+ def _publish_data(self, data, RC, params, session):
+ """
+ Internal method for publishing data returned by ccatkidlib OCS task/process
+ to the OCS OpSession.data dictionary.
+
+ Parameters:
+ data (any): Data returned by ccatkidlib method that was run
+ RC (ccatkidlib.rfsoc.rfsoc_daq.R): RFSoC control object used to run ccatkidlib method
+ params (dict[any]): Parameters used to run ccatkidlib method
+ session (ocs.ocs_agent.OpSession): OpSession of ccatkidlib OCS task/process
+ """
+
+ # Check if ccatkidlib method was run with a different set of drones than in system_config file
+ com_to = params["com_to"] if "com_to" in params else RC.drone_list
+
+ # Create data dictionary with returned data, drones used, and measurement info
+ data_dict = {
+ "module": self.module,
+ "name": RC.measurement_name,
+ "date": RC.curr_date,
+ "session": RC.sess_id,
+ "timestamp": RC.timestamp,
+ "com_to": com_to,
+ "data": data,
+ }
+
+ # Pass data dictionary to OpSession.data
+ session.data = data_dict
+
+
+def make_parser(parser=None):
+ """
+ Build ArgumentParser for passing arguments through OCS config file (e.g. default.yaml in OCS_CONFIG_DIR)
+ """
+ if parser is None:
+ parser = argparse.ArgumentParser()
+
+ pgroup = parser.add_argument_group("Agent Options")
+ pgroup.add_argument(
+ "--module",
+ type=str,
+ choices=["280-GHz", "350-GHz", "850-GHz", "eor-spec", "mod-cam"],
+ help="Which instrument module to control with rfsoc-controller.",
+ )
+ pgroup.add_argument(
+ "--session-timeout",
+ type=float,
+ default=3600,
+ help="Time after which a new session should be started if no commands have been run.",
+ )
+ pgroup.add_argument(
+ "--monitor-interval",
+ type=float,
+ default=60,
+ help="Time interval at which to monitor session status.",
+ )
+
+ pgroup.add_argument(
+ "--log-level",
+ type=str,
+ default='INFO',
+ help="Level at which to log ccatkidlib messages.",
+ )
+
+ return parser
+
+
+def main(args=None):
+ # Parse arguments passed in OCS config file
+ # -----------------------------------------
+ parser = make_parser()
+ args = site_config.parse_args(
+ agent_class="RfsocController", parser=parser, args=args
+ )
+
+ # Create RFSoController agent
+ # ---------------------------
+ agent, runner = ocs_agent.init_site_agent(args)
+ rfsoc_controller = RFSoController(
+ agent,
+ module=args.module,
+ session_timeout=args.session_timeout,
+ monitor_interval=args.monitor_interval,
+ log_level=args.log_level,
+ )
+
+ # Register agent tasks and processes
+ # ----------------------------------
+ agent.register_task("run", rfsoc_controller.run, blocking=False)
+ agent.register_task("abort", rfsoc_controller.abort, blocking=False)
+ agent.register_task("get_session", rfsoc_controller.get_session)
+ agent.register_task("new_session", rfsoc_controller.new_session)
+ agent.register_task("set_NCLO", rfsoc_controller.set_NCLO)
+ agent.register_task("set_atten", rfsoc_controller.set_atten)
+ agent.register_task("take_vna_sweep", rfsoc_controller.take_vna_sweep)
+ agent.register_task("take_target_sweep", rfsoc_controller.take_target_sweep)
+ agent.register_task("take_timestream", rfsoc_controller.take_timestream)
+ agent.register_task("find_detectors", rfsoc_controller.find_detectors)
+ agent.register_task("tune_tone_placement", rfsoc_controller.tune_tone_placement)
+ agent.register_task("tune_tone_power", rfsoc_controller.tune_tone_power)
+
+ agent.register_process(
+ "monitor_session",
+ rfsoc_controller.monitor_session,
+ rfsoc_controller.stop_monitoring,
+ )
+ # Run agent
+ # ---------
+ runner.run(agent, auto_reconnect=True)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pcs/drivers/bias_crate.py b/pcs/drivers/bias_crate.py
new file mode 100644
index 0000000..6f61dfd
--- /dev/null
+++ b/pcs/drivers/bias_crate.py
@@ -0,0 +1,90 @@
+import redis
+import txaio
+import time
+import json
+txaio.use_twisted()
+
+class BiasCrate:
+ def __init__(self, host: str, port: int, pub_chan: str, sub_chan: str, timeout=30):
+ self.log = txaio.make_logger()
+ self.r = redis.Redis(host=host, port=port, decode_responses=True)
+ try:
+ self.r.ping()
+ except redis.TimeoutError as e:
+ err = f"Failed to connect to Redis server at {host}:{port} with Exception:\n {e}"
+ self.log.error(err)
+ raise RuntimeError(err)
+
+ self.timeout = timeout
+ self.pub_chan, self.sub_chan = pub_chan, sub_chan
+ self.p = self.r.pubsub(ignore_subscribe_messages=True)
+ self.p.subscribe(sub_chan)
+
+ # Bias Crate Commands
+ # ===================
+ def get_bias_cards(self):
+ id, num_clients = self.pub_command('getAvailableCards')
+ return self.sub_response(id)
+
+ def enable_output(self, card: int, channel: int):
+ id, num_clients = self.pub_command('enableOutput', card=card, channel=channel)
+ return self.sub_response(id)
+
+ def disable_output(self, card: int, channel: int):
+ id, num_clients = self.pub_command('disableOutput', card=card, channel=channel)
+ return self.sub_response(id)
+
+ def get_status(self, card: int, channel: int):
+ id, num_clients = self.pub_command('getStatus', card=card, channel=channel)
+ return self.sub_response(id)
+
+ def seek_voltage(self, card: int, channel: int, voltage: float):
+ id, num_clients = self.pub_command('seekVoltage', card=card, channel=channel, voltage=voltage)
+ return self.sub_response(id)
+
+ def seek_current(self, card: int, channel: int, current: float):
+ id, num_clients = self.pub_command('seekCurrent', card=card, channel=channel, current=current)
+ return self.sub_response(id)
+
+ def enable_testload(self, card: int, channel: int):
+ id, num_clients = self.pub_command('enableTestload', card=card, channel=channel)
+ return self.sub_response(id)
+
+ def disable_testload(self, card: int, channel: int):
+ id, num_clients = self.pub_command('disableTestload', card=card, channel=channel)
+ return self.sub_response(id)
+
+ # Redis helper methods
+ # ====================
+ def pub_command(self, command: str, **kwargs):
+ id = int(time.time())
+ com_dict = {'id': id,
+ 'command': command,
+ 'args': kwargs}
+ num_clients = self.r.publish(self.pub_chan, json.dumps(com_dict))
+ if num_clients == 0: self.log.error(f'No clients received command {command}!')
+ return id, num_clients
+
+ def sub_response(self, id: int):
+ timed_out, last_resp = False, None
+ while not timed_out:
+ start_time = time.time()
+ resp = self.p.get_message(timeout=self.timeout)
+ timed_out = time.time() - start_time > self.timeout
+
+ if resp is not None:
+ last_resp = resp
+ resp = json.loads(resp['data'])
+ if resp['id'] == id: # Response from valid command should always match published ID
+ if resp['status'] == 'error':
+ self.log.error(f"Command exited with code {resp['code']}: {resp['msg']}")
+ return resp
+ else:
+ if last_resp is not None and last_resp['id'] == 0: # Invalid commands should return with ID == 0
+ self.log.error(f"Invalid command exited with code {last_resp['code']}: {last_resp['msg']}")
+ else:
+ err = 'No response received from command. Check connection to Redis server.'
+ self.log.error(err)
+ last_resp = {'status': 'error', 'msg': err}
+ return last_resp
+
diff --git a/pcs/drivers/coldload.py b/pcs/drivers/coldload.py
new file mode 100644
index 0000000..1a217a9
--- /dev/null
+++ b/pcs/drivers/coldload.py
@@ -0,0 +1,152 @@
+from ocs.ocs_client import OCSClient
+import time
+import numpy as np
+import txaio
+txaio.use_twisted()
+
+class Coldload:
+
+ def __init__(self, lakeshore, ls_channel):
+ self.ls_channel = ls_channel
+ self.log = txaio.make_logger()
+
+ # Create Lakeshore client for grabbing coldload temperature data
+ try:
+ self.lakeshore = OCSClient(lakeshore, args=[])
+ except Exception as e:
+ self.log.error(f'Could not connect to Lakeshore agent \033[3m{lakeshore}\033[0m for temperature monitoring: {e}')
+
+ def get_temp(self):
+ """
+ Get the current temperature of the coldload.
+ """
+ # Fetch most data from Lakeshore OCS feed
+ acq_status = self.lakeshore.acq.status().session
+
+ # Check to see if lakeshore is actively acquiring data
+ if acq_status['op_code'] == 3:
+ # Get coldload temperature data using specified channel
+ try:
+ temp = acq_status['data']['fields'][self.ls_channel]['T']
+ except KeyError as e:
+ self.log.error(f'Specified Lakeshore channel {self.ls_channel} is not valid: {e}')
+ temp = None
+ else:
+ self.log.error('Lakeshore data acquisition is not running.')
+ temp = None
+ return temp
+
+ def set_temp(self, temp: float, get_current, set_current, *args, **kwargs):
+ """
+ Set the temperature of the coldload using a proportional integral derivative (PID) controller.
+ The PID controller uses the coldload temperature as the process variable and the current squared as the control variable.
+
+ Parameters:
+ temp (float): Temperature to set coldload to
+ get_current: Function for getting current. Abstracted so that set_temp is compatible with different power supplies
+ set_current: Function for setting current. Should have "curr" argument as a keyword argument or as the last positional argument. Abstracted so that set_temp is compatible with different power supplies
+ args: Arguments for get_current and set_current functions
+
+ kwargs:
+ sample_int (float): Interval at which to sample coldload temperature
+ avg_int (float): Interval over which to average coldload temperatures (averaged temperature used as PID process variable). Also sets timescale for PID control
+ thresholds (List(float)): Error thresholds at which to modify avg_int. avg_int will be used for errors greater than the largest threshold and then multiplied by 2 for each threshold passed.
+ timeout (float): Time in minutes after which to exit PID loop (0 for indefinite)
+ yield_dict (bool): Whether to yield error values and coldload current after each PID control loop
+ max_current (float): Maximum current limit
+ pid (List[float]): Proportional, integral, and derivative control coefficients
+ int_threshold (float): Error threshold hold after which the integral term will start contributing to the PID control.
+ """
+
+ sample_int = 0.5
+ default_avg_int = 7.5
+ timeout = 180
+ yield_dict = False
+
+ reset_current = False
+ max_current = 0.6
+ init_pid = [5e-4, 1e-7, 9e-2]
+ stable_pid = [5e-4, 1e-7, 9e-2]
+ ID_threshold = 0.125
+ thresholds = [1.13e-3, 2.5e-7, 0.35]
+
+ err_p = temp - self.get_temp()
+ err_i = 0.0
+ err_d = 0.0
+ errs = []
+
+ for k, v in kwargs.items():
+ if k == 'sample_int':
+ sample_int = v
+ elif k == 'avg_int':
+ default_avg_int = v
+ elif k == 'thresholds':
+ thresholds = v
+ elif k == 'timeout':
+ timeout = v
+ elif k == 'yield_dict':
+ yield_dict = v
+ elif k == 'max_current':
+ max_current = v
+ elif k == 'init_pid':
+ init_pid = v
+ elif k == 'stable_pid':
+ stable_pid = v
+ elif k == 'err_i':
+ err_i = v
+ elif k == 'ID_threshold':
+ ID_threshold = v
+ elif k == 'reset_current':
+ reset_current = v
+ avg_int = default_avg_int
+ timeout *= 60 # Convert timeout to seconds
+
+ # Get the current coldload current and use current squared as the control variable so that it is proportional to power (which is roughly linear with temperature)
+ curr_sq = get_current(*args)**2
+ pid = init_pid
+
+ start_time = time.time()
+ last_sample = start_time
+ last_pid = start_time
+ while timeout == 0 or time.time() - start_time < timeout:
+ curr_time = time.time()
+ if curr_time - last_sample > sample_int:
+ last_sample = curr_time
+ errs.append(temp - self.get_temp())
+
+ delta_t = curr_time - last_pid
+ if delta_t > avg_int:
+ last_pid = curr_time
+
+ avg_err = np.mean(errs) # Average the error to reduce noise
+ errs = [] # Reset list of errors
+
+ # Calculate the PID error values
+ if np.abs(avg_err) <= ID_threshold:
+ if curr_sq == 0:
+ pid = stable_pid
+ ID_threshold *= 10
+ err_i += avg_err * delta_t
+ err_d = (avg_err - err_p)/delta_t
+ err_p = avg_err
+
+ # Vary avg_int depending on how small error is to reduce noise in derivative at small errors
+ avg_int = default_avg_int * (2 ** sum(np.abs(err_p) < threshold for threshold in thresholds))
+
+
+ # Vary the current squared as specified by the PID controller
+ curr_sq += pid[0]*err_p + pid[1] * err_i + pid[2]*err_d
+
+ # Convert to current and limit it to be between 0 and max_current
+ curr_sq = max(curr_sq, 0.0)
+ curr = round(min(np.sqrt(curr_sq), max_current), 3) # Round down to mA precision
+
+ # Set the new current
+ set_current(*args, curr = curr)
+
+ if yield_dict:
+ pids = {'target_temperature': float(temp),'current': float(curr), 'err_p': float(err_p), 'err_i': float(err_i), 'err_d': float(err_d)}
+ yield pids
+ time.sleep(0.1) # Wait to prevent wasting CPU resources
+ if yield_dict: yield None # Yield None on non-PID loops to prevent the method from blocking for avg_int seconds
+ if reset_current: set_current(*args, curr=0.0)
\ No newline at end of file
diff --git a/pcs/plugin.py b/pcs/plugin.py
index d6049ca..a71880c 100644
--- a/pcs/plugin.py
+++ b/pcs/plugin.py
@@ -3,5 +3,10 @@
'LS325Agent': {'module': 'pcs.agents.lakeshore325.agent', 'entry_point': 'main'},
'RaritanAgent': {'module': 'pcs.agents.raritan_pdu.agent', 'entry_point': 'main'},
'ACUAgent': {'module': 'pcs.agents.acu_interface.agent', 'entry_point': 'main'},
- 'Bluefors_TC_Agent': {'module': 'pcs.agents.bluefors_tc.agent', 'entry_point': 'main'}
+ 'Bluefors_TC_Agent': {'module': 'pcs.agents.bluefors_tc.agent', 'entry_point': 'main'},
+ 'PCS_FTSAerotechAgent': {'module': 'pcs.agents.fts_aerotech.agent', 'entry_point': 'main'},
+ 'ColdloadAgent_ScpiPsu': {'module': 'pcs.agents.coldload_scpipsu.agent', 'entry_point': 'main'},
+ 'PrimecamBiasAgent': {'module': 'pcs.agents.primecam_bias.agent', 'entry_point': 'main'},
+ 'BeamMapperAgent': {'module': 'pcs.agents.beam_mapper.agent', 'entry_point': 'main'},
+ 'RFSoController': {'module': 'pcs.agents.rfsoc_controller.agent', 'entry_point': 'main'}
}
diff --git a/requirements.txt b/requirements.txt
index ac9194e..1f5d995 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,12 @@
# core dependencies
autobahn[serialization]
ocs
+socs==0.5.2
sqlalchemy>=1.4
twisted
# hardware communication
+redis
pyserial
requests
# Versions pinned to match socs - do we need to keep these? They are quite outdated.