The purpose of this code is to model the following cell: Robotic Cell
It has 1 welding robot, 1 operator loading/unloading parts, and 2 fixtures in which the parts are worked on. Each part is worked on in Fixture 01, then moved to Fixture 02 for more work, then stored away. When the cell is working correctly, the operator should be working at one fixture while the robot works at the other.
I have written code that seems to model this sequence correctly, but the issue is that the operator resource is not made available to the next part's process until the entire process for the current part is complete. If you run the code, you will see a gap in time after the operator has loaded Fixture 02: while the robot is working on Fixture 02, he waits rather than getting started on loading Fixture 01 with the next part. Fixture 01 and the operator are both released during the robot process, so if the resources needed for the next scheduled step are available, why is it not advancing?
Is it that I call for both resources concurrently? Maybe I need to ask for the fixture and once I have it ask for a robot/operator rather than asking for a fixture & operator/robot before proceeding in the process.
I'm still trying things but I wanted to ask the community for any insight they might have into this issue. It would be greatly appreciated!
Here is what I have so far, still needs a lot of work to get where I need to go but if I can crack this sequencing issue I think I can get there!
import simpy
import sys
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# How many parts the simulation pushes through the cell.
sim_cycles = 5

# Column-oriented event log: one list per column, so the finished log can be
# handed straight to pandas.DataFrame.
log_data = {
    column: []
    for column in ('Part', 'Process', 'Start_Time', 'End_Time')
}
class Robot:
    """A welding robot that serves one job at a time.

    The robot is a ``simpy.Resource`` with capacity 1. Each weld method is a
    SimPy process generator that requests both the robot and the ownership of
    the fixture it works in, and holds both for the duration of the weld.
    """

    # (option value, travel speed in inches per minute) accepted by mig_weld.
    _MIG_TRAVEL_SPEEDS = ((1, 25), (1.5, 30), (2, 35), (3, 60))

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def mig_weld(self, part, length, seam_qty, option, fix):
        """Process generator: MIG weld ``part`` while it sits in ``fix``.

        Args:
            part: Label of the part being welded (used for printing/logging).
            length: Total weld length; it is divided by a speed in mm/s, so
                it is presumably in millimetres -- TODO confirm.
            seam_qty: Number of seams; each one adds 2.2 time units.
            option: Travel-speed selector; one of 1, 1.5, 2 or 3.
            fix: Fixture whose ``ownership`` resource is held while welding.

        Exits the interpreter through ``sys.exit`` when ``option`` is not a
        recognised selector.
        """
        with self.resource.request() as robot_claim, \
                fix.ownership.request() as fixture_claim:
            # Wait until the robot and the fixture have both been granted.
            yield robot_claim & fixture_claim

            ipm = next(
                (speed for key, speed in self._MIG_TRAVEL_SPEEDS if option == key),
                None,
            )
            if ipm is None:
                sys.exit("Invalid Mig Option Encountered, Stopping Simulation")

            # Inches per minute -> millimetres per second.
            mmps = ipm * 25.4 / 60

            start_time = self.env.now
            print(f"{self.name} starts welding {part} at {start_time}")

            weld_time = length / mmps + 2.2 * seam_qty
            # Robot moves in (2), welds, then moves out (2).
            for delay in (2, weld_time, 2):
                yield self.env.timeout(delay)

            end_time = self.env.now
            print(f"{self.name} finishes welding {part} at {end_time}")

            self._record(
                part,
                f'{self.name}: MIG Weld in {fix.name}',
                start_time,
                end_time,
            )
        # Leaving the ``with`` block releases the robot and the fixture.

    def spot_weld(self, part, qty, fix):
        """Process generator: put ``qty`` spot welds on ``part`` in ``fix``.

        Args:
            part: Label of the part being welded (used for printing/logging).
            qty: Number of spot welds.
            fix: Fixture whose ``ownership`` resource is held while welding.
        """
        with self.resource.request() as robot_claim, \
                fix.ownership.request() as fixture_claim:
            # Wait until the robot and the fixture have both been granted.
            yield robot_claim & fixture_claim

            # NOTE(review): this value was labelled "spots/sec", but it is
            # multiplied by the spot count, so it behaves as seconds per
            # spot -- confirm which one is intended.
            rate = 2.75

            start_time = self.env.now
            print(f"{self.name} starts spot welding {part} in {fix.name} at {start_time}")

            weld_time = qty * rate
            # Robot moves in (2), welds, then moves out (2).
            for delay in (2, weld_time, 2):
                yield self.env.timeout(delay)

            end_time = self.env.now
            print(f"{self.name} finishes spot welding {part} in {fix.name} at {end_time}")

            self._record(
                part,
                f'{self.name}: Spot Weld in {fix.name}',
                start_time,
                end_time,
            )
        # Leaving the ``with`` block releases the robot and the fixture.
