From 1bcacb27b56300c7e4a1df629229b55f8a60032a Mon Sep 17 00:00:00 2001 From: Fabian Fulga Date: Thu, 9 Apr 2026 04:03:23 +0300 Subject: [PATCH] Add clustered migration sync for shared disks (SYNCING barrier) --- .../openstack-transfer-create-resp.json | 3 +- .../transfer/openstack-transfer-get-resp.json | 1 + .../transfer/transfer-list-resp.json | 3 +- .../transfer/transfer-update-resp.json | 3 +- coriolis/api-refs/source/parameters.yaml | 9 + coriolis/api-refs/source/transfer.inc | 3 + coriolis/conductor/rpc/server.py | 366 ++++++++++++------ coriolis/constants.py | 7 + ...4_add_clustered_to_base_transfer_action.py | 20 + coriolis/db/sqlalchemy/models.py | 5 + .../disk_sync_resources_info_schema.json | 2 +- coriolis/schemas/vm_export_info_schema.json | 9 + coriolis/tasks/replica_tasks.py | 8 +- coriolis/tests/conductor/rpc/test_server.py | 1 + coriolis/tests/db/sqlalchemy/test_models.py | 2 + coriolis/tests/tasks/test_replica_tasks.py | 2 + 16 files changed, 328 insertions(+), 116 deletions(-) create mode 100644 coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json index 5baf549a5..96ccfd033 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-create-resp.json @@ -49,6 +49,7 @@ } }, "executions": [], - "scenario": "replica" + "scenario": "replica", + "clustered": false } } diff --git a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json index 55749ec6d..5e48ef5bf 100644 --- a/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json +++ b/coriolis/api-refs/api_samples/transfer/openstack-transfer-get-resp.json @@ -42,6 +42,7 @@ "origin_minion_pool_id": null, "destination_minion_pool_id": null, "instance_osmorphing_minion_pool_mappings": {}, + "clustered": false, "executions": [ { "created_at": "2019-07-11T10:06:47.000000", diff --git a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json index 14c909b39..c94b6164d 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-list-resp.json @@ -74,7 +74,8 @@ "instances": {} }, "id": "0460aa4d-6b16-4c98-bd56-27ee186e4a22", - "scenario": "replica" + "scenario": "replica", + "clustered": false } ] } diff --git a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json index 8bcdbbf0b..8332a19f9 100644 --- a/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json +++ b/coriolis/api-refs/api_samples/transfer/transfer-update-resp.json @@ -133,7 +133,8 @@ "ubuntu-xenial": "echo 'anything you need'" } }, - "scenario": "replica" + "scenario": "replica", + "clustered": false } } } diff --git a/coriolis/api-refs/source/parameters.yaml b/coriolis/api-refs/source/parameters.yaml index 3b332f374..68b5475e2 100644 --- a/coriolis/api-refs/source/parameters.yaml +++ b/coriolis/api-refs/source/parameters.yaml @@ -130,6 +130,15 @@ connection_info_schema: in: body type: object required: false +clustered: + description: | + Present on transfer responses. ``true`` when more than one instance is + listed (multi-instance scheduling: sync barriers and shared-disk + coordination). Set by the server at creation from ``instances``; not + accepted on create. + in: body + type: boolean + required: false deployment_cancel: description: | Object containing information about the type of deployment cancellation. diff --git a/coriolis/api-refs/source/transfer.inc b/coriolis/api-refs/source/transfer.inc index fa17913b6..e9c86d213 100644 --- a/coriolis/api-refs/source/transfer.inc +++ b/coriolis/api-refs/source/transfer.inc @@ -51,6 +51,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer List Response** @@ -111,6 +112,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Show Response** @@ -183,6 +185,7 @@ Response - instance_osmorphing_minion_pool_mappings : instance_osmorphing_minion_pool_mappings - user_scripts : user_scripts - scenario: scenario_type + - clustered : clustered **Example of Transfer Create Response** diff --git a/coriolis/conductor/rpc/server.py b/coriolis/conductor/rpc/server.py index 60a87b45a..efbbe1b62 100644 --- a/coriolis/conductor/rpc/server.py +++ b/coriolis/conductor/rpc/server.py @@ -794,10 +794,10 @@ def _check_execution_tasks_sanity( """ all_instances_in_tasks = { t.instance for t in execution.tasks} - instances_tasks_mapping = { - instance: [ - t for t in execution.tasks if t.instance == instance] - for instance in all_instances_in_tasks} + if not all_instances_in_tasks: + return + if initial_task_info is None: + initial_task_info = {} def _check_task_cls_param_requirements(task, instance_task_info_keys): task_cls = tasks_factory.get_task_runner_class(task.task_type) @@ -813,108 +813,85 @@ def _check_task_cls_param_requirements(task, instance_task_info_keys): missing_params)) return task_cls.get_returned_task_info_properties() - for instance, instance_tasks in instances_tasks_mapping.items(): - task_info_keys = set(initial_task_info.get( - instance, {}).keys()) - # mapping between the ID and associated object of processed tasks: - processed_tasks = {} - tasks_to_process = { - t.id: t for t in instance_tasks} - while tasks_to_process: - queued_tasks = [] - # gather all tasks which will be queued to run in parallel: - for task in tasks_to_process.values(): - if task.status in ( - constants.TASK_STATUS_SCHEDULED, - constants.TASK_STATUS_ON_ERROR_ONLY): - if not task.depends_on: - queued_tasks.append(task) - else: - missing_deps = [ - dep_id - for dep_id in task.depends_on - if dep_id not in tasks_to_process and ( - dep_id not in processed_tasks)] - if missing_deps: - raise exception.TaskDependencyException( - "Task '%s' (type '%s') for instance '%s' " - "has non-existent tasks referenced as " - "dependencies: %s" % ( - task.id, task.task_type, - instance, missing_deps)) - if all( - [dep_id in processed_tasks - for dep_id in task.depends_on]): - queued_tasks.append(task) - else: - raise exception.InvalidTaskState( - "Invalid initial state '%s' for task '%s' " - "of type '%s'." % ( - task.status, task.id, task.task_type)) - - # check if nothing was left queued: - if not queued_tasks: - remaining_tasks_deps_map = { - (tid, t.task_type): t.depends_on - for tid, t in tasks_to_process.items()} - processed_tasks_type_map = { - tid: t.task_type - for tid, t in processed_tasks.items()} - raise exception.ExecutionDeadlockException( - "Execution '%s' (type '%s') is bound to be deadlocked:" - " there are leftover tasks for instance '%s' which " - "will never get queued. Already processed tasks are: " - "%s. Tasks left: %s" % ( - execution.id, execution.type, instance, - processed_tasks_type_map, remaining_tasks_deps_map - )) - - # mapping for task_info fields modified by each task: - modified_fields_by_queued_tasks = {} - # check that each task has what it needs and - # register what they return/modify: - for task in queued_tasks: - for new_field in _check_task_cls_param_requirements( - task, task_info_keys): - if new_field not in modified_fields_by_queued_tasks: - modified_fields_by_queued_tasks[new_field] = [ - task] - else: - modified_fields_by_queued_tasks[new_field].append( - task) - - # check if any queued tasks would manipulate the same fields: - conflicting_fields = { - new_field: [t.task_type for t in tasks] - for new_field, tasks in ( - modified_fields_by_queued_tasks.items()) - if len(tasks) > 1} - if conflicting_fields: - raise exception.TaskFieldsConflict( - "There are fields which will encounter a state " - "conflict following the parallelized execution of " - "tasks for execution '%s' (type '%s') for instance " - "'%s'. Conflicting fields and tasks will be: : %s" % ( - execution.id, execution.type, instance, - conflicting_fields)) - - # register queued tasks as processed before continuing: - for task in queued_tasks: - processed_tasks[task.id] = task - tasks_to_process.pop(task.id) - # update current state fields at this point: - task_info_keys = task_info_keys.union(set( - modified_fields_by_queued_tasks.keys())) - LOG.debug( - "Successfully processed following tasks for instance '%s' " - "for execution %s (type '%s') for any state conflict " - "checks: %s", instance, execution.id, execution.type, - [(t.id, t.task_type) for t in queued_tasks]) + all_tasks_by_id = {t.id: t for t in execution.tasks} + task_info_by_instance = { + inst: set(initial_task_info.get(inst, {}).keys()) + for inst in all_instances_in_tasks} + + tasks_to_process = {t.id: t for t in execution.tasks} + if not tasks_to_process: + return + processed_tasks = set() + while tasks_to_process: + queued_tasks = [] + for task in tasks_to_process.values(): + if task.status not in ( + constants.TASK_STATUS_SCHEDULED, + constants.TASK_STATUS_ON_ERROR_ONLY): + raise exception.InvalidTaskState( + "Invalid initial state '%s' for task '%s' " + "of type '%s'." % ( + task.status, task.id, task.task_type)) + if not task.depends_on: + queued_tasks.append(task) + else: + dep_ids = list(task.depends_on) + missing = [ + dep_id for dep_id in dep_ids + if dep_id not in all_tasks_by_id] + if missing: + raise exception.TaskDependencyException( + "Task '%s' (type '%s') for instance '%s' " + "has non-existent tasks referenced as " + "dependencies: %s" % ( + task.id, task.task_type, task.instance, + missing)) + if all( + dep_id in processed_tasks + for dep_id in dep_ids): + queued_tasks.append(task) + + if not queued_tasks: + raise exception.ExecutionDeadlockException( + "Execution '%s' (type '%s') is bound to be deadlocked: " + "cannot schedule a next wave. Remaining: %s. " + "Processed: %s" % ( + execution.id, execution.type, + {t.id: t.depends_on + for t in tasks_to_process.values()}, + list(processed_tasks))) + + new_fields_by_task = {} + for task in queued_tasks: + new_fields_by_task[task] = _check_task_cls_param_requirements( + task, task_info_by_instance[task.instance]) + modified_by_inst_field = {} + for task, new_fields in new_fields_by_task.items(): + for new_field in new_fields: + key = (task.instance, new_field) + modified_by_inst_field.setdefault(key, []).append(task) + conflicts = { + (inst, field): [t.task_type for t in tlist] + for (inst, field), tlist in modified_by_inst_field.items() + if len(tlist) > 1} + if conflicts: + raise exception.TaskFieldsConflict( + "There are fields which will encounter a state " + "conflict for execution '%s' (type '%s') (instance+field) " + "and tasks: %s" % ( + execution.id, execution.type, conflicts)) + + for _task, new_fields in new_fields_by_task.items(): + for new_field in new_fields: + task_info_by_instance[_task.instance].add(new_field) + for task in queued_tasks: + processed_tasks.add(task.id) + tasks_to_process.pop(task.id, None) LOG.debug( - "Successfully checked all tasks for instance '%s' as part of " - "execution '%s' (type '%s') for any state conflicts: %s", - instance, execution.id, execution.type, - [(t.id, t.task_type) for t in instance_tasks]) + "Sanity check wave for execution '%s': %s", + execution.id, + [(t.id, t.task_type, t.instance) for t in queued_tasks]) + LOG.debug( "Successfully checked all tasks for execution '%s' (type '%s') " "for ordering or state conflicts.", @@ -1329,6 +1306,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, network_map, storage_mappings, notes=None, user_scripts=None, clone_disks=True, skip_os_morphing=False): + clustered = len(instances) > 1 supported_scenarios = [ constants.TRANSFER_SCENARIO_REPLICA, constants.TRANSFER_SCENARIO_LIVE_MIGRATION] @@ -1365,6 +1343,7 @@ def create_instances_transfer(self, ctxt, transfer_scenario, transfer.user_scripts = user_scripts or {} transfer.clone_disks = clone_disks transfer.skip_os_morphing = skip_os_morphing + transfer.clustered = clustered self._check_minion_pools_for_action(ctxt, transfer) @@ -1783,6 +1762,7 @@ def deploy_transfer_instances( deployment.user_scripts = user_scripts deployment.clone_disks = clone_disks deployment.skip_os_morphing = skip_os_morphing + deployment.clustered = bool(getattr(transfer, 'clustered', False)) deployment.deployer_id = wait_for_execution deployment.trust_id = trust_id deployment.last_execution_status = init_status @@ -2130,13 +2110,15 @@ def _cancel_tasks_execution( exception_details=( "This task was unscheduled during the cancellation " "of the parent tasks execution.")) + # NOTE: SYNCING covers tasks waiting on a clustered + # barrier; there is no running worker to cancel, so we + # unschedule like PENDING/STARTING. (Peers stuck in + # SYNCING when another instance fails are handled in + # _abort_peer_sync_barrier_tasks_on_error, not here.) elif task.status in ( constants.TASK_STATUS_PENDING, - constants.TASK_STATUS_STARTING): - # any PENDING/STARTING tasks means that they did not have a - # host assigned to them yet, and presuming the host does not - # start executing the task until it marks itself as the runner, - # we can just mark the task as unscheduled: + constants.TASK_STATUS_STARTING, + constants.TASK_STATUS_SYNCING): LOG.debug( "Setting currently '%s' task '%s' to '%s' as part of the " "cancellation of execution '%s'", @@ -2226,6 +2208,30 @@ def _cancel_tasks_execution( "No new tasks were started for execution '%s' following " "state advancement after cancellation.", execution.id) + def _abort_peer_sync_barrier_tasks_on_error( + self, ctxt, execution, errored_task, error_message): + """Mark peer tasks stuck in SYNCING on the same barrier as failed.""" + if errored_task.task_type not in constants.TASK_TYPES_TO_SYNC: + return + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + if not bool(getattr(action, "clustered", False)): + return + execution = db_api.get_tasks_execution(ctxt, execution.id) + for peer in execution.tasks: + if peer.id == errored_task.id: + continue + if peer.status != constants.TASK_STATUS_SYNCING: + continue + if peer.task_type != errored_task.task_type: + continue + db_api.set_task_status( + ctxt, peer.id, constants.TASK_STATUS_ERROR, + exception_details=( + "Aborted: peer task '%s' failed during clustered " + "execution. Original error: %s" % ( + errored_task.id, error_message))) + def _update_reservation_fulfillment_for_execution(self, ctxt, execution): """ Updates the reservation fulfillment status for the parent transfer action of the given execution based on its type. @@ -2827,6 +2833,137 @@ def _update_volumes_info_for_deployment_parent_transfer( self._update_transfer_volumes_info( ctxt, transfer_id, instance, updated_task_info) + def _handle_task_sync_barrier(self, ctxt, task, execution, action): + """Implements a cross-instance sync barrier for clustered actions. + + If the completed task's type is in TASK_TYPES_TO_SYNC and the + action is clustered, the task is set to SYNCING instead of + remaining COMPLETED. Once all instances' tasks of the same type + reach SYNCING, _handle_synced_tasks runs the type-specific sync + logic, then all tasks are set to COMPLETED and their execution + states are advanced. + + Returns True if the barrier was activated (caller should return + early), False otherwise. + """ + if task.task_type not in constants.TASK_TYPES_TO_SYNC: + return False + if not bool(getattr(action, "clustered", False)): + return False + + db_api.set_task_status( + ctxt, task.id, constants.TASK_STATUS_SYNCING) + + peer_tasks = [ + t for t in execution.tasks + if t.task_type == task.task_type and t.id != task.id] + all_syncing = all( + t.status == constants.TASK_STATUS_SYNCING for t in peer_tasks) + if not all_syncing: + LOG.info( + "Task '%s' (type '%s') for instance '%s' is now SYNCING. " + "Waiting for peer tasks of other instances to reach SYNCING.", + task.id, task.task_type, task.instance) + return True + + LOG.info( + "All tasks of type '%s' across all instances have reached " + "SYNCING for execution '%s'. Running sync handler.", + task.task_type, execution.id) + + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + self._handle_synced_tasks(ctxt, task.task_type, execution, action) + + # NOTE: the sync handler may have updated the action info in the + # DB through separate sessions (detaching the above `action` ORM + # object in the process), so re-fetch it to get both a session-bound + # object and the freshest task info for the post-task actions below: + action = db_api.get_action( + ctxt, execution.action_id, include_task_info=True) + + synced_tasks = [task] + peer_tasks + for synced_task in synced_tasks: + db_api.set_task_status( + ctxt, synced_task.id, constants.TASK_STATUS_COMPLETED) + + for synced_task in synced_tasks: + self._handle_post_task_actions( + ctxt, synced_task, execution, + action.info.get(synced_task.instance, {})) + self._advance_execution_state( + ctxt, execution, instance=synced_task.instance, requery=True) + + return True + + def _assign_clustered_export_disk_owners(self, ctxt, execution, action): + """Assign an ``owner`` to every disk in each instance's export_info. + + The owner of a disk is the first instance (in ``action.instances`` + order) whose ``export_info`` reports that disk ``id``. For regular + (non-shared) disks the owner is the only instance exposing them; + for shared disks (same ``id`` reported by multiple instances), + only the owner's DEPLOY_TRANSFER_DISKS task will create a destination + volume and replicate data into it, while the other instances' tasks + will only record a placeholder ``volumes_info`` entry with + ``replicate_disk_data`` set to False (handled provider-side). + """ + # NOTE: snapshot everything needed from the `action` ORM object + # before any DB writes below: + # update_transfer_action_info_for_instance runs in a separate + # session, which leaves the passed-in `action` object detached from + # its own session, so any lazy attribute access on it afterwards + # would raise a DetachedInstanceError. + action_id = execution.action_id + instances = list(action.instances) + action_info = action.info + + disk_owners_map = {} + for instance_id in instances: + export_info = action_info.get(instance_id, {}).get("export_info") + if not export_info: + LOG.warning( + "No export_info found for instance '%s' of action '%s' " + "while assigning clustered disk owners. Skipping it.", + instance_id, action_id) + continue + disks = export_info.get("devices", {}).get("disks", []) + updated = False + for disk in disks: + disk_id = disk.get("id") + if not disk_id: + continue + if disk_id not in disk_owners_map: + disk_owners_map[disk_id] = instance_id + owner = disk_owners_map[disk_id] + if disk.get("owner") != owner: + disk["owner"] = owner + updated = True + if updated: + db_api.update_transfer_action_info_for_instance( + ctxt, action_id, instance_id, + {"export_info": export_info}) + LOG.info( + "Assigned disk owners in export_info of instance '%s' " + "for clustered action '%s': %s", + instance_id, action_id, { + d.get("id"): d.get("owner") for d in disks}) + + def _handle_synced_tasks(self, ctxt, task_type, execution, action): + """Runs type-specific logic after all instances reach a sync barrier. + + Called once all tasks of a given type across all instances have + reached SYNCING. The handler can inspect and modify action info + (e.g. export_info/volumes_info) for all instances. + """ + if task_type == constants.TASK_TYPE_GET_INSTANCE_INFO: + self._assign_clustered_export_disk_owners(ctxt, execution, action) + elif task_type == constants.TASK_TYPE_SHUTDOWN_INSTANCE: + LOG.info( + "All instances have been shut down for clustered execution " + "'%s'. No additional sync handling required for shutdown.", + execution.id) + def _handle_post_task_actions(self, ctxt, task, execution, task_info): task_type = task.task_type @@ -3143,6 +3280,11 @@ def task_completed(self, ctxt, task_id, task_result): # NOTE: refresh the execution just in case: execution = db_api.get_tasks_execution(ctxt, task.execution_id) + + if self._handle_task_sync_barrier( + ctxt, task, execution, action): + return + self._handle_post_task_actions( ctxt, task, execution, updated_task_info) @@ -3373,8 +3515,12 @@ def set_task_error(self, ctxt, task_id, exception_details): "Some tasks are running in parallel with the " "OSMorphing task, a debug setup cannot be safely " "achieved. Proceeding with cleanup tasks as usual.") + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) else: + self._abort_peer_sync_barrier_tasks_on_error( + ctxt, execution, task, exception_details) self._cancel_tasks_execution(ctxt, execution) @task_synchronized diff --git a/coriolis/constants.py b/coriolis/constants.py index 2254837c0..5866eed88 100644 --- a/coriolis/constants.py +++ b/coriolis/constants.py @@ -55,11 +55,13 @@ TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK" TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY" TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE" +TASK_STATUS_SYNCING = "SYNCING" ACTIVE_TASK_STATUSES = [ TASK_STATUS_PENDING, TASK_STATUS_STARTING, TASK_STATUS_RUNNING, + TASK_STATUS_SYNCING, TASK_STATUS_CANCELLING, TASK_STATUS_CANCELLING_AFTER_COMPLETION ] @@ -182,6 +184,10 @@ TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION" TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION" +TASK_TYPES_TO_SYNC = [ + TASK_TYPE_GET_INSTANCE_INFO, + TASK_TYPE_SHUTDOWN_INSTANCE, +] MINION_POOL_OPERATIONS_TASKS = [ TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS, @@ -240,6 +246,7 @@ DISK_FORMAT_QCOW2 = 'qcow2' DISK_FORMAT_VHD = 'vhd' DISK_FORMAT_VHDX = 'vhdx' +VOLUME_INFO_REPLICATE_DISK_DATA = "replicate_disk_data" DISK_ALLOCATION_TYPE_STATIC = "static" DISK_ALLOCATION_TYPE_DYNAMIC = "dynamic" diff --git a/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py new file mode 100644 index 000000000..9e834e85d --- /dev/null +++ b/coriolis/db/sqlalchemy/migrate_repo/versions/024_add_clustered_to_base_transfer_action.py @@ -0,0 +1,20 @@ +# Copyright 2026 Cloudbase Solutions Srl +# All Rights Reserved. + +import sqlalchemy + + +def upgrade(migrate_engine): + meta = sqlalchemy.MetaData() + meta.bind = migrate_engine + + base_transfer = sqlalchemy.Table( + 'base_transfer_action', meta, autoload=True) + if 'clustered' in base_transfer.c: + return + # server_default so existing rows get a value when the column is added + # (MySQL stores booleans as TINYINT). + clustered = sqlalchemy.Column( + 'clustered', sqlalchemy.Boolean, nullable=False, + server_default=sqlalchemy.text('0')) + base_transfer.create_column(clustered) diff --git a/coriolis/db/sqlalchemy/models.py b/coriolis/db/sqlalchemy/models.py index d4377999d..c379689a3 100644 --- a/coriolis/db/sqlalchemy/models.py +++ b/coriolis/db/sqlalchemy/models.py @@ -285,6 +285,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase, sqlalchemy.Boolean, nullable=False, default=True) skip_os_morphing = sqlalchemy.Column( sqlalchemy.Boolean, nullable=False, default=False) + # Multi-instance transfer: enables cross-instance sync barriers and + # shared-disk handling. Must be set on INSERT (MySQL NOT NULL). + clustered = sqlalchemy.Column( + sqlalchemy.Boolean, nullable=False, default=False) __mapper_args__ = { 'polymorphic_identity': 'base_transfer_action', @@ -320,6 +324,7 @@ def to_dict(self, include_task_info=True, include_executions=True): "user_scripts": self.user_scripts, "clone_disks": self.clone_disks, "skip_os_morphing": self.skip_os_morphing, + "clustered": bool(self.clustered), } if include_executions: for ex in self.executions: diff --git a/coriolis/schemas/disk_sync_resources_info_schema.json b/coriolis/schemas/disk_sync_resources_info_schema.json index 4a86c3752..ffb663090 100644 --- a/coriolis/schemas/disk_sync_resources_info_schema.json +++ b/coriolis/schemas/disk_sync_resources_info_schema.json @@ -15,7 +15,7 @@ }, "volume_dev": { "type": "string", - "description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached." + "description": "Guest minion device path (e.g. /dev/disk/by-id/...) when the volume is attached; use \"\" for rows that do not represent a transferred block dev yet (e.g. shared-disk non-owners)." } }, "required": ["disk_id", "volume_dev"], diff --git a/coriolis/schemas/vm_export_info_schema.json b/coriolis/schemas/vm_export_info_schema.json index 05f3017e1..5da966b0c 100644 --- a/coriolis/schemas/vm_export_info_schema.json +++ b/coriolis/schemas/vm_export_info_schema.json @@ -118,9 +118,18 @@ "type": "string", "description": "The allocation scheme for the given disk (static = thick; dynamic = thin)", "enum": ["static", "dynamic"] + }, + "shareable": { + "type": "boolean", + "description": "Whether the disk is shared (multi-writer) and can be attached to multiple VMs simultaneously." + }, + "owner": { + "type": "string", + "description": "The identifier of the instance owning this disk within a clustered (multi-instance) transfer. Set by the conductor after all GET_INSTANCE_INFO tasks have synced. For shared disks, only the owner instance's tasks create/replicate the corresponding destination volume; other instances only reference it." } }, "required": [ + "id", "size_bytes" ] } diff --git a/coriolis/tasks/replica_tasks.py b/coriolis/tasks/replica_tasks.py index 0407e8bca..a6291b607 100644 --- a/coriolis/tasks/replica_tasks.py +++ b/coriolis/tasks/replica_tasks.py @@ -244,6 +244,11 @@ def _run(self, ctxt, instance, origin, destination, task_info, source_environment = task_info['source_environment'] source_resources = task_info.get('source_resources', {}) + # NOTE: the full volumes_info list is passed to the provider, + # including any entries with 'replicate_disk_data' set to False + # (e.g. shared disks of clustered transfers whose data is + # replicated by their owner instance's task). It is up to each + # provider to skip replicating data for such volumes. volumes_info = provider.replicate_disks( ctxt, connection_info, source_environment, instance, source_resources, migr_source_conn_info, migr_target_conn_info, @@ -290,10 +295,9 @@ def _run(self, ctxt, instance, origin, destination, task_info, event_handler) connection_info = base.get_connection_info(ctxt, destination) - volumes_info = task_info.get("volumes_info", []) volumes_info = provider.deploy_replica_disks( ctxt, connection_info, target_environment, instance, export_info, - volumes_info) + task_info.get("volumes_info", [])) schemas.validate_value( volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA) diff --git a/coriolis/tests/conductor/rpc/test_server.py b/coriolis/tests/conductor/rpc/test_server.py index 57d7911fd..34e32b817 100644 --- a/coriolis/tests/conductor/rpc/test_server.py +++ b/coriolis/tests/conductor/rpc/test_server.py @@ -1307,6 +1307,7 @@ def call_execute_transfer_tasks(): if has_origin_minion_pool else None, destination_minion_pool_id=mock.sentinel.destination_minion_pool_id if has_target_minion_pool else None, + clustered=False, ) mock_get_transfer.return_value = mock_transfer diff --git a/coriolis/tests/db/sqlalchemy/test_models.py b/coriolis/tests/db/sqlalchemy/test_models.py index 7b0c7610a..3bd4c7619 100644 --- a/coriolis/tests/db/sqlalchemy/test_models.py +++ b/coriolis/tests/db/sqlalchemy/test_models.py @@ -283,6 +283,7 @@ def test_to_dict(self): transfer.info = mock.sentinel.info transfer.clone_disks = True transfer.skip_os_morphing = False + transfer.clustered = False expected_result = { "base_id": mock.sentinel.base_id, "user_id": mock.sentinel.user_id, @@ -314,6 +315,7 @@ def test_to_dict(self): "info": mock.sentinel.info, "clone_disks": True, "skip_os_morphing": False, + "clustered": False, } result = transfer.to_dict() diff --git a/coriolis/tests/tasks/test_replica_tasks.py b/coriolis/tests/tasks/test_replica_tasks.py index f31e04bec..40fa90291 100644 --- a/coriolis/tests/tasks/test_replica_tasks.py +++ b/coriolis/tests/tasks/test_replica_tasks.py @@ -146,6 +146,8 @@ def test__run(self, mock_unmarshal, mock_check_vol_info, mock_get_vol_info, task_info.get.side_effect = [ task_info['incremental'], task_info['source_resources']] prov_fun = mock_get_provider.return_value.replicate_disks + mock_get_vol_info.return_value = [{"disk_id": "disk_id1"}] + prov_fun.return_value = [{"disk_id": "disk_id1"}] expected_result = {"volumes_info": mock_check_vol_info.return_value} expected_validation_calls = [ mock.call.mock_validate_value(