diff --git a/labgrid/remote/coordinator.py b/labgrid/remote/coordinator.py index ea60f4933..edad1bec0 100644 --- a/labgrid/remote/coordinator.py +++ b/labgrid/remote/coordinator.py @@ -205,7 +205,11 @@ async def wait(self): self.expired = True -class ExporterError(Exception): +class CoordinatorError(Exception): + pass + + +class ExporterError(CoordinatorError): pass @@ -651,9 +655,12 @@ async def _acquire_resource(self, place, resource): request.place_name = place.name cmd = ExporterCommand(request) self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd) - await cmd.wait() + try: + await cmd.wait() + except asyncio.TimeoutError as e: + raise ExporterError("timed out waiting for exporter while acquiring resource") from e if not cmd.response.success: - raise ExporterError(f"failed to acquire {resource} ({cmd.response.reason})") + raise ExporterError(cmd.response.reason or "exporter returned failure without a reason") if resource.acquired != place.name: logging.warning("resource %s not acquired by this place after acquire request", resource) @@ -664,7 +671,7 @@ async def _acquire_resources(self, place, resources): # all resources need to be free for resource in resources: if resource.acquired: - return False + raise CoordinatorError(f"{resource} is already acquired by {resource.acquired}") for otherplace in self.places.values(): for oldres in otherplace.acquired_resources: @@ -672,7 +679,9 @@ async def _acquire_resources(self, place, resources): logging.info( "Conflicting orphaned resource %s for acquire request for place %s", oldres, place.name ) - return False + raise CoordinatorError( + f"conflicting orphaned resource {oldres} for acquire request for place {place.name}" + ) # acquire resources acquired = [] @@ -680,11 +689,16 @@ async def _acquire_resources(self, place, resources): for resource in resources: await self._acquire_resource(place, resource) acquired.append(resource) - except Exception: + except Exception as e: logging.exception("failed to acquire %s", resource) # cleanup - await self._release_resources(place, acquired) - return False + try: + await self._release_resources(place, acquired) + except CoordinatorError: + logging.exception("failed to release acquired resources during acquire cleanup") + if isinstance(e, CoordinatorError): + raise + raise CoordinatorError(f"failed to acquire {resource}") from e for resource in resources: place.acquired_resources.append(resource) @@ -702,6 +716,7 @@ async def _release_resources(self, place, resources, callback=True): except ValueError: pass + failure = None for resource in resources: if resource.orphaned: continue @@ -715,18 +730,27 @@ async def _release_resources(self, place, resources, callback=True): # request.place_name is left unset to indicate release cmd = ExporterCommand(request) self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd) - await cmd.wait() + try: + await cmd.wait() + except asyncio.TimeoutError as e: + raise ExporterError("timed out waiting for exporter while releasing resource") from e if not cmd.response.success: - raise ExporterError(f"failed to release {resource} ({cmd.response.reason})") + raise ExporterError(cmd.response.reason or "exporter returned failure without a reason") if resource.acquired: logging.warning("resource %s still acquired after release request", resource) - except (ExporterError, TimeoutError): + except ExporterError as e: + if failure is None: + failure = e logging.exception("failed to release %s", resource) # at leaset try to notify the clients try: self._publish_resource(resource) except: logging.exception("failed to publish released resource %s", resource) + if failure: + if isinstance(failure, ExporterError): + raise failure + raise CoordinatorError(f"failed to release resources for place {place.name}") from failure async def _synchronize_resources(self): assert self.lock.locked() @@ -878,10 +902,18 @@ async def AcquirePlace(self, request, context): if not place.hasmatch(resource.path): continue resources.append(resource) - if not await self._acquire_resources(place, resources): + try: + await self._acquire_resources(place, resources) + except CoordinatorError as e: # revert earlier change place.acquired = None - await context.abort(grpc.StatusCode.FAILED_PRECONDITION, f"Failed to acquire resources for place {name}") + message = f"Failed to acquire resources for place {name}" + if str(e): + message += f": {e}" + await context.abort( + grpc.StatusCode.FAILED_PRECONDITION, + message, + ) place.touch() self._publish_place(place) self.save_later() @@ -905,7 +937,13 @@ async def ReleasePlace(self, request, context): if fromuser and place.acquired != fromuser: return labgrid_coordinator_pb2.ReleasePlaceResponse() - await self._release_resources(place, place.acquired_resources) + try: + await self._release_resources(place, place.acquired_resources) + except CoordinatorError as e: + message = f"Failed to release resources for place {name}" + if str(e): + message += f": {e}" + await context.abort(grpc.StatusCode.FAILED_PRECONDITION, message) place.acquired = None place.allowed = set() diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index 68c4fa708..c94dc8991 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -926,8 +926,11 @@ async def message_pump(self): ) success = True except (BrokenResourceError, InvalidResourceRequestError, UnknownResourceError) as e: - reason = e.args[0] + reason = str(e) logging.warning("set_acquired_request failed: %s", reason) + except Exception as e: + reason = str(e) or repr(e) + logging.exception("failed to handle set_acquired_request: %s", reason) finally: in_message = labgrid_coordinator_pb2.ExporterInMessage() in_message.response.success = success