class Operator:
    """The cell operator, modelled as a ``simpy.Resource`` with capacity 1.

    Loading and unloading request the operator together with the ownership
    of the fixture involved. After a load, the fixture's ``part`` resource is
    requested and kept until the matching unload releases it.

    TODO: the clamp/unclamp and walk-in/walk-out steps exist as generators
    but are not yet called from the load/unload steps.
    """

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def load_part(self, part, load_time, fix):
        """Process generator: load ``part`` into ``fix``, taking ``load_time``."""
        with self.resource.request() as operator_claim, \
                fix.ownership.request() as fixture_claim:
            # Wait until the operator and the fixture have both been granted.
            yield operator_claim & fixture_claim

            began = self.env.now
            print(f"{self.name} starts loading {part} into {fix.name} at {began}")
            yield self.env.timeout(load_time)
            ended = self.env.now
            print(f"{self.name} finishes loading {part} into {fix.name} at {ended}")
        # Operator and fixture ownership are released at this point.

        self._record(part, f'{self.name}: Load Part into {fix.name}', began, ended)

        # Mark the fixture as occupied; the request is stored on the fixture
        # so that unload_part can release it later.
        fix.part_request = fix.part.request()
        yield fix.part_request

    def unload_part(self, part, unload_time, fix):
        """Process generator: unload ``part`` from ``fix``, taking ``unload_time``."""
        with self.resource.request() as operator_claim, \
                fix.ownership.request() as fixture_claim:
            # Wait until the operator and the fixture have both been granted.
            yield operator_claim & fixture_claim

            began = self.env.now
            print(f"{self.name} starts unloading {part} from {fix.name} at {began}")
            yield self.env.timeout(unload_time)
            ended = self.env.now
            print(f"{self.name} finishes unloading {part} from {fix.name} at {ended}")
        # Operator and fixture ownership are released at this point.

        # Give back the request stored by load_part: the fixture is empty.
        fix.part.release(fix.part_request)
        fix.part_request = None

        self._record(part, f'{self.name}: Unload Part from {fix.name}', began, ended)

    def put_away_part(self, part):
        """Process generator: store the finished ``part`` (fixed 6 time units)."""
        with self.resource.request() as operator_claim:
            yield operator_claim

            began = self.env.now
            print(f"{self.name} starts putting away {part} at {began}")
            yield self.env.timeout(6)
            ended = self.env.now
            print(f"{self.name} finishes putting away {part} at {ended}")

            self._record(part, f'{self.name}: Put Away Part', began, ended)
        # Operator is released when the ``with`` block exits.

    def walk_in(self, part, fix):
        """Process generator: walk to ``fix`` (fixed 2 time units).

        No resource is requested here.
        """
        began = self.env.now
        print(f"{self.name} walks into {fix.name} at {began}")
        yield self.env.timeout(2)
        ended = self.env.now
        print(f"{self.name} has arrived at {fix.name} at {ended}")
        self._record(part, f'{self.name}: walks into {fix.name}', began, ended)

    def walk_out(self, part, fix):
        """Process generator: walk away from ``fix`` (fixed 2 time units).

        No resource is requested here.
        """
        began = self.env.now
        print(f"{self.name} walks out of {fix.name} at {began}")
        yield self.env.timeout(2)
        ended = self.env.now
        print(f"{self.name} has left {fix.name} at {ended}")
        self._record(part, f'{self.name}: walks out of {fix.name}', began, ended)
