pythonsimpyevent-simulation

Modelling a Robotic Factory Cell with Simpy


The purpose of this code is to model the following cell:Robotic Cell

It has 1 welding robot, 1 operator loading/unloading parts, and 2 fixtures that the parts are worked on within. The part is worked on in Fixture01, then moved to fixture02 for more work, then stored away. When this thing is working correctly, the operator should be working on one fixture, while the robot works on the other.

I have made a code that seems to model this sequence correctly, but the issue is that the operator resource is not released to the next part's process until the entire process is complete. If you run the code, you will see that there is a space in time after he has loaded into fixture 02 and the robot is working on that fixture02 that he waits rather than getting started on loading up the first fixture with the next part. fixture 01 is released and so is the operator during the robot process so if the resources are available to do a scheduled process, why is it not advancing?

Is it that I call for both resources concurrently? Maybe I need to ask for the fixture and once I have it ask for a robot/operator rather than asking for a fixture & operator/robot before proceeding in the process.

I'm still trying things but I wanted to ask the community for any insight they might have into this issue. It would be greatly appreciated!

Here is what I have so far, still needs a lot of work to get where I need to go but if I can crack this sequencing issue I think I can get there!

import simpy
import sys
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Number of parts to produce
sim_cycles = 5

# Initialize the data logging dictionary
log_data = {
    'Part': [],
    'Process': [],
    'Start_Time': [],
    'End_Time': []
}

