I have an very expensive to evaluate function that needs to be computed in both the cost and targetpenalty in an optimization problem i am facing. I was wondering if there is a workaround here?
Consider the code below highlighting the issue - this is just a toy example(thus the objective function is meant to be slow in this example)... the real getobjective2_ function is already optimized as far as it goes:
import numpy as np
from typing import List
import mystic
import pandas as pd
from mystic.symbolic import simplify
from mystic.symbolic import generate_constraint, generate_solvers
from mystic.monitors import VerboseMonitor, VerboseLoggingMonitor
from mystic.penalty import linear_inequality
def adstock_geometric(x: List, theta: float):
x_decayed = np.zeros_like(x)
x_decayed[0] = x[0]
for xi in range(1, len(x_decayed)):
x_decayed[xi] = x[xi] + theta * x_decayed[xi - 1]
return x_decayed
def getobjective2_(x):
for _ in range(100_000):
pass
adstocks = {'adstock-channel_1': np.array(0.08223851),
'adstock-channel_2': np.array(0.28487006)}
channels = ['channel_1', 'channel_2']
horizon = 12
coeffs = pd.DataFrame({'channel_2-saturationcoefficients': [0.73651 for _ in range(horizon)],
'timevar-channel_2': [0.27527, 0.14287, 0.12847, 0.02112,
0.03544, 0.05230, 0.21811, 0.27527,
0.14287, 0.12847, 0.02112, 0.03544],
'channel_1-saturationcoefficients': [0.70259 for _ in range(horizon)],
'timevar-channel_1': [0.13827, 0.24760, 0.28849,
0.33613, 0.34027, 0.28988,
0.21095, 0.13827, 0.24760,
0.28849, 0.33613, 0.34027]})
incomingcostchannel_1 = np.array([0.10303531, 0.07283391, 0.14465764, 0.12315725, 0.07384496, 0.07709148,
0.06425854, 0.09207129, 0.04114219, 0.04066119, 0.09510273, 0.06785441,
0.08381945, 0.05025105, 0.0617348, 0.08135422, 0.04006317, 0.0348892,
0.05374626, 0.05904413])
incomingcoostchannel_2 = np.array([0.04199663, 0.05350016, 0.05427114, 0.05519786, 0.05449486, 0.05733955,
0.0572036, 0.05479, 0.06165305, 0.06347637, 0.05837555, 0.05841943,
0.05922741, 0.05826971, 0.05652555, 0.06605174, 0.0603985, 0.0617391,
0.05811569, 0.05972648])
def saturate(cost, channel):
return np.power(cost, coeffs[f'{channel}-saturationcoefficients'].values)
tbret = 0
for idx, channel in enumerate(channels):
try:
adstockchannel = adstocks[f'adstock-{channel}']
except KeyError:
adstockchannel = 0.0
costchannel = x[(idx * horizon): (idx * horizon) + horizon]
if channel == "channel_1":
incomingcost = incomingcostchannel_1
else:
incomingcost = incomingcoostchannel_2
zerosoverhorizon = np.zeros(horizon)
incomingcostoverhorizon = np.append(incomingcost, zerosoverhorizon)
incomingcostoverhorizonadstocked = adstock_geometric(x=incomingcostoverhorizon,
theta=adstockchannel)[-horizon:]
costchannel += incomingcostoverhorizonadstocked
costadstocked = adstock_geometric(x=costchannel,
theta=adstockchannel)
costmediaprocessed = saturate(cost=costadstocked, channel=channel)
costready = costmediaprocessed * coeffs[f'timevar-{channel}']
tbret += sum(costready)
return -tbret
def generatemysticconstraints(horizon, totalbudget, channels, minspendtotalconstraints,
maxjumps, maxcovers, totalbudgetconstraint=False):
xs = [f"x{idx}" for idx in range(horizon * len(channels))]
constraints = ""
if totalbudgetconstraint:
totalbudgetconstraint = f"{totalbudget} >= {'+'.join(xs)}"
constraints += totalbudgetconstraint
breakpoints = {channel: idx * horizon for idx, channel in enumerate(channels)}
for channel, cutoff in breakpoints.items():
fromx = cutoff
tox = cutoff + horizon
minspendchannel = minspendtotalconstraints[channel]
maxjump = maxjumps[channel]
maxcover = maxcovers[channel]
minspendconstrainttotal = f"{'+'.join(xs[fromx: tox])} >= {minspendchannel}"
constraints += '\n'
constraints += minspendconstrainttotal
for idx in range(fromx + 1, tox):
maxjumpconstraint_ = f"({xs[idx - 1]} * {maxjump}) >= {xs[idx]}"
constraints += '\n'
constraints += maxjumpconstraint_
for idx in range(fromx, tox - 1):
maxcoverconstraint_ = f"({xs[idx + 1]} * {maxcover}) >= {xs[idx]}"
constraints += '\n'
constraints += maxcoverconstraint_
return constraints
#
def run(totalbudget, targetvalue, scalefactor, scalefactorresponse,
horizon, minspendtotalconstraints, channels, objective, strategy, maxjumps, maxcovers,
x0, boundsextrapolation):
totalbudget_ = totalbudget / scalefactor
constraints = generatemysticconstraints(horizon=horizon,
totalbudget=totalbudget_,
channels=channels,
minspendtotalconstraints=minspendtotalconstraints,
maxjumps=maxjumps,
maxcovers=maxcovers)
def targetpenalty(x, target):
if strategy == "targetroas":
return target * (sum(x) * scalefactor) - (-objective(x) * scalefactorresponse)
elif strategy == "targetcpa":
return (sum(x) * scalefactor) - target * (-objective(x) * scalefactorresponse)
else:
raise NotImplementedError
def totalbudgetpenalty(x, totalbudget):
return sum(x) - totalbudget
@linear_inequality(condition=totalbudgetpenalty, kwds={'totalbudget': totalbudget_})
@linear_inequality(condition=targetpenalty, kwds={'target': targetvalue})
def penalty(x):
return 0.0
constraintsimplified = simplify(constraints,
all=True)
from mystic.constraints import and_
generatedconstraints = generate_constraint(generate_solvers(constraintsimplified), join=and_)
# stepmon = VerboseMonitor(1)
stepmon = VerboseLoggingMonitor(1, 1)
cost = lambda x: objective(x)
bounds = boundsextrapolation
print(f"starting configuration - totalbudget: {totalbudget}, targetvalue: {targetvalue}")
optimum = mystic.solvers.diffev2(cost,
x0=x0,
constraints=generatedconstraints,
bounds=bounds,
penalty=penalty,
full_output=True,
stepmon=stepmon,
itermon=stepmon,
disp=True,
tightrange=True,
npop=40,
# gtol=200,
# maxfun=5000,
maxiter=1000)
optimalvalueslst = optimum[0]
spend_channel_1 = optimalvalueslst[:horizon]
spend_channel_2 = optimalvalueslst[horizon:]
print(f"Percentage spent channel_1: {sum(spend_channel_1) / (sum(spend_channel_1) + sum(spend_channel_2))}")
print(f"Percentage spent channel_2: {sum(spend_channel_2) / (sum(spend_channel_1) + sum(spend_channel_2))}")
totalspent = np.sum(optimalvalueslst) * scalefactor
sales = -objective(optimalvalueslst.copy()) * scalefactorresponse
estimatedcpa = totalspent / sales
estimatedroas = sales / totalspent
penaltyvalue = targetvalue * sum(optimalvalueslst.copy()) - (-objective(optimalvalueslst.copy()))
if totalspent > totalbudget * 1.02:
# redo optimization
raise ValueError("Spent more than totalbudget -> roas target too low / cpa target to high")
print(f"total spent: {totalspent}")
print(f"estimated cpa: {estimatedcpa}")
print(f"estimated roas: {estimatedroas}")
print(f"penalty value: {penaltyvalue}")
return {'x': optimalvalueslst.copy(),
'sales': sales,
'estimatedroas': estimatedroas,
'totalspent': totalspent,
'totalbudget': totalbudget,
'targetvalue': targetvalue
}
def constraintverifier(solution, horizon, channels,
scalefactor, maxjumps, maxcovers,
minspendtotalconstraints, totalbudget, boundsextrapolation, targetmetric=False):
breakpoints = {channel: idx * horizon for idx, channel in enumerate(channels)}
if (sum(solution.copy()) * scalefactor) > totalbudget:
return False
if not targetmetric and abs((sum(solution.copy()) * scalefactor) - totalbudget) > totalbudget * 0.05:
return False
for channel, cutoff in breakpoints.items():
fromx = cutoff
tox = cutoff + horizon
maxjump = maxjumps[channel]
maxcover = maxcovers[channel]
minspendchannel = minspendtotalconstraints[channel]
solchannel = solution[fromx: tox].copy()
boundschannel = boundsextrapolation[fromx: tox]
if not all(lb <= x <= ub for x, (lb, ub) in zip(solchannel.copy(), boundschannel)):
return False
if sum(solchannel.copy()) * scalefactor < minspendchannel:
return False
lastvalue = solchannel[0]
for sequentvalue in solchannel[1:].copy():
if ((lastvalue * maxjump) * 1.01) < sequentvalue: # multiplication by 1.01 due to rounding errors
return False
if (lastvalue * 1.01) < sequentvalue * (1 / maxcover): # multiplication by 1.01 due to rounding errors
return False
lastvalue = sequentvalue
return True
def getdefaulttestvalues():
scalefactor = 11621.64675797
scalefactorresponse = 118257.501641172
horizon = 12
minspendtotalconstraints = {'channel_2': 0.0,
'channel_1': 0.0}
targetvalue = 3.0 # vs 2.5
channels = ['channel_1', 'channel_2']
objective = getobjective2_
strategy = "targetroas"
maxjumps = {'channel_1': 2.5,
'channel_2': 1.5}
maxcovers = {'channel_1': 2.5,
'channel_2': 1.5}
x0 = [0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407,
0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407,
0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407,
0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407,
0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407,
0.017209265103744407, 0.017209265103744407, 0.017209265103744407, 0.017209265103744407]
boundsextrapolation = [(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 5.602713613312933), (0.014964578351082093, 5.602713613312933),
(0.014964578351082093, 1.158821015684604), (0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604), (0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604), (0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604), (0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604), (0.014964578351082093, 1.158821015684604),
(0.014964578351082093, 1.158821015684604)]
return scalefactor, scalefactorresponse, horizon, \
minspendtotalconstraints, targetvalue, channels, \
objective, strategy, maxjumps, maxcovers, x0, boundsextrapolation
if __name__ == '__main__':
from collections import defaultdict
def tree():
return defaultdict(tree)
scalefactor, scalefactorresponse, horizon, \
minspendtotalconstraints, targetvalue, channels, \
objective, strategy, maxjumps, maxcovers, x0, boundsextrapolation = getdefaulttestvalues()
results = tree()
total_budgets = [50_000, 200_000, 250_000, 500_000]
targetvals = [1.0, 3.0, 5.0, 10.0]
for totbud in total_budgets:
for targetval in targetvals:
res2 = run(totalbudget=totbud, targetvalue=targetval,
horizon=horizon, channels=channels,
scalefactor=scalefactor, maxjumps=maxjumps,
maxcovers=maxcovers, minspendtotalconstraints=minspendtotalconstraints,
boundsextrapolation=boundsextrapolation, objective=getobjective2_,
x0=x0, strategy=strategy,
scalefactorresponse=scalefactorresponse)
print(results)
We are evaluating the objective function in both the targetpenalty and cost which i would really like to avoid(a single evaluation would speed up things alot), is there a workaround here?
Some ideas is to use parallelized computing utilizing e.g pathos,but i couldnt find any examples of doing so with diffev2, i will also try to reduce the number of workers but i found out that i get stuck in local optimas quite fast.. i found that the optimizer works well for all my datasets if i have around 50 workers..any ideas on how to speed this up?
EDIT code updated to allow for caching
def run(totalbudget, targetvalue, scalefactor, scalefactorresponse,
horizon, minspendtotalconstraints, channels, objective, strategy, maxjumps, maxcovers,
x0, boundsextrapolation):
totalbudget_ = totalbudget / scalefactor
constraints = generatemysticconstraints(horizon=horizon,
totalbudget=totalbudget_,
channels=channels,
minspendtotalconstraints=minspendtotalconstraints,
maxjumps=maxjumps,
maxcovers=maxcovers)
import mystic.cache as mc
a = mc.archive.read('cache.db', type=mc.archive.file_archive)
objectivecached = mc.cached(archive=a)(objective)
def targetpenalty(x, target):
if strategy == "targetroas":
return target * (sum(x) * scalefactor) - (-objectivecached(x) * scalefactorresponse)
elif strategy == "targetcpa":
return (sum(x) * scalefactor) - target * (-objectivecached(x) * scalefactorresponse)
else:
raise NotImplementedError
def totalbudgetpenalty(x, totalbudget):
return sum(x) - totalbudget
@linear_inequality(condition=totalbudgetpenalty, kwds={'totalbudget': totalbudget_})
@linear_inequality(condition=targetpenalty, kwds={'target': targetvalue})
def penalty(x):
return 0.0
constraintsimplified = simplify(constraints,
all=True)
from mystic.constraints import and_
generatedconstraints = generate_constraint(generate_solvers(constraintsimplified), join=and_)
# stepmon = VerboseMonitor(1)
stepmon = VerboseLoggingMonitor(1, 1)
cost = lambda x: objectivecached(x)
bounds = boundsextrapolation
print(f"starting configuration - totalbudget: {totalbudget}, targetvalue: {targetvalue}")
optimum = mystic.solvers.diffev2(cost,
x0=x0,
constraints=generatedconstraints,
bounds=bounds,
penalty=penalty,
full_output=True,
stepmon=stepmon,
itermon=stepmon,
disp=True,
tightrange=True,
npop=40,
# gtol=200,
# maxfun=5000,
maxiter=1000)
optimalvalueslst = optimum[0]
spend_channel_1 = optimalvalueslst[:horizon]
spend_channel_2 = optimalvalueslst[horizon:]
print(f"Percentage spent channel_1: {sum(spend_channel_1) / (sum(spend_channel_1) + sum(spend_channel_2))}")
print(f"Percentage spent channel_2: {sum(spend_channel_2) / (sum(spend_channel_1) + sum(spend_channel_2))}")
totalspent = np.sum(optimalvalueslst) * scalefactor
sales = -objective(optimalvalueslst.copy()) * scalefactorresponse
estimatedcpa = totalspent / sales
estimatedroas = sales / totalspent
penaltyvalue = targetvalue * sum(optimalvalueslst.copy()) - (-objective(optimalvalueslst.copy()))
if totalspent > totalbudget * 1.02:
# redo optimization
raise ValueError("Spent more than totalbudget -> roas target too low / cpa target to high")
print(f"total spent: {totalspent}")
print(f"estimated cpa: {estimatedcpa}")
print(f"estimated roas: {estimatedroas}")
print(f"penalty value: {penaltyvalue}")
return {'x': optimalvalueslst.copy(),
'sales': sales,
'estimatedroas': estimatedroas,
'totalspent': totalspent,
'totalbudget': totalbudget,
'targetvalue': targetvalue
}
Expanding on my comment... we create an expensive function
>>> def expensive(x):
... import time
... time.sleep(.01)
... return sum(x)
...
>>> import mystic as my
>>> import mystic.cache as mc
>>> import mystic.penalty as mp
Now create a standard file archive that captures function input and output, and retains calling order.
We generate a function that writes to the archive, where it uses function input as the key, and function output as the value and is ordered by function call.
>>> a = mc.archive.read('cache.db', type=mc.archive.file_archive)
>>> objective = mc.cached(archive=a)(expensive)
Set some bounds
>>> xlb = (0,1,0,0,0)
>>> xub = (1,10,10,10,10)
>>> bounds = list(zip(xlb, xub))
Create a penalty (where the objective is used)
>>> mon = my.monitors.Monitor()
>>> @mp.quadratic_inequality(condition=lambda x: (2 - objective(x)**2))
... def penalty(x):
... return 0.0
...
>>> penalty([0,0,0,1,0])
200.0
>>> penalty([1,1,1,1,1])
0.0
Optimize, using a evaluation monitor
>>> res = my.solvers.fmin(objective, x0=xlb, bounds=bounds, evalmon=mon, full_output=True, disp=False, penalty=penalty)
>>>
>>> x,y = res[:2]
>>> assert objective(x) + penalty(x) == y
Archived rv and cost should match evaluation monitor, regardless of size (to look up the last entry, we need to use keys(), so we just check all keys). The file archives preserve order, but we convert to nested lists to compare.
>>> xs,ys = list(list(k) for k in a.keys()), list(a.values())
>>> assert xs == mon.x
>>> assert ys == mon.y
Remove the archive
>>> import os
>>> os.remove('cache.db')