class Fixture:
    """A work-holding fixture in the cell.

    Attributes:
        part: Capacity-1 resource representing the part sitting in the fixture.
        part_request: The outstanding request on ``part`` (``None`` when no
            request is stored).
        ownership: Capacity-1 resource representing who is currently working
            at the fixture.
    """

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.part = simpy.Resource(env, capacity=1)
        self.part_request = None
        self.ownership = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def clamp(self, part):
        """Process generator: clamp ``part`` (fixed 4 time units)."""
        began = self.env.now
        print(f"{self.name} starts clamping {part} at {began}")
        yield self.env.timeout(4)
        ended = self.env.now
        print(f"{self.name} finishes clamping {part} at {ended}")
        self._record(part, f'{self.name}: Clamp', began, ended)

    def unclamp(self, part):
        """Process generator: unclamp ``part`` (fixed 4 time units)."""
        began = self.env.now
        print(f"{self.name} starts unclamping {part} at {began}")
        yield self.env.timeout(4)
        ended = self.env.now
        print(f"{self.name} finishes unclamping {part} at {ended}")
        self._record(part, f'{self.name}: Unclamp', began, ended)
def production_loop(env, robot, operator, fixture_01, fixture_02, part):
    """Process generator that builds every part, one whole part at a time.

    Each routing step is started as its own process and awaited before the
    next step begins, and the next part is only created after the current
    one has been put away, so parts never overlap in this version.

    NOTE: the value passed as ``part`` is overwritten on the first
    iteration; the number of parts is read from the module-level
    ``sim_cycles``.
    """
    for part_num in range(1, sim_cycles + 1):
        part = f'Part-{part_num:02}'

        # The steps are wrapped in callables so that each generator is only
        # created (and scheduled) once the previous step has finished.
        routing = (
            # Fixture 1: load, spot weld, unload.
            lambda: operator.load_part(part, 41, fixture_01),
            lambda: robot.spot_weld(part, qty=35, fix=fixture_01),
            lambda: operator.unload_part(part, 51, fixture_01),
            # Fixture 2: load, spot weld, unload.
            lambda: operator.load_part(part, 38, fixture_02),
            lambda: robot.spot_weld(part, qty=65, fix=fixture_02),
            lambda: operator.unload_part(part, 58, fixture_02),
            # Finished part goes into storage.
            lambda: operator.put_away_part(part),
        )
        for start_step in routing:
            yield env.process(start_step())
# Build the cell: one robot, one operator and two fixtures on a shared clock.
env = simpy.Environment()
robot = Robot(env, 'Robot-1')
operator = Operator(env, 'Operator-1')
fixture_01 = Fixture(env, 'Fixture-01')
fixture_02 = Fixture(env, 'Fixture-02')

# Register the production loop as a single process, then run the simulation
# (no end time is passed to run()).
env.process(production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles))
env.run()

# One row per logged activity.
df = pd.DataFrame(log_data)

# Gantt chart: time on the x-axis, one row per process step, bars coloured
# by the part they belong to.
plt.figure(figsize=(18, 6))
sns.set_style("whitegrid")
ax = plt.gca()

parts = df['Part'].unique()
part_palette = sns.color_palette("Accent", len(parts))
part_color_dict = dict(zip(parts, part_palette))

# Make the process column categorical with alphabetically ordered categories.
process_column = df['Process'].astype('category')
df['Process'] = process_column.cat.reorder_categories(sorted(process_column.unique()))
df = df.sort_values(['Process', 'Start_Time'])

# Only bars of this step carry a legend label, giving one entry per part.
legend_step = 'Operator-1: Load Part into Fixture-01'

for part_name, part_rows in df.groupby('Part'):
    bar_color = part_color_dict.get(part_name, 'grey')
    for entry in part_rows.itertuples(index=False):
        ax.barh(
            y=entry.Process,
            width=entry.End_Time - entry.Start_Time,
            left=entry.Start_Time,
            edgecolor='black',
            color=bar_color,
            label=part_name if entry.Process == legend_step else "",
        )

