Tags: python, or-tools, constraint-programming, cp-sat

How to add idle time to Google's OR-Tools Job Shop problem?


I'm new to Google OR-Tools (and constraint programming in general) and I'm trying to add idle times to the Jobshop example.

The job-shop example I took can be found here: https://developers.google.com/optimization/scheduling/job_shop#entire-program

I made some modifications, though: I added a deadline to each task. The only thing I still want to add to the program is an idle time. For example, a machine that finishes a task must stay idle for a certain time before starting its next task. How could I implement this in the code? My code:

import collections

# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model


def MinimalJobshopSat(idle_time=0):
    """Solve a small job-shop problem with per-task deadlines using CP-SAT.

    Each job is an ordered list of tasks; each task runs on one machine for a
    fixed processing time and must end no later than its deadline. Tasks of a
    job run in order, a machine runs one task at a time, and the makespan
    (latest end among the last tasks of all jobs) is minimized. The schedule
    is printed to standard output; nothing is returned.

    Args:
        idle_time: Minimum gap (in the same time unit as the processing
            times) that a machine must stay idle after finishing a task
            before it may start its next task. The default of 0 reproduces
            the plain job-shop model.

    Raises:
        ValueError: If ``idle_time`` is negative.
    """
    if idle_time < 0:
        raise ValueError('idle_time must be non-negative, got %r' % (idle_time,))

    # Create the model.
    model = cp_model.CpModel()

    jobs_data = [  # task = (machine_id, processing_time, deadline).
        [(0, 3, 7), (1, 2, 7), (2, 2, 7)],  # Job0
        [(0, 2, 13), (2, 1, 13), (1, 4, 13)],  # Job1
        [(1, 4, 13), (2, 3, 13)],  # Job2
    ]

    # Machine ids are 0-based, so the count is the largest id plus one.
    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)

    # Upper bound on any start time: run every task back to back, each one
    # followed by its idle gap. With idle_time == 0 this is the plain sum of
    # all durations.
    task_count = sum(len(job) for job in jobs_data)
    horizon = (sum(task[1] for job in jobs_data for task in job)
               + idle_time * task_count)

    # Named tuple to store information about created variables.
    task_type = collections.namedtuple('task_type',
                                       'start end deadline interval')

    # Named tuple to manipulate solution information.
    assigned_task_type = collections.namedtuple('assigned_task_type',
                                                'start job index duration')

    # Creates job intervals and add to the corresponding machine lists.
    all_tasks = {}
    machine_to_intervals = collections.defaultdict(list)

    for job_id, job in enumerate(jobs_data):
        for task_id, (machine, duration, deadline) in enumerate(job):
            suffix = '_%i_%i' % (job_id, task_id)

            start_var = model.NewIntVar(0, horizon, 'start' + suffix)
            # The deadline is enforced for EVERY task through the upper bound
            # of its end variable, so no separate deadline constraint is
            # needed.
            end_var = model.NewIntVar(0, deadline, 'end' + suffix)
            interval_var = model.NewIntervalVar(start_var, duration, end_var,
                                                'interval' + suffix)

            all_tasks[job_id, task_id] = task_type(
                start=start_var,
                end=end_var,
                deadline=deadline,
                interval=interval_var)

            if idle_time:
                # The machine is blocked for the task itself plus the idle
                # gap. The padding may run past the deadline; only the real
                # end (end_var) has to respect it.
                busy_end_var = model.NewIntVar(0, deadline + idle_time,
                                               'busy_end' + suffix)
                machine_interval = model.NewIntervalVar(
                    start_var, duration + idle_time, busy_end_var,
                    'busy_interval' + suffix)
            else:
                machine_interval = interval_var
            machine_to_intervals[machine].append(machine_interval)

    # Create and add disjunctive constraints: one task at a time per machine
    # (including the idle gap when idle_time > 0).
    for machine in all_machines:
        model.AddNoOverlap(machine_to_intervals[machine])

    # Precedences inside a job.
    for job_id, job in enumerate(jobs_data):
        for task_id in range(len(job) - 1):
            model.Add(all_tasks[job_id, task_id + 1].start >=
                      all_tasks[job_id, task_id].end)

    # Makespan objective.
    obj_var = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(obj_var, [
        all_tasks[job_id, len(job) - 1].end
        for job_id, job in enumerate(jobs_data)
    ])
    model.Minimize(obj_var)

    # Solve model.
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status != cp_model.OPTIMAL:
        # Deadlines (and idle gaps) can make the model infeasible; say so
        # instead of exiting silently.
        print('No optimal schedule found (solver status: %s).' %
              solver.StatusName(status))
        return

    # Create one list of assigned tasks per machine.
    assigned_jobs = collections.defaultdict(list)
    for job_id, job in enumerate(jobs_data):
        for task_id, task in enumerate(job):
            machine = task[0]
            assigned_jobs[machine].append(
                assigned_task_type(
                    start=solver.Value(all_tasks[job_id, task_id].start),
                    job=job_id,
                    index=task_id,
                    duration=task[1]))

    # Create per machine output lines.
    output = ''
    for machine in all_machines:
        # Sort by starting time (first field of the named tuple).
        assigned_jobs[machine].sort()
        sol_line_tasks = 'Machine ' + str(machine) + ': '
        sol_line = '           '

        for assigned_task in assigned_jobs[machine]:
            name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
            # Add spaces to output to align columns.
            sol_line_tasks += '%-10s' % name

            start = assigned_task.start
            duration = assigned_task.duration
            sol_tmp = '[%i,%i]' % (start, start + duration)
            # Add spaces to output to align columns.
            sol_line += '%-10s' % sol_tmp

        sol_line += '\n'
        sol_line_tasks += '\n'
        output += sol_line_tasks
        output += sol_line

    # Finally print the solution found.
    print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
    print(output)


# Build, solve and print the example. There is no __main__ guard, so this
# also runs whenever the file is imported.
MinimalJobshopSat()

Solution

  • You can just change this constraint:

    # Precedences inside a job.
    for job_id, job in enumerate(jobs_data):
        for task_id in range(len(job) - 1):
            model.Add(all_tasks[job_id, task_id +
                                1].start >= all_tasks[job_id, task_id].end)
    

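    to the following, keeping it inside the same two loops (`idle_time` is an integer gap that you define yourself):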

    model.Add(all_tasks[job_id, task_id +
                                1].start >= all_tasks[job_id, task_id].end + idle_time)
    

    For a more complex example (distance between tasks) see: https://github.com/google/or-tools/blob/stable/examples/python/jobshop_ft06_distance_sat.py