class Robot:
    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    def mig_weld(self, part, length, seam_qty, option, fix):
        # Request resources
        with self.resource.request() as robot_req, fix.ownership.request() as fix_own_req:
            yield robot_req & fix_own_req  # Request the robot and fixture ownership

            # Simulate welding based on selected option
            if option == 1:
                ipm = 25
            elif option == 1.5:
                ipm = 30
            elif option == 2:
                ipm = 35
            elif option == 3:
                ipm = 60
            else:
                sys.exit("Invalid Mig Option Encountered, Stopping Simulation")
            
            # Convert inches per minute to millimeters per second
            mmps = ipm * 25.4 / 60
            start_time = self.env.now
            print(f"{self.name} starts welding {part} at {start_time}")
            
            # Calculate welding time
            weld_time = length / mmps + 2.2 * seam_qty
            yield self.env.timeout(2) # robot in
            yield self.env.timeout(weld_time)
            yield self.env.timeout(2) # robot out
            end_time = self.env.now
            print(f"{self.name} finishes welding {part} at {end_time}")
            
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: MIG Weld in {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
            # Resources are automatically released using 'with' context

    def spot_weld(self, part, qty, fix):
        # Request resources
        with self.resource.request() as robot_req, fix.ownership.request() as fix_own_req:
            yield robot_req & fix_own_req  # Request the robot and fixture ownership
            rate = 2.75  # spots/sec
            start_time = self.env.now
            print(f"{self.name} starts spot welding {part} in {fix.name} at {start_time}")
            
            # Calculate welding time
            weld_time = qty * rate
            yield self.env.timeout(2) # robot in
            yield self.env.timeout(weld_time)
            yield self.env.timeout(2) # robot out
            end_time = self.env.now
            print(f"{self.name} finishes spot welding {part} in {fix.name} at {end_time}")
            
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Spot Weld in {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
            # Resources are automatically released using 'with' context

class Operator:
    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    def load_part(self, part, load_time, fix):
        #yield env.process(fix.unclamp(part))
        # Request resources
        with self.resource.request() as op_req, fix.ownership.request() as own_req:
            yield op_req & own_req
            #env.process(self.walk_in(part,fix))
            start_time = self.env.now
            print(f"{self.name} starts loading {part} into {fix.name} at {start_time}")
            yield self.env.timeout(load_time)  # Operator loads for some time
            end_time = self.env.now
            print(f"{self.name} finishes loading {part} into {fix.name} at {end_time}")
            # Resources are automatically released
            #yield env.process(fix.clamp(part))

            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Load Part into {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)

        # After loading, request fix.part to hold until unloading
        fix.part_request = fix.part.request()
        yield fix.part_request  # Hold the part resource until unloading

    def unload_part(self, part, unload_time, fix):
        #yield env.process(fix.unclamp(part))
        with self.resource.request() as op_req, fix.ownership.request() as own_req:
            yield op_req & own_req
            #yield env.process(fix.unclamp(part)) & env.process(self.walk_in(part,fix))#wait for fixture to unclamp
            start_time = self.env.now
            print(f"{self.name} starts unloading {part} from {fix.name} at {start_time}")
            yield self.env.timeout(unload_time)
            end_time = self.env.now
            print(f"{self.name} finishes unloading {part} from {fix.name} at {end_time}")
            #yield env.process(self.walk_out(part,fix))
            
            # Resources are automatically released

            # Release the part resource to indicate the fixture is empty
            fix.part.release(fix.part_request)
            fix.part_request = None  # Clear the stored request

            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Unload Part from {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
        

    def put_away_part(self, part):
        with self.resource.request() as op_req:
            yield op_req  # Request the operator resource
            start_time = self.env.now
            print(f"{self.name} starts putting away {part} at {start_time}")
            yield self.env.timeout(6)  # Fixed put-away time
            end_time = self.env.now
            print(f"{self.name} finishes putting away {part} at {end_time}")

            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Put Away Part')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
            # Resource is automatically released
            
    def walk_in(self, part, fix):
            start_time = self.env.now
            print(f"{self.name} walks into {fix.name} at {start_time}")
            yield self.env.timeout(2)  # Fixed to walk into cell
            end_time = self.env.now
            print(f"{self.name} has arrived at {fix.name} at {end_time}")
            
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: walks into {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)

            
            
    def walk_out(self, part, fix):
            start_time = self.env.now
            print(f"{self.name} walks out of {fix.name} at {start_time}")
            yield self.env.timeout(2)  # Fixed to walk into cell
            end_time = self.env.now
            print(f"{self.name} has left {fix.name} at {end_time}")
            
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: walks out of {fix.name}')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)

            

class Fixture:
    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.part = simpy.Resource(env, capacity=1)
        self.part_request = None  # Store the part request
        self.ownership = simpy.Resource(env, capacity=1)

    def clamp(self, part):
        start_time = self.env.now
        print(f"{self.name} starts clamping {part} at {start_time}")
        yield self.env.timeout(4)  # Fixed clamp time
        end_time = self.env.now
        print(f"{self.name} finishes clamping {part} at {end_time}")

        # Log the event
        log_data['Part'].append(part)
        log_data['Process'].append(f'{self.name}: Clamp')
        log_data['Start_Time'].append(start_time)
        log_data['End_Time'].append(end_time)

    def unclamp(self, part):
        start_time = self.env.now
        print(f"{self.name} starts unclamping {part} at {start_time}")
        yield self.env.timeout(4)  # Fixed unclamp time
        end_time = self.env.now
        print(f"{self.name} finishes unclamping {part} at {end_time}")

        # Log the event
        log_data['Part'].append(part)
        log_data['Process'].append(f'{self.name}: Unclamp')
        log_data['Start_Time'].append(start_time)
        log_data['End_Time'].append(end_time)
        

def production_loop(env, robot, operator, fixture_01, fixture_02, part):
    
    for part_num in range(1, sim_cycles + 1):
        part = f'Part-{part_num:02}'
               
        # Operator loads part into Fixture 1
        yield env.process(operator.load_part(part, 41, fixture_01))

        # Robot spot welds on Fixture 1
        yield env.process(robot.spot_weld(part, qty = 35, fix=fixture_01))

        # Operator unloads part from Fixture 1
        yield env.process(operator.unload_part(part, 51, fixture_01))

        # Operator loads part into Fixture 2
        yield env.process(operator.load_part(part, 38, fixture_02))

        # Robot spot welds on Fixture 2
        yield env.process(robot.spot_weld(part, qty = 65, fix=fixture_02))

        # Operator unloads part from Fixture 2
        yield env.process(operator.unload_part(part, 58, fixture_02))

        # Operator puts away part
        yield env.process(operator.put_away_part(part))
        
    

# Setup simulation environment
env = simpy.Environment()
robot = Robot(env, 'Robot-1')
operator = Operator(env, 'Operator-1')
fixture_01 = Fixture(env, 'Fixture-01')
fixture_02 = Fixture(env, 'Fixture-02')


# Start the production loop as a process for each part
env.process(production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles))

# Run simulation
env.run()

# Convert log data into a DataFrame
df = pd.DataFrame(log_data)

# First Gantt chart: x-axis is time, y-axis is parts, colored by process
plt.figure(figsize=(18, 6))
sns.set_style("whitegrid")

parts = df['Part'].unique()
part_palette = sns.color_palette("Accent", len(parts))
part_color_dict = dict(zip(parts, part_palette))

# Sort processes alphabetically
df['Process'] = df['Process'].astype('category')
df['Process'] = df['Process'].cat.reorder_categories(sorted(df['Process'].unique()))
df.sort_values(['Process', 'Start_Time'], inplace=True)

# Plot the first Gantt chart
for part_name, part in df.groupby('Part'):
    color = part_color_dict.get(part_name, 'grey')
    for _, row in part.iterrows():
        plt.barh(
            y=row['Process'],
            width=row['End_Time'] - row['Start_Time'],
            left=row['Start_Time'],
            edgecolor='black',
            color=color,
            label=part_name if (row['Process'] == 'Operator-1: Load Part into Fixture-01') else ""
        )

# Remove duplicate labels in the legend
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(by_label.values(), by_label.keys(), title='Part', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.xlabel('Time')
plt.ylabel('Process')
plt.title('Process Timeline (Colored by Part Cycle)')
plt.tight_layout()
plt.show()


Solution

  • You got a good start, but I think I see a couple of issues.

    Your loop creates a part and then does each process step, then after all the processing is done, you loop and create the next part. Your loop only process one part at a time

    You need to make a process that has just the processing steps. In your loop you need to create a part and for each part an instance of the process but when you start the process do not use a yield, just drop and go to the next part.

    The second issue is once a part gets a fixture, it should keep it through all the processes related to that fixture. It should not be seizing the fixture in each task. In the main process first seize the fixture and nest the other fixture related processes in the with statement.

    Here is my quick take. I may not of solved all the issues, but I think the chart looks better.

    import simpy
    import sys
    import pandas as pd
    import seaborn as sns
    import matplotlib.pyplot as plt
    
    # Number of parts to produce
    sim_cycles = 5
    
    # Initialize the data logging dictionary
    log_data = {
        'Part': [],
        'Process': [],
        'Start_Time': [],
        'End_Time': []
    }
    
    class Robot:
        def __init__(self, env, name):
            self.env = env
            self.name = name
            self.resource = simpy.Resource(env, capacity=1)
    
        def mig_weld(self, part, length, seam_qty, option, fix):
            # Request resources
            with self.resource.request() as robot_req:
                yield robot_req  # Request the robot and fixture ownership
    
                # Simulate welding based on selected option
                if option == 1:
                    ipm = 25
                elif option == 1.5:
                    ipm = 30
                elif option == 2:
                    ipm = 35
                elif option == 3:
                    ipm = 60
                else:
                    sys.exit("Invalid Mig Option Encountered, Stopping Simulation")
                
                # Convert inches per minute to millimeters per second
                mmps = ipm * 25.4 / 60
                start_time = self.env.now
                print(f"{self.name} starts welding {part} at {start_time}")
                
                # Calculate welding time
                weld_time = length / mmps + 2.2 * seam_qty
                yield self.env.timeout(2) # robot in
                yield self.env.timeout(weld_time)
                yield self.env.timeout(2) # robot out
                end_time = self.env.now
                print(f"{self.name} finishes welding {part} at {end_time}")
                
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: MIG Weld in {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
                # Resources are automatically released using 'with' context
    
        def spot_weld(self, part, qty, fix):
            # Request resources
            with self.resource.request() as robot_req:
                yield robot_req  # Request the robot and fixture ownership
                rate = 2.75  # spots/sec
                start_time = self.env.now
                print(f"{self.name} starts spot welding {part} in {fix.name} at {start_time}")
                
                # Calculate welding time
                weld_time = qty * rate
                yield self.env.timeout(2) # robot in
                yield self.env.timeout(weld_time)
                yield self.env.timeout(2) # robot out
                end_time = self.env.now
                print(f"{self.name} finishes spot welding {part} in {fix.name} at {end_time}")
                
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: Spot Weld in {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
                # Resources are automatically released using 'with' context
    
    class Operator:
        def __init__(self, env, name):
            self.env = env
            self.name = name
            self.resource = simpy.Resource(env, capacity=1)
    
        def load_part(self, part, load_time, fix):
            #yield env.process(fix.unclamp(part))
            # Request resources
            with self.resource.request() as op_req:
                yield op_req
                #env.process(self.walk_in(part,fix))
                start_time = self.env.now
                print(f"{self.name} starts loading {part} into {fix.name} at {start_time}")
                yield self.env.timeout(load_time)  # Operator loads for some time
                end_time = self.env.now
                print(f"{self.name} finishes loading {part} into {fix.name} at {end_time}")
                # Resources are automatically released
                #yield env.process(fix.clamp(part))
    
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: Load Part into {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
    
            # # After loading, request fix.part to hold until unloading
            # fix.part_request = fix.part.request()
            # yield fix.part_request  # Hold the part resource until unloading
    
        def unload_part(self, part, unload_time, fix):
            #yield env.process(fix.unclamp(part))
            with self.resource.request() as op_req:
                yield op_req
                #yield env.process(fix.unclamp(part)) & env.process(self.walk_in(part,fix))#wait for fixture to unclamp
                start_time = self.env.now
                print(f"{self.name} starts unloading {part} from {fix.name} at {start_time}")
                yield self.env.timeout(unload_time)
                end_time = self.env.now
                print(f"{self.name} finishes unloading {part} from {fix.name} at {end_time}")
                #yield env.process(self.walk_out(part,fix))
                
                # Resources are automatically released
    
                # # Release the part resource to indicate the fixture is empty
                # fix.part.release(fix.part_request)
                # fix.part_request = None  # Clear the stored request
    
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: Unload Part from {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
            
    
        def put_away_part(self, part):
            with self.resource.request() as op_req:
                yield op_req  # Request the operator resource
                start_time = self.env.now
                print(f"{self.name} starts putting away {part} at {start_time}")
                yield self.env.timeout(6)  # Fixed put-away time
                end_time = self.env.now
                print(f"{self.name} finishes putting away {part} at {end_time}")
    
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: Put Away Part')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
                # Resource is automatically released
                
        def walk_in(self, part, fix):
                start_time = self.env.now
                print(f"{self.name} walks into {fix.name} at {start_time}")
                yield self.env.timeout(2)  # Fixed to walk into cell
                end_time = self.env.now
                print(f"{self.name} has arrived at {fix.name} at {end_time}")
                
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: walks into {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
    
                
                
        def walk_out(self, part, fix):
                start_time = self.env.now
                print(f"{self.name} walks out of {fix.name} at {start_time}")
                yield self.env.timeout(2)  # Fixed to walk into cell
                end_time = self.env.now
                print(f"{self.name} has left {fix.name} at {end_time}")
                
                # Log the event
                log_data['Part'].append(part)
                log_data['Process'].append(f'{self.name}: walks out of {fix.name}')
                log_data['Start_Time'].append(start_time)
                log_data['End_Time'].append(end_time)
    
                
    
    class Fixture:
        def __init__(self, env, name):
            self.env = env
            self.name = name
            #self.part = simpy.Resource(env, capacity=1)
            #self.part_request = None  # Store the part request
            self.ownership = simpy.Resource(env, capacity=1)
    
        def clamp(self, part):
            start_time = self.env.now
            print(f"{self.name} starts clamping {part} at {start_time}")
            yield self.env.timeout(4)  # Fixed clamp time
            end_time = self.env.now
            print(f"{self.name} finishes clamping {part} at {end_time}")
    
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Clamp')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
    
        def unclamp(self, part):
            start_time = self.env.now
            print(f"{self.name} starts unclamping {part} at {start_time}")
            yield self.env.timeout(4)  # Fixed unclamp time
            end_time = self.env.now
            print(f"{self.name} finishes unclamping {part} at {end_time}")
    
            # Log the event
            log_data['Part'].append(part)
            log_data['Process'].append(f'{self.name}: Unclamp')
            log_data['Start_Time'].append(start_time)
            log_data['End_Time'].append(end_time)
            
    
    def production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles):
        
        for part_num in range(1, sim_cycles + 1):
            part = f'Part-{part_num:02}'
    
            # create a process per part so they can fight for resoures
            # resource have queues to manage all the requests
    
            # note there is no yield here so all the proecess get created at same time
            env.process(production_proc(env, robot, operator, fixture_01, fixture_02, part))
    
    
    
    def production_proc(env, robot, operator, fixture_01, fixture_02, part):
              
        # once a part gets a fixture, keep it until all the fixture processing is done
        with fixture_01.ownership.request() as own_req: 
            yield own_req    
    
            # Operator loads part into Fixture 1
            yield env.process(operator.load_part(part, 41, fixture_01))
    
            # Robot spot welds on Fixture 1
            yield env.process(robot.spot_weld(part, qty = 35, fix=fixture_01))
    
            # Operator unloads part from Fixture 1
            yield env.process(operator.unload_part(part, 51, fixture_01))
    
        # keep fixture until all the fixture related processing is done
        with fixture_02.ownership.request() as own_req:
            yield own_req
    
            # Operator loads part into Fixture 2
            yield env.process(operator.load_part(part, 38, fixture_02))
    
            # Robot spot welds on Fixture 2
            yield env.process(robot.spot_weld(part, qty = 65, fix=fixture_02))
    
            # Operator unloads part from Fixture 2
            yield env.process(operator.unload_part(part, 58, fixture_02))
    
            # Operator puts away part
            yield env.process(operator.put_away_part(part))
    
        
    
    # Setup simulation environment
    env = simpy.Environment()
    robot = Robot(env, 'Robot-1')
    operator = Operator(env, 'Operator-1')
    fixture_01 = Fixture(env, 'Fixture-01')
    fixture_02 = Fixture(env, 'Fixture-02')
    
    
    # Start the production loop as a process for each part
    production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles)
    
    # Run simulation
    env.run()
    
    # Convert log data into a DataFrame
    df = pd.DataFrame(log_data)
    
    # First Gantt chart: x-axis is time, y-axis is parts, colored by process
    plt.figure(figsize=(18, 6))
    sns.set_style("whitegrid")
    
    parts = df['Part'].unique()
    part_palette = sns.color_palette("Accent", len(parts))
    part_color_dict = dict(zip(parts, part_palette))
    
    # Sort processes alphabetically
    df['Process'] = df['Process'].astype('category')
    df['Process'] = df['Process'].cat.reorder_categories(sorted(df['Process'].unique()))
    df.sort_values(['Process', 'Start_Time'], inplace=True)
    
    # Plot the first Gantt chart
    for part_name, part in df.groupby('Part'):
        color = part_color_dict.get(part_name, 'grey')
        for _, row in part.iterrows():
            plt.barh(
                y=row['Process'],
                width=row['End_Time'] - row['Start_Time'],
                left=row['Start_Time'],
                edgecolor='black',
                color=color,
                label=part_name if (row['Process'] == 'Operator-1: Load Part into Fixture-01') else ""
            )
    
    # Remove duplicate labels in the legend
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    plt.legend(by_label.values(), by_label.keys(), title='Part', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.xlabel('Time')
    plt.ylabel('Process')
    plt.title('Process Timeline (Colored by Part Cycle)')
    plt.tight_layout()
    plt.show()