# Collapse repeated labels so each part appears once in the legend.
handles, labels = ax.get_legend_handles_labels()
by_label = dict(zip(labels, handles))
ax.legend(
    by_label.values(),
    by_label.keys(),
    title='Part',
    bbox_to_anchor=(1.05, 1),
    loc='upper left',
)
ax.set_xlabel('Time')
ax.set_ylabel('Process')
ax.set_title('Process Timeline (Colored by Part Cycle)')
plt.tight_layout()
plt.show()
You got a good start, but I think I see a couple of issues.
Your loop creates a part and then performs each process step; only after all the processing is done does it loop and create the next part. In other words, your loop only processes one part at a time.
You need to make a process that contains just the processing steps. In your loop, create a part and, for each part, start an instance of that process; when you start the process, do not use a yield. Just start it and move on to the next part.
The second issue is once a part gets a fixture, it should keep it through all the processes related to that fixture. It should not be seizing the fixture in each task. In the main process first seize the fixture and nest the other fixture related processes in the with statement.
Here is my quick take. I may not have solved all the issues, but I think the chart looks better.
import simpy
import sys
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# Total number of parts to run through the cell.
sim_cycles = 5

# Event log kept as parallel lists (one per column); it is later turned into
# a DataFrame for plotting.
log_data = dict(
    Part=[],
    Process=[],
    Start_Time=[],
    End_Time=[],
)
class Robot:
    """A welding robot that serves one job at a time.

    The robot is a ``simpy.Resource`` with capacity 1. The weld methods are
    SimPy process generators that request only the robot; the ``fix``
    argument is used for its name in messages and log entries, and fixture
    ownership is expected to be held by the caller.
    """

    # (option value, travel speed in inches per minute) accepted by mig_weld.
    _MIG_TRAVEL_SPEEDS = ((1, 25), (1.5, 30), (2, 35), (3, 60))

    # Time charged per spot weld. The original label said "spots/sec", but
    # the value is multiplied by the spot count, i.e. it is used as seconds
    # per spot. NOTE(review): confirm the intended unit.
    _SECONDS_PER_SPOT = 2.75

    # Fixed time for the robot to move in before, and out after, a weld.
    _TRAVEL_TIME = 2

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def _mig_speed_ipm(self, option):
        """Return the travel speed (in/min) for ``option`` or stop the run."""
        for key, ipm in self._MIG_TRAVEL_SPEEDS:
            if option == key:
                return ipm
        sys.exit("Invalid Mig Option Encountered, Stopping Simulation")

    def mig_weld(self, part, length, seam_qty, option, fix):
        """Process generator: MIG weld ``part`` while it sits in ``fix``.

        Args:
            part: Label of the part being welded (used for printing/logging).
            length: Total weld length; it is divided by a speed in mm/s, so
                it is presumably in millimetres -- TODO confirm.
            seam_qty: Number of seams; each one adds 2.2 time units.
            option: Travel-speed selector; one of 1, 1.5, 2 or 3.
            fix: Fixture the part sits in (only its ``name`` is used).

        Raises:
            SystemExit: If ``option`` is not a recognised selector. This is
                checked before the robot is requested, so a bad call never
                queues for or occupies the robot.
        """
        # Validate and compute first: nothing here needs the robot.
        ipm = self._mig_speed_ipm(option)
        mmps = ipm * 25.4 / 60  # inches per minute -> millimetres per second
        weld_time = length / mmps + 2.2 * seam_qty

        with self.resource.request() as robot_req:
            yield robot_req  # wait for the robot

            start_time = self.env.now
            print(f"{self.name} starts welding {part} at {start_time}")

            yield self.env.timeout(self._TRAVEL_TIME)  # robot in
            yield self.env.timeout(weld_time)
            yield self.env.timeout(self._TRAVEL_TIME)  # robot out

            end_time = self.env.now
            print(f"{self.name} finishes welding {part} at {end_time}")

            self._record(
                part,
                f'{self.name}: MIG Weld in {fix.name}',
                start_time,
                end_time,
            )
        # Leaving the ``with`` block releases the robot.

    def spot_weld(self, part, qty, fix):
        """Process generator: put ``qty`` spot welds on ``part`` in ``fix``.

        Args:
            part: Label of the part being welded (used for printing/logging).
            qty: Number of spot welds.
            fix: Fixture the part sits in (only its ``name`` is used).
        """
        weld_time = qty * self._SECONDS_PER_SPOT

        with self.resource.request() as robot_req:
            yield robot_req  # wait for the robot

            start_time = self.env.now
            print(f"{self.name} starts spot welding {part} in {fix.name} at {start_time}")

            yield self.env.timeout(self._TRAVEL_TIME)  # robot in
            yield self.env.timeout(weld_time)
            yield self.env.timeout(self._TRAVEL_TIME)  # robot out

            end_time = self.env.now
            print(f"{self.name} finishes spot welding {part} in {fix.name} at {end_time}")

            self._record(
                part,
                f'{self.name}: Spot Weld in {fix.name}',
                start_time,
                end_time,
            )
        # Leaving the ``with`` block releases the robot.
