Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions src/variopt/algorithms/population/species_ga/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,12 +507,13 @@ def _build_next_population(
offspring: tuple[SpeciesGAPopulationMember[CandidateT], ...],
) -> tuple[SpeciesGAPopulationMember[CandidateT], ...]:
candidate_pool = self._sort_population(parents + offspring)
species_members: list[list[SpeciesGAPopulationMember[CandidateT]]] = []
overflow_members: list[SpeciesGAPopulationMember[CandidateT]] = []
for member in candidate_pool:
species_members: list[
list[tuple[int, SpeciesGAPopulationMember[CandidateT]]]
] = []
for member_index, member in enumerate(candidate_pool):
assigned = False
for members in species_members:
seed_member = members[0]
seed_member = members[0][1]
if (
self.diversity_metric.distance(
member.candidate,
Expand All @@ -521,28 +522,37 @@ def _build_next_population(
< self.resolved_profile.species_radius
):
if len(members) < self.resolved_profile.species_capacity:
members.append(member)
else:
overflow_members.append(member)
members.append((member_index, member))
assigned = True
break
if assigned:
continue

species_members.append([member])
species_members.append([(member_index, member)])

protected_population = tuple(
member
protected_entries = tuple(
(member_index, species_member_index == 0, member)
for members in species_members
for member in members
for species_member_index, (member_index, member) in enumerate(members)
)
if len(protected_population) >= self.population_size:
return self._sort_population(protected_population[: self.population_size])
if len(protected_entries) >= self.population_size:
seed_prioritized_population = tuple(
member
for _, _, member in sorted(
protected_entries,
key=lambda entry: (not entry[1], entry[2].score),
)[: self.population_size]
)
return self._sort_population(seed_prioritized_population)

protected_indices = frozenset(
member_index for member_index, _, _ in protected_entries
)
protected_population = tuple(member for _, _, member in protected_entries)
backfill_members = tuple(
member
for member in candidate_pool
if member not in protected_population
for member_index, member in enumerate(candidate_pool)
if member_index not in protected_indices
)
next_population = self._sort_population(
protected_population
Expand Down
223 changes: 222 additions & 1 deletion tests/population/test_species_ga.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
PermutationSpace,
Problem,
Proposal,
SearchSpace,
)
from variopt.algorithms import (
SpeciesConservingGeneticAlgorithmOptimizer,
SpeciesGAProfile,
)
from variopt.algorithms.population.species_ga.state import (
SpeciesGAPopulationMember,
)
from variopt.diversity import DiversityMetric
from variopt.evaluators import SequentialEvaluator
from variopt.operators import VariationOperator
Expand Down Expand Up @@ -101,6 +105,103 @@ def distance(self, left: int, right: int) -> float:
return float(abs(left - right))


class EqualityHostileCandidate:
"""Candidate that fails the test if scalar equality is used."""

value: int

def __init__(self, value: int) -> None:
self.value = value

@override
def __eq__(self, other: object) -> bool:
_ = other
raise AssertionError("species survival must not compare candidates by value")


class EqualityHostileSpace(
SearchSpace[EqualityHostileCandidate, EqualityHostileCandidate],
):
"""Minimal search space for equality-hostile candidate fixtures."""

@override
def normalize(
self,
raw_candidate: EqualityHostileCandidate,
) -> EqualityHostileCandidate:
self.validate(raw_candidate)
return raw_candidate

@override
def validate(self, candidate: EqualityHostileCandidate) -> None:
if type(candidate) is not EqualityHostileCandidate:
msg = "candidate must be an EqualityHostileCandidate"
raise TypeError(msg)

@override
def sample(self, random_state: np.random.RandomState) -> EqualityHostileCandidate:
return EqualityHostileCandidate(int(random_state.randint(0, 2)))


class EqualityHostileDistance(DiversityMetric[EqualityHostileCandidate]):
"""Absolute-difference distance for equality-hostile candidates."""

@override
def distance(
self,
left: EqualityHostileCandidate,
right: EqualityHostileCandidate,
) -> float:
return float(abs(left.value - right.value))


class PreserveEqualityHostileMutation(VariationOperator[EqualityHostileCandidate]):
"""Unary mutation fixture that leaves equality-hostile candidates unchanged."""

@property
@override
def arity(self) -> int:
return 1

@override
def apply(
self,
parents: Sequence[EqualityHostileCandidate],
random_state: np.random.RandomState,
) -> EqualityHostileCandidate:
_ = random_state
return parents[0]


class TestableSpeciesGA(SpeciesConservingGeneticAlgorithmOptimizer[int, int]):
"""Test-only species GA that exposes one survival seam."""

def build_next_population_for_test(
self,
*,
parents: tuple[SpeciesGAPopulationMember[int], ...],
offspring: tuple[SpeciesGAPopulationMember[int], ...],
) -> tuple[SpeciesGAPopulationMember[int], ...]:
return self._build_next_population(parents=parents, offspring=offspring)


class EqualityHostileTestableSpeciesGA(
SpeciesConservingGeneticAlgorithmOptimizer[
EqualityHostileCandidate,
EqualityHostileCandidate,
],
):
"""Test-only species GA for non-scalar equality candidates."""

def build_next_population_for_test(
self,
*,
parents: tuple[SpeciesGAPopulationMember[EqualityHostileCandidate], ...],
offspring: tuple[SpeciesGAPopulationMember[EqualityHostileCandidate], ...],
) -> tuple[SpeciesGAPopulationMember[EqualityHostileCandidate], ...]:
return self._build_next_population(parents=parents, offspring=offspring)


class SpeciesConservingGeneticAlgorithmOptimizerTests:
"""Regression tests for the species-conserving GA optimizer."""

Expand Down Expand Up @@ -172,9 +273,129 @@ def test_species_survival_preserves_multiple_seeds(self) -> None:
state = optimizer.tell(state, tuple(outcome.observation for outcome in outcomes))

assert state.generation_index == 1
assert tuple(member.candidate for member in state.population) == (0, 1, 7, 8)
next_candidates = tuple(member.candidate for member in state.population)
assert next_candidates[0] == 0
assert 7 in next_candidates
assert len(state.population) == 4

def test_species_backfill_uses_pool_indices_for_clone_members(self) -> None:
optimizer = TestableSpeciesGA(
space=IntegerSpace(0, 10),
population_size=4,
diversity_metric=IntegerDistance(),
mutation_operator=StepTowardZeroMutation(),
profile=SpeciesGAProfile(
mutation_probability=0.0,
crossover_probability=0.0,
species_radius=3.0,
species_capacity=1,
),
)
clone_pool = tuple(
SpeciesGAPopulationMember(candidate=0, value=0.0, score=0.0)
for _ in range(8)
)

next_population = optimizer.build_next_population_for_test(
parents=clone_pool[:4],
offspring=clone_pool[4:],
)

assert len(next_population) == 4
assert tuple(member.candidate for member in next_population) == (0, 0, 0, 0)

def test_species_backfill_does_not_use_candidate_value_equality(self) -> None:
optimizer = EqualityHostileTestableSpeciesGA(
space=EqualityHostileSpace(),
population_size=4,
diversity_metric=EqualityHostileDistance(),
mutation_operator=PreserveEqualityHostileMutation(),
profile=SpeciesGAProfile(
mutation_probability=0.0,
crossover_probability=0.0,
species_radius=3.0,
species_capacity=1,
),
)
clone_pool = tuple(
SpeciesGAPopulationMember(
candidate=EqualityHostileCandidate(0),
value=0.0,
score=0.0,
)
for _ in range(8)
)

next_population = optimizer.build_next_population_for_test(
parents=clone_pool[:4],
offspring=clone_pool[4:],
)

assert len(next_population) == 4
assert all(member.candidate.value == 0 for member in next_population)

def test_species_clone_survival_keeps_next_tournament_valid(self) -> None:
optimizer = SpeciesConservingGeneticAlgorithmOptimizer(
space=IntegerSpace(0, 10),
population_size=4,
diversity_metric=IntegerDistance(),
mutation_operator=StepTowardZeroMutation(),
profile=SpeciesGAProfile(
tournament_size=2,
mutation_probability=0.0,
crossover_probability=0.0,
species_radius=3.0,
species_capacity=1,
),
sampler=CyclingIntegerSampler((0, 0, 0, 0)),
random_state=0,
)
problem = Problem(space=IntegerSpace(0, 10), objective=SquareObjective())
evaluator = SequentialEvaluator[int, int]()

state = optimizer.create_initial_state()
proposals, state = optimizer.ask(state, batch_size=4)
outcomes = evaluator.evaluate(problem, _requests(proposals))
state = optimizer.tell(
state, tuple(outcome.observation for outcome in outcomes)
)
assert len(state.population) == 4

proposals, state = optimizer.ask(state, batch_size=4)
outcomes = evaluator.evaluate(problem, _requests(proposals))
state = optimizer.tell(
state, tuple(outcome.observation for outcome in outcomes)
)
assert len(state.population) == 4

proposals, state = optimizer.ask(state, batch_size=4)
assert len(proposals) == 4

def test_species_truncation_prioritizes_seeds_over_crowd_members(self) -> None:
optimizer = TestableSpeciesGA(
space=IntegerSpace(0, 20),
population_size=3,
diversity_metric=IntegerDistance(),
mutation_operator=StepTowardZeroMutation(),
profile=SpeciesGAProfile(
mutation_probability=0.0,
crossover_probability=0.0,
species_radius=3.0,
species_capacity=3,
),
)
pool = tuple(
SpeciesGAPopulationMember(candidate=candidate, value=score, score=score)
for candidate, score in ((0, 0.0), (1, 1.0), (2, 2.0), (10, 10.0))
)

next_population = optimizer.build_next_population_for_test(
parents=pool,
offspring=(),
)

assert tuple(member.candidate for member in next_population) == (0, 1, 10)

def test_from_permutation_space_defaults_can_optimize(self) -> None:
space = PermutationSpace(size=6)
optimizer = SpeciesConservingGeneticAlgorithmOptimizer.from_permutation_space_defaults(
Expand Down