pythonor-toolsconstraint-programmingcp-sat

Introducing penalty cost for not using a certain machine to ORTools Job Shop Problem


I have a slightly updated jobshop problem code in which I added deadlines and idle_times (hope they're implemented correctly) thanks to the help of great users from this page. Now I'm looking to further update the code and also add another feature. Let's say it costs a lot to turn on a machine and give it a job so I need to introduce a penalty cost so that the solver tries to use that machine without having a pause time or idle time between orders since it would cost a lot to turn that machine back on again. Or at least try to have as little of an idle time as possible.

Any ideas how I could implement a feature like this? I was thinking of adding it either as a soft constraint or hard constraint but it only needs to be on certain machines. Let's say an oven that takes time and energy to turn it on again.

My code:

import collections

# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model


def MinimalJobshopSat():
"""Minimal jobshop problem."""
# Create the model.
model = cp_model.CpModel()

jobs_data = [  # task = (machine_id, processing_time, deadlines, idle_times).
    [(0, 3, 10,1), (1, 2, 10,0), (2, 2, 10,0)],  # Job0
    [(0, 2, 15,1), (2, 1, 15,0), (1, 4, 15,1)],  # Job1
    [(1, 4, 15,0), (2, 3, 15,1)]  # Job2
]


#counts the number of machines (3 in this case)
machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)

# Computes horizon dynamically as the sum of all durations.
horizon = sum(task[1] for job in jobs_data for task in job)

# Named tuple to store information about created variables.
task_type = collections.namedtuple('task_type', 'start end deadline interval')

# Named tuple to manipulate solution information.
assigned_task_type = collections.namedtuple('assigned_task_type',
                                            'start job index duration')

# Creates job intervals and add to the corresponding machine lists.
all_tasks = {}
machine_to_intervals = collections.defaultdict(list)

for job_id, job in enumerate(jobs_data):
    for task_id, task in enumerate(job):
        machine = task[0]
        duration = task[1]
        deadline = task[2]
        idle_time = task[3]
        suffix = '_%i_%i' % (job_id, task_id)
        
        start_var = model.NewIntVar(0, horizon, 'start' + suffix)
        
        end_var = model.NewIntVar(0, deadline, 'end' + suffix)
    
        interval_var = model.NewIntervalVar(start_var, duration, end_var,
                                            'interval' + suffix)
        deadline_var = model.NewIntVar(deadline, deadline,
                                            'deadline' + suffix)
       
        all_tasks[job_id, task_id] = task_type(
            start=start_var, end=end_var, deadline=deadline_var, interval=interval_var)
        
        machine_to_intervals[machine].append(interval_var)
        
# Create and add disjunctive constraints.
for machine in all_machines:
    model.AddNoOverlap(machine_to_intervals[machine])

# Precedences inside a job.
for job_id, job in enumerate(jobs_data):
    for task_id in range(len(job) - 1):
        model.Add(all_tasks[job_id, task_id +
                            1].start >= all_tasks[job_id, task_id].end + idle_time)

for job_id, job in enumerate(jobs_data):
    for task_id in range(len(job) - 1):
        model.Add(all_tasks[job_id, task_id].end <= all_tasks[job_id, task_id].deadline)
        
    
# Makespan objective.
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(obj_var, [
    all_tasks[job_id, len(job) - 1].end
    for job_id, job in enumerate(jobs_data)
])
model.Minimize(obj_var)

# Solve model.
solver = cp_model.CpSolver()
status = solver.Solve(model)

if status == cp_model.OPTIMAL:
    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            assigned_jobs[machine].append(
                assigned_task_type(
                    start=solver.Value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=task[1]))

    # Create per machine output lines.
    output = ''
    for machine in all_machines:
        # Sort by starting time.
        assigned_jobs[machine].sort()
        sol_line_tasks = 'Machine ' + str(machine) + ': '
        sol_line = '           '

        for assigned_task in assigned_jobs[machine]:
            name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
            # Add spaces to output to align columns.
            sol_line_tasks += '%-10s' % name

            start = assigned_task.start
            duration = assigned_task.duration
            sol_tmp = '[%i,%i]' % (start, start + duration)
            # Add spaces to output to align columns.
            sol_line += '%-10s' % sol_tmp

        sol_line += '\n'
        sol_line_tasks += '\n'
        output += sol_line_tasks
        output += sol_line

    # Finally print the solution found.
    print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
    print(output)


MinimalJobshopSat()

Solution

  • Look at this example

    This will create a set of literals, one for each possible direct successor of a task.

    Now you can use this literal to create the penalty

    # literal is true if task_b is a direct successor of task_b
    penalty = model.NewBoolVar('penalty')
    model.Add(task_b_start > task_a_end).OnlyEnforceIf([literal, penalty])
    model.Add(task_b_start == task_a_end).OnlyEnforceIf([literal, penalty.Not()])
    
    # objective
    model.Minimize(sum(penalties))