class Operator:
    """The cell operator, modelled as a ``simpy.Resource`` with capacity 1.

    Every task that occupies the operator requests only the operator; the
    ``fix`` argument is used for its name in messages and log entries.

    TODO: the clamp/unclamp and walk-in/walk-out steps exist as generators
    but are not yet called from the load/unload steps.
    """

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.resource = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def load_part(self, part, load_time, fix):
        """Process generator: load ``part`` into ``fix``, taking ``load_time``."""
        with self.resource.request() as operator_claim:
            yield operator_claim

            began = self.env.now
            print(f"{self.name} starts loading {part} into {fix.name} at {began}")
            yield self.env.timeout(load_time)
            ended = self.env.now
            print(f"{self.name} finishes loading {part} into {fix.name} at {ended}")
        # Operator is released here.

        self._record(part, f'{self.name}: Load Part into {fix.name}', began, ended)

    def unload_part(self, part, unload_time, fix):
        """Process generator: unload ``part`` from ``fix``, taking ``unload_time``."""
        with self.resource.request() as operator_claim:
            yield operator_claim

            began = self.env.now
            print(f"{self.name} starts unloading {part} from {fix.name} at {began}")
            yield self.env.timeout(unload_time)
            ended = self.env.now
            print(f"{self.name} finishes unloading {part} from {fix.name} at {ended}")
        # Operator is released here.

        self._record(part, f'{self.name}: Unload Part from {fix.name}', began, ended)

    def put_away_part(self, part):
        """Process generator: store the finished ``part`` (fixed 6 time units)."""
        with self.resource.request() as operator_claim:
            yield operator_claim

            began = self.env.now
            print(f"{self.name} starts putting away {part} at {began}")
            yield self.env.timeout(6)
            ended = self.env.now
            print(f"{self.name} finishes putting away {part} at {ended}")

            self._record(part, f'{self.name}: Put Away Part', began, ended)
        # Operator is released when the ``with`` block exits.

    def walk_in(self, part, fix):
        """Process generator: walk to ``fix`` (fixed 2 time units).

        No resource is requested here.
        """
        began = self.env.now
        print(f"{self.name} walks into {fix.name} at {began}")
        yield self.env.timeout(2)
        ended = self.env.now
        print(f"{self.name} has arrived at {fix.name} at {ended}")
        self._record(part, f'{self.name}: walks into {fix.name}', began, ended)

    def walk_out(self, part, fix):
        """Process generator: walk away from ``fix`` (fixed 2 time units).

        No resource is requested here.
        """
        began = self.env.now
        print(f"{self.name} walks out of {fix.name} at {began}")
        yield self.env.timeout(2)
        ended = self.env.now
        print(f"{self.name} has left {fix.name} at {ended}")
        self._record(part, f'{self.name}: walks out of {fix.name}', began, ended)
class Fixture:
    """A work-holding fixture in the cell.

    ``ownership`` is a capacity-1 SimPy resource; whoever holds it has the
    fixture. (A separate ``part`` resource from an earlier revision is no
    longer created.)
    """

    def __init__(self, env, name):
        self.env = env
        self.name = name
        self.ownership = simpy.Resource(env, capacity=1)

    @staticmethod
    def _record(part, process, began, ended):
        """Append one completed activity to the module-level ``log_data``."""
        log_data['Part'].append(part)
        log_data['Process'].append(process)
        log_data['Start_Time'].append(began)
        log_data['End_Time'].append(ended)

    def clamp(self, part):
        """Process generator: clamp ``part`` (fixed 4 time units)."""
        began = self.env.now
        print(f"{self.name} starts clamping {part} at {began}")
        yield self.env.timeout(4)
        ended = self.env.now
        print(f"{self.name} finishes clamping {part} at {ended}")
        self._record(part, f'{self.name}: Clamp', began, ended)

    def unclamp(self, part):
        """Process generator: unclamp ``part`` (fixed 4 time units)."""
        began = self.env.now
        print(f"{self.name} starts unclamping {part} at {began}")
        yield self.env.timeout(4)
        ended = self.env.now
        print(f"{self.name} finishes unclamping {part} at {ended}")
        self._record(part, f'{self.name}: Unclamp', began, ended)
