Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions labgrid/remote/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ async def wait(self):
self.expired = True


class ExporterError(Exception):
class CoordinatorError(Exception):
pass


class ExporterError(CoordinatorError):
pass


Expand Down Expand Up @@ -651,9 +655,12 @@ async def _acquire_resource(self, place, resource):
request.place_name = place.name
cmd = ExporterCommand(request)
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
await cmd.wait()
try:
await cmd.wait()
except asyncio.TimeoutError as e:
raise ExporterError("timed out waiting for exporter while acquiring resource") from e
if not cmd.response.success:
raise ExporterError(f"failed to acquire {resource} ({cmd.response.reason})")
raise ExporterError(cmd.response.reason or "exporter returned failure without a reason")
if resource.acquired != place.name:
logging.warning("resource %s not acquired by this place after acquire request", resource)

Expand All @@ -664,27 +671,34 @@ async def _acquire_resources(self, place, resources):
# all resources need to be free
for resource in resources:
if resource.acquired:
return False
raise CoordinatorError(f"{resource} is already acquired by {resource.acquired}")

for otherplace in self.places.values():
for oldres in otherplace.acquired_resources:
if resource.path == oldres.path:
logging.info(
"Conflicting orphaned resource %s for acquire request for place %s", oldres, place.name
)
return False
raise CoordinatorError(
f"conflicting orphaned resource {oldres} for acquire request for place {place.name}"
)

# acquire resources
acquired = []
try:
for resource in resources:
await self._acquire_resource(place, resource)
acquired.append(resource)
except Exception:
except Exception as e:
logging.exception("failed to acquire %s", resource)
# cleanup
await self._release_resources(place, acquired)
return False
try:
await self._release_resources(place, acquired)
except CoordinatorError:
logging.exception("failed to release acquired resources during acquire cleanup")
if isinstance(e, CoordinatorError):
raise
raise CoordinatorError(f"failed to acquire {resource}") from e

for resource in resources:
place.acquired_resources.append(resource)
Expand All @@ -702,6 +716,7 @@ async def _release_resources(self, place, resources, callback=True):
except ValueError:
pass

failure = None
for resource in resources:
if resource.orphaned:
continue
Expand All @@ -715,18 +730,27 @@ async def _release_resources(self, place, resources, callback=True):
# request.place_name is left unset to indicate release
cmd = ExporterCommand(request)
self.get_exporter_by_name(resource.path[0]).queue.put_nowait(cmd)
await cmd.wait()
try:
await cmd.wait()
except asyncio.TimeoutError as e:
raise ExporterError("timed out waiting for exporter while releasing resource") from e
if not cmd.response.success:
raise ExporterError(f"failed to release {resource} ({cmd.response.reason})")
raise ExporterError(cmd.response.reason or "exporter returned failure without a reason")
if resource.acquired:
logging.warning("resource %s still acquired after release request", resource)
except (ExporterError, TimeoutError):
except ExporterError as e:
if failure is None:
failure = e
logging.exception("failed to release %s", resource)
# at least try to notify the clients
try:
self._publish_resource(resource)
except:
logging.exception("failed to publish released resource %s", resource)
if failure:
if isinstance(failure, ExporterError):
raise failure
raise CoordinatorError(f"failed to release resources for place {place.name}") from failure

async def _synchronize_resources(self):
assert self.lock.locked()
Expand Down Expand Up @@ -878,10 +902,18 @@ async def AcquirePlace(self, request, context):
if not place.hasmatch(resource.path):
continue
resources.append(resource)
if not await self._acquire_resources(place, resources):
try:
await self._acquire_resources(place, resources)
except CoordinatorError as e:
# revert earlier change
place.acquired = None
await context.abort(grpc.StatusCode.FAILED_PRECONDITION, f"Failed to acquire resources for place {name}")
message = f"Failed to acquire resources for place {name}"
if str(e):
message += f": {e}"
await context.abort(
grpc.StatusCode.FAILED_PRECONDITION,
message,
)
place.touch()
self._publish_place(place)
self.save_later()
Expand All @@ -905,7 +937,13 @@ async def ReleasePlace(self, request, context):
if fromuser and place.acquired != fromuser:
return labgrid_coordinator_pb2.ReleasePlaceResponse()

await self._release_resources(place, place.acquired_resources)
try:
await self._release_resources(place, place.acquired_resources)
except CoordinatorError as e:
message = f"Failed to release resources for place {name}"
if str(e):
message += f": {e}"
await context.abort(grpc.StatusCode.FAILED_PRECONDITION, message)

place.acquired = None
place.allowed = set()
Expand Down
5 changes: 4 additions & 1 deletion labgrid/remote/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,11 @@ async def message_pump(self):
)
success = True
except (BrokenResourceError, InvalidResourceRequestError, UnknownResourceError) as e:
reason = e.args[0]
reason = str(e)
logging.warning("set_acquired_request failed: %s", reason)
except Exception as e:
reason = str(e) or repr(e)
logging.exception("failed to handle set_acquired_request: %s", reason)
finally:
in_message = labgrid_coordinator_pb2.ExporterInMessage()
in_message.response.success = success
Expand Down