I am using the Optapy library in python, and I am using the school timetabling instance on GitHub as a base. I have few questions regarding the library configurations:
I was looking at OptaPlanner User Guide, but I am not sure how to implement it on python.
Guidance appreciated.
OptaPy can be configured using the programmatic API. The config classes can be found in the optapy.config
package. In particular, you choose the optimisation algorithm via withPhases
:
import optapy.config
solver_config = optapy.config.solver.SolverConfig().withEntityClasses(get_class(Lesson)) \
.withSolutionClass(get_class(TimeTable)) \
.withConstraintProviderClass(get_class(define_constraints)) \
.withTerminationSpentLimit(Duration.ofSeconds(30)) \
.withPhases([
optapy.config.constructionheuristic.ConstructionHeuristicPhaseConfig(),
optapy.config.localsearch.LocalSearchPhaseConfig()
.withAcceptorConfig(optapy.config.localsearch.decider.acceptor.LocalSearchAcceptorConfig()
.withSimulatedAnnealingStartingTemperature("0hard/0soft"))
])
(the above configures simulated annealing).
Recently added was the @easy_score_calculator
and @incremental_score_calculator
decorators, which allows you to define an EasyScoreCalculator or IncrementalScoreCalculator respectively. For example, (EasyScoreCalculator, maximize value):
@optapy.easy_score_calculator
def my_score_calculator(solution: Solution):
total_score = 0
for entity in solution.entity_list:
total_score += 0 if entity.value is None else entity.value
return optapy.score.SimpleScore.of(total_score)
solver_config = optapy.config.solver.SolverConfig()
termination_config = optapy.config.solver.termination.TerminationConfig()
termination_config.setBestScoreLimit('9')
solver_config.withSolutionClass(optapy.get_class(Solution)) \
.withEntityClasses(optapy.get_class(Entity)) \
.withEasyScoreCalculatorClass(optapy.get_class(my_score_calculator)) \
.withTerminationConfig(termination_config)
or with an IncrementalScoreCalculator (NQueens):
@optapy.incremental_score_calculator
class IncrementalScoreCalculator:
score: int
row_index_map: dict
ascending_diagonal_index_map: dict
descending_diagonal_index_map: dict
def resetWorkingSolution(self, working_solution: Solution):
n = working_solution.n
self.row_index_map = dict()
self.ascending_diagonal_index_map = dict()
self.descending_diagonal_index_map = dict()
for i in range(n):
self.row_index_map[i] = list()
self.ascending_diagonal_index_map[i] = list()
self.descending_diagonal_index_map[i] = list()
if i != 0:
self.ascending_diagonal_index_map[n - 1 + i] = list()
self.descending_diagonal_index_map[-i] = list()
self.score = 0
for queen in working_solution.queen_list:
self.insert(queen)
def beforeEntityAdded(self, entity: any):
pass
def afterEntityAdded(self, entity: any):
self.insert(entity)
def beforeVariableChanged(self, entity: any, variableName: str):
self.retract(entity)
def afterVariableChanged(self, entity: any, variableName: str):
self.insert(entity)
def beforeEntityRemoved(self, entity: any):
self.retract(entity)
def afterEntityRemoved(self, entity: any):
pass
def insert(self, queen: Queen):
row = queen.row
if row is not None:
row_index = queen.row
row_index_list = self.row_index_map[row_index]
self.score -= len(row_index_list)
row_index_list.append(queen)
ascending_diagonal_index_list = self.ascending_diagonal_index_map[queen.getAscendingDiagonalIndex()]
self.score -= len(ascending_diagonal_index_list)
ascending_diagonal_index_list.append(queen)
descending_diagonal_index_list = self.descending_diagonal_index_map[queen.getDescendingDiagonalIndex()]
self.score -= len(descending_diagonal_index_list)
descending_diagonal_index_list.append(queen)
def retract(self, queen: Queen):
row = queen.row
if row is not None:
row_index = queen.row
row_index_list = self.row_index_map[row_index]
row_index_list.remove(queen)
self.score += len(row_index_list)
ascending_diagonal_index_list = self.ascending_diagonal_index_map[queen.getAscendingDiagonalIndex()]
ascending_diagonal_index_list.remove(queen)
self.score += len(ascending_diagonal_index_list)
descending_diagonal_index_list = self.descending_diagonal_index_map[queen.getDescendingDiagonalIndex()]
descending_diagonal_index_list.remove(queen)
self.score += len(descending_diagonal_index_list)
def calculateScore(self) -> optapy.score.SimpleScore:
return optapy.score.SimpleScore.of(self.score)
solver_config = optapy.config.solver.SolverConfig()
termination_config = optapy.config.solver.termination.TerminationConfig()
termination_config.setBestScoreLimit('0')
solver_config.withSolutionClass(optapy.get_class(Solution)) \
.withEntityClasses(optapy.get_class(Queen)) \
.withScoreDirectorFactory(optapy.config.score.director.ScoreDirectorFactoryConfig() \
.withIncrementalScoreCalculatorClass(optapy.get_class(IncrementalScoreCalculator))) \
.withTerminationConfig(termination_config)
If by weights you mean ConstraintConfiguration
(which allows you to define custom constraint weights per problem), that is not exposed via OptaPy yet. If you mean how to make a constraint weight more/less, either change the second parameter to penalize/reward
(if constant), or add a third parameter that computes the constraint multiplier (which the second parameter will be multiplied by), like so:
def undesired_day_for_employee(constraint_factory: ConstraintFactory):
return constraint_factory.forEach(shift_class) \
.join(availability_class, [Joiners.equal(lambda shift: shift.employee,
lambda availability: availability.employee),
Joiners.equal(lambda shift: shift.start.date(),
lambda availability: availability.date)
]) \
.filter(lambda shift, availability: availability.availability_type == AvailabilityType.UNDESIRED) \
.penalize('Undesired day for employee', HardSoftScore.ofSoft(2),
lambda shift, availability: get_shift_duration_in_minutes(shift))
(this constraint penalizes by 2 soft for every minute an employee works on an UNDESIRED day)