def production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles):
    """Start one independent production process per part.

    This is a plain function, not a generator: it registers ``sim_cycles``
    processes with ``env`` and returns without waiting for any of them.
    Because nothing is yielded between registrations, all parts are created
    at the same simulation time and then compete for the resources, whose
    queues decide who goes next.
    """
    part_labels = [f'Part-{number:02}' for number in range(1, sim_cycles + 1)]
    for part in part_labels:
        env.process(production_proc(env, robot, operator, fixture_01, fixture_02, part))
def production_proc(env, robot, operator, fixture_01, fixture_02, part):
    """Process generator: route a single ``part`` through the whole cell.

    The part takes ownership of a fixture and keeps it for the complete
    load -> spot weld -> unload sequence in that fixture. Fixture 1 is
    released before fixture 2 is requested. Once both stages are done the
    operator puts the part away.
    """
    # (fixture, load time, number of spot welds, unload time) per stage.
    stages = (
        (fixture_01, 41, 35, 51),
        (fixture_02, 38, 65, 58),
    )

    for fixture, load_time, spot_qty, unload_time in stages:
        # Hold the fixture until all work in it is finished.
        with fixture.ownership.request() as fixture_claim:
            yield fixture_claim
            yield env.process(operator.load_part(part, load_time, fixture))
            yield env.process(robot.spot_weld(part, qty=spot_qty, fix=fixture))
            yield env.process(operator.unload_part(part, unload_time, fixture))

    # Finished part goes into storage.
    yield env.process(operator.put_away_part(part))
# Build the cell: one robot, one operator and two fixtures on a shared clock.
env = simpy.Environment()
robot = Robot(env, 'Robot-1')
operator = Operator(env, 'Operator-1')
fixture_01 = Fixture(env, 'Fixture-01')
fixture_02 = Fixture(env, 'Fixture-02')

# production_loop registers one process per part; it is called directly
# rather than being wrapped in a process itself.
production_loop(env, robot, operator, fixture_01, fixture_02, sim_cycles)

# Run the simulation (no end time is passed to run()).
env.run()

# One row per logged activity.
df = pd.DataFrame(log_data)

# Gantt chart: time on the x-axis, one row per process step, bars coloured
# by the part they belong to.
plt.figure(figsize=(18, 6))
sns.set_style("whitegrid")

parts = df['Part'].unique()
part_color_dict = dict(zip(parts, sns.color_palette("Accent", len(parts))))

# Make the process column categorical with alphabetically ordered categories.
process_column = df['Process'].astype('category')
df['Process'] = process_column.cat.reorder_categories(sorted(process_column.unique()))
df = df.sort_values(['Process', 'Start_Time'])

# Only bars of this step carry a legend label, giving one entry per part.
legend_step = 'Operator-1: Load Part into Fixture-01'

for part_name, part_rows in df.groupby('Part'):
    bar_color = part_color_dict.get(part_name, 'grey')
    bars = zip(part_rows['Process'], part_rows['Start_Time'], part_rows['End_Time'])
    for process_name, began, ended in bars:
        plt.barh(
            y=process_name,
            width=ended - began,
            left=began,
            edgecolor='black',
            color=bar_color,
            label=part_name if process_name == legend_step else "",
        )

# Collapse repeated labels so each part appears once in the legend.
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(
    by_label.values(),
    by_label.keys(),
    title='Part',
    bbox_to_anchor=(1.05, 1),
    loc='upper left',
)
plt.xlabel('Time')
plt.ylabel('Process')
plt.title('Process Timeline (Colored by Part Cycle)')
plt.tight_layout()
plt.show()