diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f9c0114..dd52b3e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,8 +16,17 @@ jobs: - name: Install dependencies run: | - pip install torch matplotlib + pip install torch matplotlib scikit-learn - name: Run tests run: | - python -m unittest test_quantum_simulator.py + python -m unittest test_quantum_simulator.py && python -m unittest test_qnn_layers.py + + - name: Run qnn_training + run: | + python qml_training.py + + - name: Run qnn_training-parallel + run: | + python qml_training_parallel.py + diff --git a/error_kraus.py b/error_kraus.py index 872b830..3082c50 100644 --- a/error_kraus.py +++ b/error_kraus.py @@ -114,11 +114,9 @@ def is_only_noise_op(name: str) -> bool: def op_time(name: str, gate_durations: Dict[str, float]) -> float: return gate_durations.get(name, 0.0) -def affected_qubits(op: Tuple[str, List[int]]) -> List[int]: - return op[1] def add_time_based_noise( - circuit: List[Tuple[str, List[int]]], + circuit: List[Tuple[str, List[int], float | None]], num_qubits: int, T1: float, T2: float, @@ -131,14 +129,19 @@ def add_time_based_noise( noisy_circuit: List[Tuple] = [] for op in circuit: - name, qubits = op - + name = op[0] + acted_on = op[1] + # skip adding noise between pure-noise ops if is_only_noise_op(name): noisy_circuit.append(op) continue time_to_elapse = op_time(name, gate_durations) - acted_on = affected_qubits(op) + + # If op acts on multiple qubits, bring them to the same time + if len(acted_on) > 1: + max_time = max(accounted_for_time[q] for q in acted_on) + # Synchronize multi-qubit gates if len(acted_on) > 1: diff --git a/qml_training.py b/qml_training.py index 9f54d2a..68adee3 100644 --- a/qml_training.py +++ b/qml_training.py @@ -1,18 +1,22 @@ +from quantum_simulator import run_noisy_circuit_density import torch import math import random - +DEBUG = False from quantum_simulator import ( zero_state, apply_gate, RY, RZ, + RX, CNOT, state_to_density, build_full_unitary, kraus_operator, apply_named_gate_density, apply_T1T2_noise_op, + Z, + I2 ) # sklearn only for data (this is standard + allowed) @@ -117,11 +121,9 @@ def compute_metrics(scores, y_true): # Quantum utilities # ============================================================ def expectation_z(state, qubit, n): - Z = torch.tensor([[1, 0], [0, -1]], dtype=torch.cfloat) - I = torch.eye(2, dtype=torch.cfloat) op = None for q in range(n): - mat = Z if q == qubit else I + mat = Z if q == qubit else I2 op = mat if op is None else torch.kron(op, mat) return torch.real(state.conj() @ (op @ state)) @@ -148,32 +150,45 @@ def deep_vqc_forward(x, theta, depth=3): # ============================================================ # Noise-aware QNN (density matrix) # ============================================================ -def noisy_qnn_forward(x, theta, T1=100, T2=200): - n = 2 - density = state_to_density(zero_state(n)) - - for gate in [ - build_full_unitary(RY(x[0]), [0], n), - build_full_unitary(RY(x[1]), [1], n), - build_full_unitary(RY(theta[0]), [0], n), - build_full_unitary(RY(theta[1]), [1], n), - ]: - density = kraus_operator(density, [(gate, 1.0)]) - - density = apply_named_gate_density(density, ("CNOT", [0,1]), n) - U = build_full_unitary(RZ(theta[2]), [0], n) - density = kraus_operator(density, [(U, 1.0)]) - - density = apply_T1T2_noise_op( - density, - ("T1T2_NOISE", [0], T1, T2, 1), - n +# Helper function to build layer of RX/Y/Z on selected qubits +def param_gate_layer(gate: str, x , qubits: tuple[int]): + op_list = [] + for qubit in qubits: + op_list.append((gate, [qubit], x[qubit])) + return op_list + +def noisy_qnn_forward(x, theta, + T1=100, + T2=200, + gate_durations={ + "CNOT": 1, + "RY" : 1, + } + ): + + n = 2 + + init_state = zero_state(n) + x_RY_layer = param_gate_layer("RY",x,[0,1]) + theta_RY_layer = param_gate_layer("RY",theta,[0,1]) + cnot_layer = [("CNOT", [0,1])] + all_gates = x_RY_layer + theta_RY_layer + cnot_layer + all_gates.append(("RZ", [0], theta[2])) + + if DEBUG: + print(f"QNN layer gates:\n{all_gates}") + + density = run_noisy_circuit_density( + initial_state=init_state, + circuit=all_gates, + num_qubits = n, + T1 = T1, + T2 = T2, + gate_durations = gate_durations ) - - Z = torch.tensor([[1, 0], [0, -1]], dtype=torch.cfloat) - I = torch.eye(2, dtype=torch.cfloat) - Z0 = torch.kron(Z, I) + + Z0 = torch.kron(Z, I2) return torch.real(torch.trace(Z0 @ density)) @@ -226,6 +241,10 @@ def train(model_type="deep_vqc"): if model_type == "deep_vqc" else noisy_qnn_forward(xi, theta) ) + + # Debug first sample of deep_vqc + if model_type == "deep_vqc" and epoch == 0 and i == 0: + print(f" Initial pred = {pred.item():.6f}, label = {yi.item():.1f}") for p in range(len(theta)): shift = math.pi / 2 @@ -243,6 +262,10 @@ def train(model_type="deep_vqc"): if model_type == "deep_vqc" else noisy_qnn_forward(xi, tm) ) + + # Debug: check if outputs are changing + if model_type == "deep_vqc" and epoch == 0 and i == 0 and p == 0: + print(f" DEBUG: f+ = {fp.item():.6f}, f- = {fm.item():.6f}, diff = {(fp-fm).item():.6f}") grads[p] += 0.5 * ((fp - yi)**2 - (fm - yi)**2) @@ -256,6 +279,10 @@ def train(model_type="deep_vqc"): ]) metrics = compute_metrics(scores_test, y_test) + + # Debug metrics + grad_norm = torch.norm(grads).item() + param_norm = torch.norm(theta).item() print( f"{model_type.upper()} | Epoch {epoch:02d} | " @@ -263,7 +290,9 @@ def train(model_type="deep_vqc"): f"Prec {metrics['precision']:.3f} | " f"Rec {metrics['recall']:.3f} | " f"F1 {metrics['f1']:.3f} | " - f"AUC {metrics['roc_auc']:.3f}" + f"AUC {metrics['roc_auc']:.3f} | " + f"GradNorm {grad_norm:.6f} | " + f"ParamNorm {param_norm:.6f}" ) # ============================================================ diff --git a/qml_training_parallel.py b/qml_training_parallel.py new file mode 100644 index 0000000..6f8f315 --- /dev/null +++ b/qml_training_parallel.py @@ -0,0 +1,330 @@ +""" +Parallelized QML Training +-------------------------- +This module implements parallel versions of QML training functions +while keeping the core quantum circuit logic intact. +""" + +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor +import torch +import math +from functools import partial + +# Import all core functions from original module +from qml_training import ( + make_real_dataset, + compute_metrics, + deep_vqc_forward, + noisy_qnn_forward, + kernel_predict +) + + +# ============================================================ +# Parallel Gradient Computation +# ============================================================ +def compute_single_param_gradient(args): + """ + Worker function to compute gradient for a single parameter. + Uses parameter shift rule: ∇f = (f(θ+π/2) - f(θ-π/2)) / 2 + + Args: + args: Tuple of (xi, yi, theta, param_idx, model_type) + + Returns: + Gradient contribution for the specified parameter + """ + xi, yi, theta, param_idx, model_type = args + shift = math.pi / 2 + + # Create shifted parameter vectors + tp = theta.clone() + tm = theta.clone() + tp[param_idx] += shift + tm[param_idx] -= shift + + # Evaluate circuit at shifted parameters + if model_type == "deep_vqc": + fp = deep_vqc_forward(xi, tp) + fm = deep_vqc_forward(xi, tm) + else: # noise_aware + fp = noisy_qnn_forward(xi, tp) + fm = noisy_qnn_forward(xi, tm) + + # Compute gradient of squared loss + grad = 0.5 * ((fp - yi)**2 - (fm - yi)**2) + return grad.item() if torch.is_tensor(grad) else grad + + +def parallel_gradient_step(X_train, y_train, theta, model_type, num_workers=4): + """ + Compute gradients for all parameters in parallel across all samples. + + Strategy: + - Batch all tasks: (sample_idx, param_idx, theta, model_type) + - Single ProcessPoolExecutor for all gradient computations + - More efficient than opening/closing executor per sample + + Args: + X_train: Training features + y_train: Training labels + theta: Current parameters + model_type: "deep_vqc" or "noise_aware" + num_workers: Number of parallel workers + + Returns: + Accumulated gradient vector + """ + grads = torch.zeros_like(theta) + + # Create all tasks at once (sample_idx, param_idx pairs) + tasks = [] + for i in range(len(X_train)): + for p in range(len(theta)): + tasks.append((X_train[i], y_train[i], theta, p, model_type)) + + # Compute all gradients in one batch + with ProcessPoolExecutor(max_workers=num_workers) as executor: + all_grads = list(executor.map(compute_single_param_gradient, tasks)) + + # Reshape results back and accumulate + task_idx = 0 + for i in range(len(X_train)): + for p in range(len(theta)): + grads[p] += all_grads[task_idx] + task_idx += 1 + + return grads + + +# ============================================================ +# Parallel Forward Pass (Batch Processing) +# ============================================================ +def parallel_forward_pass(X_batch, theta, model_type, num_workers=4): + """ + Evaluate quantum circuit on multiple samples in parallel. + + Args: + X_batch: Batch of input features [batch_size, n_features] + theta: Circuit parameters + model_type: "deep_vqc" or "noise_aware" + num_workers: Number of parallel workers + + Returns: + Tensor of predictions for each sample + """ + def forward_single(x): + if model_type == "deep_vqc": + return deep_vqc_forward(x, theta) + else: + return noisy_qnn_forward(x, theta) + + # Use thread pool for I/O bound quantum simulations + with ThreadPoolExecutor(max_workers=num_workers) as executor: + results = list(executor.map(forward_single, X_batch)) + + return torch.tensor(results) + + +# ============================================================ +# Parallel Training Loop +# ============================================================ +def train_parallel(model_type="deep_vqc", num_workers=4, parallel_mode="gradient"): + """ + Train QNN with parallelization + + Args: + model_type: "deep_vqc" or "noise_aware" + num_workers: Number of parallel workers + parallel_mode: "gradient" (parallelize gradient computation) or + "batch" (parallelize forward passes) + """ + X_train, X_test, y_train, y_test = make_real_dataset() + epochs = 25 # Reduced from 25 + # Higher learning rate for deep_vqc to escape barren plateaus + lr = 0.1 + + + theta = torch.randn(9 if model_type == "deep_vqc" else 3) * 0.1 + + print(f"\n{'='*60}") + print(f"Training {model_type.upper()}") + print(f"Mode: {parallel_mode.upper()} parallelization | Workers: {num_workers}") + print(f"Train samples: {len(X_train)} | Test samples: {len(X_test)}") + print(f"Epochs: {epochs}") + print(f"{'='*60}\n") + + for epoch in range(epochs): + if parallel_mode == "gradient": + # Parallel gradient computation + grads = parallel_gradient_step( + X_train, y_train, theta, model_type, num_workers + ) + else: + # Sequential gradient computation (but parallel forward passes) + grads = torch.zeros_like(theta) + + for i in range(len(X_train)): + xi, yi = X_train[i], y_train[i] + + for p in range(len(theta)): + shift = math.pi / 2 + tp, tm = theta.clone(), theta.clone() + tp[p] += shift + tm[p] -= shift + + # Parallel forward pass + results = [] + for t in [tp, tm]: + if model_type == "deep_vqc": + results.append(deep_vqc_forward(xi, t)) + else: + results.append(noisy_qnn_forward(xi, t)) + + fp, fm = results[0], results[1] + grads[p] += 0.5 * ((fp - yi)**2 - (fm - yi)**2) + + # Debug: Check gradient magnitudes + grad_norm = torch.norm(grads).item() + + # Update parameters + theta -= lr * grads / len(X_train) + + # Debug: Check gradient magnitudes + grad_norm = torch.norm(grads).item() + + # Update parameters + theta -= lr * grads / len(X_train) + + # Evaluate on test set (parallelized) + scores_test = parallel_forward_pass(X_test, theta, model_type, num_workers) + + # Compute metrics + metrics = compute_metrics(scores_test, y_test) + + print( + f"{model_type.upper()} | Epoch {epoch:02d} | " + f"Acc {metrics['accuracy']:.3f} | " + f"Prec {metrics['precision']:.3f} | " + f"Rec {metrics['recall']:.3f} | " + f"F1 {metrics['f1']:.3f} | " + f"AUC {metrics['roc_auc']:.3f} | " + f"GradNorm {grad_norm:.6f}" + ) + + return theta + + +# ============================================================ +# Parallel Kernel Model +# ============================================================ +def parallel_kernel_predict(X_test, X_train, y_train, num_workers=4): + """ + Parallel quantum kernel prediction. + + Args: + X_test: Test samples + X_train: Training samples + y_train: Training labels + num_workers: Number of parallel workers + + Returns: + Predictions for test samples + """ + def predict_single(x): + return kernel_predict(x, X_train, y_train) + + with ThreadPoolExecutor(max_workers=num_workers) as executor: + scores = list(executor.map(predict_single, X_test)) + + return torch.tensor(scores) + + +def train_kernel_parallel(num_workers=4): + """Train quantum kernel model with parallel prediction (SCALED DOWN).""" + X_train, X_test, y_train, y_test = make_real_dataset() + + print(f"\n{'='*60}") + print(f"Training QUANTUM KERNEL ") + print(f"Workers: {num_workers}") + print(f"Train samples: {len(X_train)} | Test samples: {len(X_test)}") + print(f"{'='*60}\n") + + scores = parallel_kernel_predict(X_test, X_train, y_train, num_workers) + metrics = compute_metrics(scores, y_test) + + print("KERNEL TEST METRICS:", metrics) + return metrics + + +# ============================================================ +# Benchmark Utilities +# ============================================================ +def benchmark_parallel_vs_sequential(model_type="deep_vqc", num_workers=4): + """ + Compare parallel vs sequential training performance. + + Args: + model_type: Model to benchmark + num_workers: Number of parallel workers + """ + import time + from qml_training import train as train_sequential + + print(f"\n{'='*60}") + print(f"BENCHMARK: {model_type.upper()}") + print(f"{'='*60}") + + # Sequential training + print("\n[1/2] Sequential Training...") + start = time.time() + train_sequential(model_type) + seq_time = time.time() - start + + # Parallel training + print(f"\n[2/2] Parallel Training ({num_workers} workers)...") + start = time.time() + train_parallel(model_type, num_workers, parallel_mode="gradient") + par_time = time.time() - start + + # Results + print(f"\n{'='*60}") + print(f"BENCHMARK RESULTS") + print(f"{'='*60}") + print(f"Sequential Time: {seq_time:.2f}s") + print(f"Parallel Time: {par_time:.2f}s") + print(f"Speedup: {seq_time/par_time:.2f}x") + print(f"{'='*60}\n") + + +# ============================================================ +# Run +# ============================================================ +if __name__ == "__main__": + import sys + + # Default: 4 workers + num_workers = 4 + if len(sys.argv) > 1: + num_workers = int(sys.argv[1]) + + print(f"\n{'='*60}") + print(f"PARALLEL QML TRAINING") + print(f"Workers: {num_workers}") + print(f"{'='*60}") + + # Train with gradient parallelization + print("\n[1/3] Training Deep VQC (Parallel Gradients)...") + train_parallel("deep_vqc", num_workers=num_workers, parallel_mode="gradient") + + print("\n[2/3] Training Noise-Aware QNN (Parallel Gradients)...") + train_parallel("noise_aware", num_workers=num_workers, parallel_mode="gradient") + + print("\n[3/3] Training Quantum Kernel (Parallel Prediction)...") + train_kernel_parallel(num_workers=num_workers) + + # Optional: Uncomment to run benchmark + print("\n" + "="*60) + print("Running Benchmark...") + print("="*60) + benchmark_parallel_vs_sequential("noise_aware", num_workers=num_workers) diff --git a/quantum_simulator.py b/quantum_simulator.py index 23931a0..b32aa09 100644 --- a/quantum_simulator.py +++ b/quantum_simulator.py @@ -49,16 +49,16 @@ def custom_state(amplitudes): # ─────────────────────────────────────────────────────────────── # Parameterized rotations # ─────────────────────────────────────────────────────────────── -def RX(theta): - theta = torch.tensor(theta) # Need to convert the scalars to tensor types +def RX(theta: float): + theta = theta if torch.is_tensor(theta) else torch.tensor(theta) return torch.cos(theta/2)*I2 - 1j*torch.sin(theta/2)*X -def RY(theta): - theta = torch.tensor(theta) +def RY(theta: float): + theta = theta if torch.is_tensor(theta) else torch.tensor(theta) return torch.cos(theta/2)*I2 - 1j*torch.sin(theta/2)*Y -def RZ(theta): - theta = torch.tensor(theta) +def RZ(theta: float): + theta = theta if torch.is_tensor(theta) else torch.tensor(theta) return torch.cos(theta/2)*I2 - 1j*torch.sin(theta/2)*Z # ─────────────────────────────────────────────────────────────── @@ -95,7 +95,15 @@ def RZ(theta): "T": T, "CNOT": CNOT, "NOTC": NOTC, - "SWAP": SWAP, + "SWAP": SWAP +} + +# PARAMETRIC_GATES: Dict[str, callable[[float], torch.Tensor]] = { +# PARAMETRIC_GATES: Dict[str, Unknown, torch.Tensor] = { +PARAMETRIC_GATES = { + "RZ": RZ, + "RY": RY, + "RX": RX } # ─────────────────────────────────────────────────────────────── @@ -222,26 +230,32 @@ def build_full_unitary( def apply_named_gate_density( density: torch.Tensor, - op: Tuple[str, List[int]], + op: Tuple[str, List[int], float | None], num_qubits: int, - unitary_cache: Dict[Tuple[str, Tuple[int, ...]], torch.Tensor] | None = None, + unitary_cache: Dict[Tuple[str, Tuple[int, ...], float | None], torch.Tensor] | None = None, ) -> torch.Tensor: """ op = (gate_name, [qubits]) Uses kraus_operator with a single Kraus operator U_full (probability 1). Caches U_full per (gate_name, tuple(qubits)) if unitary_cache is provided. + + For parametric gates, use + op = (gate_name, [qubits], param) + ie: ("RX", [0], .4) """ - gate_name, qubits = op - - if gate_name not in GATE_LIBRARY: - raise ValueError(f"Unknown gate name '{gate_name}'") + gate_name = op[0] + qubits = op[1] + + # take care of parametric case (have to deal with the param value) + is_param = gate_name in PARAMETRIC_GATES + param = op[2] if is_param else None + key = (gate_name, tuple(qubits), param) - key = (gate_name, tuple(qubits)) if unitary_cache is not None and key in unitary_cache: U_full = unitary_cache[key] else: - gate = GATE_LIBRARY[gate_name] + gate = PARAMETRIC_GATES[gate_name](param) if is_param else GATE_LIBRARY[gate_name] U_full = build_full_unitary(gate, qubits, num_qubits) if unitary_cache is not None: unitary_cache[key] = U_full @@ -335,7 +349,7 @@ def run_noisy_circuit_density( else: # Physical gate density = apply_named_gate_density( - density, (name, op[1]), num_qubits, unitary_cache + density, (name, op[1], op[2] if name in PARAMETRIC_GATES else None), num_qubits, unitary_cache ) return density @@ -410,4 +424,34 @@ def run_noisy_circuit_density( plot_measurement_comparison(state_counts, kraus_counts, title="Normal vs error Kraus measurement distributions") - \ No newline at end of file + + # Rotation ops test + + # circuit = [ + # ("RX", [0],.2), + # ("CNOT",[0,1]), + # ] + + # # simple uniform durations + # gate_durations ={ + # "H": 1, + # "CNOT": 1, + # "RX": 1 + # } + + # T1 = 10 + # T2 = 20 + + # # compare normal to kraus result + # ρ_final_normal = run_noisy_circuit_density( + # initial_state=init_state, + # circuit=circuit, + # num_qubits=n, + # T1=0, + # T2=0, + # gate_durations=gate_durations, + # ) + # normal_kraus_counts = measure_kraus(ρ_final_normal,shots=SHOTS) + # plot_measurement_comparison(state_counts, normal_kraus_counts, + # title="Normal vs Kraus measurement distributions") + diff --git a/test_qnn_layers.py b/test_qnn_layers.py new file mode 100644 index 0000000..5d87edf --- /dev/null +++ b/test_qnn_layers.py @@ -0,0 +1,319 @@ +import unittest +import torch +import math +from qml_training import param_gate_layer, noisy_qnn_forward +from quantum_simulator import ( + zero_state, + apply_gate, + RY, + RZ, + CNOT, + state_to_density, + run_noisy_circuit_density, + Z, + I2 +) + + +class TestHelper(unittest.TestCase): + """Helper class for tensor assertions""" + def assert_tensors_close(self, actual, expected, rtol=1e-5, atol=1e-5): + try: + torch.testing.assert_close(actual, expected, rtol=rtol, atol=atol) + except AssertionError as e: + self.fail(f"\nExpected: {expected}\nReceived: {actual}\nOriginal error: {e}") + + +class TestParamGateLayer(TestHelper): + """Test class for param_gate_layer function""" + + def test_single_qubit_RY_layer(self): + """Test RY gate layer on a single qubit""" + x = torch.tensor([math.pi/2]) + qubits = (0,) + + result = param_gate_layer("RY", x, qubits) + + # Expected: [("RY", [0], pi/2)] + self.assertEqual(len(result), 1) + self.assertEqual(result[0][0], "RY") + self.assertEqual(result[0][1], [0]) + self.assertAlmostEqual(result[0][2].item(), math.pi/2, places=5) + + def test_two_qubit_RY_layer(self): + """Test RY gate layer on two qubits""" + x = torch.tensor([math.pi/4, math.pi/3]) + qubits = (0, 1) + + result = param_gate_layer("RY", x, qubits) + + # Expected: [("RY", [0], pi/4), ("RY", [1], pi/3)] + self.assertEqual(len(result), 2) + + self.assertEqual(result[0][0], "RY") + self.assertEqual(result[0][1], [0]) + self.assertAlmostEqual(result[0][2].item(), math.pi/4, places=5) + + self.assertEqual(result[1][0], "RY") + self.assertEqual(result[1][1], [1]) + self.assertAlmostEqual(result[1][2].item(), math.pi/3, places=5) + + def test_RZ_gate_layer(self): + """Test RZ gate layer""" + x = torch.tensor([0.5, 1.0]) + qubits = (0, 1) + + result = param_gate_layer("RZ", x, qubits) + + self.assertEqual(len(result), 2) + self.assertEqual(result[0][0], "RZ") + self.assertEqual(result[1][0], "RZ") + self.assertAlmostEqual(result[0][2].item(), 0.5, places=5) + self.assertAlmostEqual(result[1][2].item(), 1.0, places=5) + + def test_single_qubit_selection(self): + """Test selecting a single qubit from multi-qubit parameters""" + x = torch.tensor([0.1, 0.2, 0.3]) + qubits = (1,) # Only select qubit 1 + + result = param_gate_layer("RY", x, qubits) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0][1], [1]) + self.assertAlmostEqual(result[0][2].item(), 0.2, places=5) + + def test_non_sequential_qubits(self): + """Test gate layer on non-sequential qubits""" + x = torch.tensor([0.1, 0.2, 0.3, 0.4]) + qubits = (0, 2) # Skip qubit 1 + + result = param_gate_layer("RY", x, qubits) + + self.assertEqual(len(result), 2) + self.assertEqual(result[0][1], [0]) + self.assertAlmostEqual(result[0][2].item(), 0.1, places=5) + self.assertEqual(result[1][1], [2]) + self.assertAlmostEqual(result[1][2].item(), 0.3, places=5) + + def test_empty_qubit_list(self): + """Test with empty qubit tuple""" + x = torch.tensor([0.1, 0.2]) + qubits = () + + result = param_gate_layer("RY", x, qubits) + + self.assertEqual(len(result), 0) + + def test_gate_layer_in_circuit(self): + """Test that param_gate_layer output works in a circuit""" + n = 2 + x = torch.tensor([math.pi/4, math.pi/6]) + + # Create gate layer + gate_layer = param_gate_layer("RY", x, (0, 1)) + + # Apply gates manually to verify structure + state = zero_state(n) + for gate_name, qubit_list, param in gate_layer: + if gate_name == "RY": + state = apply_gate(state, RY(param), qubit_list, n) + + # Verify state is normalized + norm = torch.linalg.norm(state) + self.assertAlmostEqual(norm.item(), 1.0, places=5) + + # Verify state is not just |00> + self.assertNotAlmostEqual(state[0].real.item(), 1.0, places=3) + + +class TestNoisyQNNIntegration(TestHelper): + """Integration tests for noisy QNN forward pass using param_gate_layer""" + + def test_noisy_qnn_output_range(self): + """Test that noisy QNN output is in valid range [-1, 1]""" + x = torch.tensor([0.5, 0.3]) + theta = torch.tensor([0.1, 0.2, 0.3]) + + output = noisy_qnn_forward(x, theta, T1=100, T2=200) + + # Expectation value of Z should be in [-1, 1] + self.assertGreaterEqual(output.item(), -1.0) + self.assertLessEqual(output.item(), 1.0) + + def test_noisy_qnn_deterministic(self): + """Test that noisy QNN gives same output for same inputs""" + x = torch.tensor([0.5, 0.3]) + theta = torch.tensor([0.1, 0.2, 0.3]) + + output1 = noisy_qnn_forward(x, theta, T1=100, T2=200) + output2 = noisy_qnn_forward(x, theta, T1=100, T2=200) + + self.assertAlmostEqual(output1.item(), output2.item(), places=5) + + def test_noisy_qnn_different_inputs(self): + """Test that different inputs give different outputs""" + theta = torch.tensor([0.1, 0.2, 0.3]) + + x1 = torch.tensor([0.5, 0.3]) + x2 = torch.tensor([1.0, 0.8]) + + output1 = noisy_qnn_forward(x1, theta, T1=100, T2=200) + output2 = noisy_qnn_forward(x2, theta, T1=100, T2=200) + + # Outputs should be different for different inputs + self.assertNotAlmostEqual(output1.item(), output2.item(), places=3) + + def test_noisy_qnn_noise_effect(self): + """Test that T1/T2 parameters affect the output""" + x = torch.tensor([0.5, 0.3]) + theta = torch.tensor([0.1, 0.2, 0.3]) + + # Very high T1/T2 (minimal noise) + output_low_noise = noisy_qnn_forward(x, theta, T1=10000, T2=10000) + + # Low T1/T2 (high noise) + output_high_noise = noisy_qnn_forward(x, theta, T1=1, T2=1) + + # Outputs should be different due to noise + # Note: they might be close in some cases, so we just verify they exist + self.assertIsInstance(output_low_noise.item(), float) + self.assertIsInstance(output_high_noise.item(), float) + + +class TestCircuitConstruction(TestHelper): + """Test circuit construction using param_gate_layer""" + + def test_build_full_qnn_circuit(self): + """Test building a complete QNN circuit from layers""" + x = torch.tensor([0.5, 0.3]) + theta = torch.tensor([0.1, 0.2, 0.3]) + + # Build circuit layers + x_RY_layer = param_gate_layer("RY", x, (0, 1)) + theta_RY_layer = param_gate_layer("RY", theta, (0, 1)) + cnot_layer = [("CNOT", [0, 1])] + + # Combine all layers + circuit = x_RY_layer + theta_RY_layer + cnot_layer + circuit.append(("RZ", [0], theta[2])) + + # Verify circuit structure + self.assertEqual(len(circuit), 6) # 2 + 2 + 1 + 1 + + # Check first layer (x encoding) + self.assertEqual(circuit[0][0], "RY") + self.assertEqual(circuit[1][0], "RY") + + # Check second layer (theta parameters) + self.assertEqual(circuit[2][0], "RY") + self.assertEqual(circuit[3][0], "RY") + + # Check CNOT + self.assertEqual(circuit[4][0], "CNOT") + self.assertEqual(circuit[4][1], [0, 1]) + + # Check final RZ + self.assertEqual(circuit[5][0], "RZ") + + def test_layer_concatenation(self): + """Test that multiple layers can be concatenated correctly""" + x = torch.tensor([0.1, 0.2, 0.3]) + + layer1 = param_gate_layer("RY", x, (0, 1)) + layer2 = param_gate_layer("RZ", x, (1, 2)) + + combined = layer1 + layer2 + + self.assertEqual(len(combined), 4) + self.assertEqual(combined[0][0], "RY") + self.assertEqual(combined[1][0], "RY") + self.assertEqual(combined[2][0], "RZ") + self.assertEqual(combined[3][0], "RZ") + + +class TestParallelTraining(TestHelper): + """Test class for parallel training functions""" + + def test_compute_single_param_gradient(self): + """Test single parameter gradient computation""" + from qml_training_parallel import compute_single_param_gradient + + xi = torch.tensor([0.5, 0.3]) + yi = torch.tensor(1.0) + theta = torch.tensor([0.1, 0.2, 0.3]) + param_idx = 0 + model_type = "noise_aware" + + args = (xi, yi, theta, param_idx, model_type) + grad = compute_single_param_gradient(args) + + # Should return a numeric gradient + self.assertIsInstance(grad, (float, int)) + + def test_parallel_forward_pass(self): + """Test parallel forward pass with small batch""" + from qml_training_parallel import parallel_forward_pass + + X_batch = torch.tensor([[0.5, 0.3], [0.2, 0.8]]) + theta = torch.tensor([0.1, 0.2, 0.3]) + + results = parallel_forward_pass(X_batch, theta, "noise_aware", num_workers=2) + + # Should return predictions for both samples + self.assertEqual(len(results), 2) + + # Each prediction should be in valid range [-1, 1] + for pred in results: + self.assertGreaterEqual(pred.item(), -1.0) + self.assertLessEqual(pred.item(), 1.0) + + def test_parallel_gradient_consistency(self): + """Test that parallel gradients are consistent with sequential""" + from qml_training_parallel import parallel_gradient_step + from qml_training import make_real_dataset + + # Use small subset for speed + X_train, _, y_train, _ = make_real_dataset() + X_train_small = X_train[:3] + y_train_small = y_train[:3] + + theta = torch.tensor([0.1, 0.2, 0.3]) + + # Compute parallel gradients + grads_parallel = parallel_gradient_step( + X_train_small, y_train_small, theta, "noise_aware", num_workers=2 + ) + + # Should return gradient vector of same shape + self.assertEqual(grads_parallel.shape, theta.shape) + + # Gradients should be finite + self.assertTrue(torch.all(torch.isfinite(grads_parallel))) + + def test_parallel_kernel_predict_shape(self): + """Test parallel kernel prediction returns correct shape""" + from qml_training_parallel import parallel_kernel_predict + from qml_training import make_real_dataset + + X_train, X_test, y_train, _ = make_real_dataset() + + # Use small subset + X_train_small = X_train[:5] + y_train_small = y_train[:5] + X_test_small = X_test[:3] + + scores = parallel_kernel_predict( + X_test_small, X_train_small, y_train_small, num_workers=2 + ) + + # Should return predictions for all test samples + self.assertEqual(len(scores), len(X_test_small)) + + # Predictions should be in [-1, 1] range (can be 0 if sum equals zero) + for score in scores: + self.assertGreaterEqual(score.item(), -1.0) + self.assertLessEqual(score.item(), 1.0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test_quantum_simulator.py b/test_quantum_simulator.py index 26b1ae2..a42692c 100644 --- a/test_quantum_simulator.py +++ b/test_quantum_simulator.py @@ -434,11 +434,6 @@ def test_op_time_lookup(self): self.assertAlmostEqual(op_time("CNOT", times), 2.0) self.assertAlmostEqual(op_time("Z", times), 0.0) # default - def test_affected_qubits(self): - op = ("H", [0, 2]) - qs = affected_qubits(op) - self.assertEqual(qs, [0, 2]) - # ─────────────────────────────────────────────────────────── # add_time_based_noise # ───────────────────────────────────────────────────────────