I have the following working code:
import json
import requests
import os
import pandas as pd
import random
from locust import task, events
from locust.contrib.fasthttp import FastHttpUser
from azure.identity import DefaultAzureCredential
# Base URL of the system under test; the Locust user class uses it as `host`.
server_name = "https://test"

# Azure credential chain used to obtain the bearer token below.
credential = DefaultAzureCredential()

# Raw token string, fetched once when the module is loaded and sent in the
# Authorization header of every request.
token = credential.get_token("xxx").token
def read_componentid_writepowerplant(csv_file=None):
    """Return one randomly chosen component id from the power-plant CSV.

    Args:
        csv_file: Path to a CSV file that has a ``COMPONENTID`` column.
            When omitted, ``powerplants.csv`` in the
            ``PythonScripting/MyCode/pythonProject`` directory on drive
            ``C:`` is used, exactly as before.

    Returns:
        A single ``COMPONENTID`` value, picked at random, as a built-in int.

    Raises:
        KeyError: If the CSV has no ``COMPONENTID`` column.
    """
    if csv_file is None:
        csv_file = os.path.join("C:", os.sep, "PythonScripting", "MyCode",
                                "pythonProject", "powerplants.csv")
    # NOTE: the file is re-read on every call.
    component_ids = pd.read_csv(csv_file)["COMPONENTID"]
    # Pull the scalar out with .iloc[0]; calling int() directly on a
    # one-element Series is deprecated in pandas.
    return int(component_ids.sample(1).iloc[0])
def set_value_writepowerplant():
    """Draw a random integer from the half-open range [500, 900)."""
    return random.randrange(500, 900)
# Alias so JSON-style `null` can be written inside Python dict literals.
null = None
# The former module-level `global hpsId` statement was removed: `global` has
# no effect outside a function body, and `hpsId` was never assigned.
def setpayload_writepowerplant(dt_value, hps_id=10014):
    """Build the request body for posting a single power plant.

    Args:
        dt_value: Value for ``timeAxis.dt``; converted with ``int()``.
        hps_id: Value for the ``hpsId`` field. Defaults to 10014, the value
            that used to be hard-coded, so existing callers are unaffected.

    Returns:
        A new dict on every call. ``componentId`` comes from
        ``read_componentid_writepowerplant()``; every other field is a
        fixed literal.
    """
    return {
        "name": "Jostedal",
        "units": [
            {
                "generatorName": "Jostedal G1",
                "componentId": int(read_componentid_writepowerplant()),
                "timeSeries": None,
            },
        ],
        "hpsId": hps_id,
        "powerPlantId": 31109,
        "readPeriod": [
            1711407600,
            1711494000,
        ],
        "timeAxis": {
            "t0": 1711407600,
            "dt": int(dt_value),
            "n": 96,
        },
    }
@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, response,
                       context, exception, start_time, url, **kwargs):
    """Print a one-line outcome for each request event received.

    Only ``name`` and ``exception`` are used; the remaining arguments are
    accepted because the request event passes them.
    """
    if not exception:
        print(f"Successfully made a request to: {name}")
        return
    print(f"Request to {name} failed with exception {exception}")
class MffrApi(FastHttpUser):
    """Locust user that reads all power plants, then posts a single one.

    The ``hpsId`` taken from the read response is kept on the user instance
    and sent in the following write request, so each simulated user
    correlates its own value without any module-level global.
    """

    host = server_name
    network_timeout = 1000000.0
    connection_timeout = 1000000.0

    # hpsId captured by the latest read_powerplant() call. While it is None
    # (no read has happened yet) the payload keeps its built-in hpsId.
    hpsid = None

    def read_powerplant(self):
        """GET the power-plant list and remember the hpsId of entry 15."""
        resp = self.client.get('/xxx/all/',
                               headers={"Authorization": f'Bearer {token}'},
                               name='/GetAllPowerplants')
        # Stored on self (not a local) so write_powerplant() can reuse it.
        self.hpsid = resp.json()[15]["hpsId"]
        print(self.hpsid)

    def write_powerplant(self):
        """POST one power plant, using the correlated hpsId when available."""
        payload = setpayload_writepowerplant(set_value_writepowerplant())
        if self.hpsid is not None:
            payload["hpsId"] = self.hpsid
        # Passing the dict via `json=` lets the client serialise it; no
        # manual json.dumps() is needed.
        self.client.post('/xxx',
                         headers={
                             "Authorization": f'Bearer {token}',
                             "Content-Type": 'application/json'},
                         name='/PostSingelPowerplant',
                         json=payload)

    @task(1)
    def test_read_mfrr(self):
        """Run the read request followed by the write request."""
        self.read_powerplant()
        self.write_powerplant()
I want the correlated value "hpsid" from method "read_powerplant" to be put into the static json-payload in method "setpayload_writepowerplant" to be used in the method "write_powerplant".
Do I need to define this variable as global to be able to do that?
Or do I need to change scopes of some of the methods to be able to add support for cross-referencing values between methods?
Just save it as an object attribute on the User.
Also, there is no need to dump your JSON manually: just pass the dict as the json
parameter to the request call; it will also set the Accept and Content-Type headers to application/json.
def read_powerplant(self):
…
self.hpsid = resp.json()[15]["hpsId"]
def write_powerplant(self):
resp = self.client.post(f'/xxx', json={"hpsId": self.hpsid, …})
…