I ask this question because of the way Bloomberg sends its data via BLPAPI. Following on from this post, I want to establish an efficient method of obtaining the value of a specific field. A single session.nextEvent() can contain multiple messages (msgs), and each message carries more data than was requested, so I was wondering whether there is a known efficient way of extracting just the values I need. With the techniques I have used so far, for 60 securities and 5 subscriptions the data is never live: it lags behind, and I believe the reason is how I handle the incoming data. Below is an example subscription for one security. Given that MKTDATA_EVENT_TYPE and MKTDATA_EVENT_SUBTYPE can differ between messages, I am struggling to find an effective way to do this.
My aim is to avoid for loops where possible and use dictionaries to direct me to the value wanted.
import blpapi
from bloomberg import BloombergSessionHandler

# session = blpapi.Session()
host = 'localhost'
port = 8194
session_options = blpapi.SessionOptions()
session_options.setServerHost(host)
session_options.setServerPort(port)
session_options.setSlowConsumerWarningHiWaterMark(0.05)
session_options.setSlowConsumerWarningLoWaterMark(0.02)
session = blpapi.Session(session_options)
if not session.start():
    print("Failed to start Bloomberg session.")

subscriptions = blpapi.SubscriptionList()
fields = ['BID', 'ASK', 'TRADE', 'LAST_PRICE', 'LAST_TRADE']
subscriptions.add('GB00BLPK7110 @UKRB Corp', fields)
session.subscribe(subscriptions)  # the session is already started above

while True:
    event = session.nextEvent()
    print("Event type:", event.eventType())
    if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
        i = 0
        for msg in event:
            print("This is msg ", i)
            i += 1
            print("\n", "msg is ", msg, "\n")
            print(" Message type:", msg.messageType())
            eltMsg = msg.asElement()
            msgType = eltMsg.getElement('MKTDATA_EVENT_TYPE').getValueAsString()
            msgSubType = eltMsg.getElement('MKTDATA_EVENT_SUBTYPE').getValueAsString()
            print(" ", msgType, msgSubType)
            for fld in fields:
                print(" Fields are :", fields)
                if eltMsg.hasElement(fld):
                    print(" ", fld, eltMsg.getElement(fld).getValueAsFloat())
    else:
        for msg in event:
            print(" Message type:", msg.messageType())
I tried obtaining the values for the specified fields I subscribed to but found that my code was too slow and as such didn't meet the requirements to display live data.
def process_subscription_data1(self, session):
    while True:
        event = session.nextEvent()
        print(f"The event is {event}")
        if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
            print(f"The event type is: {event.eventType()}")
            for msg in event:
                print(f"The msg is: {msg}")
                data = {'instrument': msg.correlationIds()[0].value()}
                print(f"The data is: {data}")
                # Processing fields efficiently
                for field in self.fields:
                    print("field is ", field, " ", self.fields)
                    element = msg.getElement(field) if msg.hasElement(field) else None
                    print("element is ", element)
                    # Test against None explicitly rather than relying on the element's truthiness
                    data[field] = element.getValueAsString() if element is not None and not element.isNull() else 'N/A'
                print(f"Emitting data for {data}")
                self.data_signal.emit(data)  # Emit data immediately for each message
The code above is what I tried, and it was far too slow, even without the print statements (they are only there to show how convoluted the code is).
One way to "throttle" the rate at which real-time ticks come from Bloomberg is to specify an interval when adding a subscription:
subs = blpapi.SubscriptionList()
flds = ['BID', 'ASK', 'TRADE', 'LAST_TRADE', 'LAST_PRICE']
tickers = ['RXA Comdty', 'GB00BLPK7110 @UKRB Corp']
nId = 0
for t in tickers:
    subs.add(t, flds, options={'interval': 1}, correlationId=blpapi.CorrelationId(nId))
    nId += 1
session.subscribe(subs)
This restricts the messages to 1 second intervals (in this example), and each message will contain a 'summary' of all the data for the given ticker. Each message will be larger as it will contain everything and not just the fields you specify. You can specify different intervals for each subscription if some tickers are less active than others.
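As a rough sketch of the per-ticker interval idea, the helper below turns a dict of ticker to interval into the arguments one would pass to `subs.add`. The name `build_subscription_plan` is hypothetical (it is not part of blpapi), and the interval values are made up for illustration; the `options={'interval': ...}` form is the same one used in the snippet above.

```python
def build_subscription_plan(intervals, fields, default_interval=1.0):
    """Return a list of (ticker, fields, options, correlation_number) tuples.

    `intervals` maps ticker -> interval in seconds; None means "use the default".
    """
    plan = []
    for n, (ticker, interval) in enumerate(intervals.items()):
        seconds = default_interval if interval is None else interval
        plan.append((ticker, list(fields), {'interval': seconds}, n))
    return plan


# Illustrative values only: an active future every second, a quieter bond every 5 s.
plan = build_subscription_plan(
    {'RXA Comdty': 1, 'GB00BLPK7110 @UKRB Corp': 5},
    ['BID', 'ASK', 'LAST_PRICE'],
)
for ticker, flds, options, n in plan:
    print(n, ticker, options)
    # With a live session each row would become:
    # subs.add(ticker, flds, options=options, correlationId=blpapi.CorrelationId(n))
```

Keeping the plan as plain data also gives you a ready-made lookup from correlation number back to ticker when messages arrive.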
If you are building a GUI, then this will be event-driven and have a message loop. It could make sense to move to the asynchronous/callback method of handling Bloomberg events. Here the messages are handled in a separate worker thread, and you only need to alert the GUI if an event is of interest.
import blpapi
import threading
from queue import Queue

def processEvent(evt, session):
    # Called by blpapi on its own worker thread for every event
    et = evt.eventType()
    if et == blpapi.Event.SESSION_STATUS:
        print('Session Status event')
        for msg in evt:
            print(' ', msg.messageType())
            if msg.messageType() == 'SessionStarted':
                sessionReady.set()
        return
    if et == blpapi.Event.SUBSCRIPTION_STATUS:
        print('Subscription Status event')
        for msg in evt:
            cId = msg.correlationId()
            print(' ', msg.messageType(), 'for ticker:', tickers[cId.value()])
        return
    if et == blpapi.Event.SUBSCRIPTION_DATA:
        for msg in evt:
            cId = msg.correlationId()
            tkr = tickers[cId.value()]
            eltMsg = msg.asElement()
            for f in flds:
                if eltMsg.hasElement(f):
                    v = eltMsg.getElement(f).getValueAsFloat()
                    tick = (tkr, {f: v})
                    qTicks.put(tick)  # hand the tick over to the main thread
        return

qTicks = Queue()
sessionReady = threading.Event()

session = blpapi.Session(eventHandler=processEvent)
print('Waiting for session to start')
session.startAsync()
sessionReady.wait()
print('Session started')

subs = blpapi.SubscriptionList()
flds = ['BID', 'ASK', 'TRADE', 'LAST_TRADE', 'LAST_PRICE']
tickers = ['EUR Curncy', 'RXA Comdty', 'GB00BLPK7110 @UKRB Corp']
nId = 0
for t in tickers:
    subs.add(t, flds, correlationId=blpapi.CorrelationId(nId))
    nId += 1
session.subscribe(subs)

while True:
    try:
        tick = qTicks.get(True)
        print(tick)
    except KeyboardInterrupt:  # Ctrl-C ends the loop
        print('Terminating')
        break
session.stopAsync()
One wrinkle is that startAsync returns immediately, so your Session may not be ready when you try to add subscriptions. One way to get around this is to wait for the SessionStarted message and signal a Python Event. Before you start to work with the Session, wait on this event.
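A small variation on the wait, offered as a sketch rather than something the Bloomberg API requires: passing a timeout to `Event.wait` avoids blocking forever if SessionStarted never arrives. The helper name `wait_until_ready` and the 5-second timeout are assumptions, and a `threading.Timer` stands in for the Bloomberg worker thread so the snippet runs without a terminal.

```python
import threading

session_ready = threading.Event()


def wait_until_ready(event, timeout_seconds):
    """Block until `event` is set; return False if the timeout expires first."""
    return event.wait(timeout=timeout_seconds)


# Stand-in for the blpapi worker thread receiving 'SessionStarted'.
threading.Timer(0.05, session_ready.set).start()

if wait_until_ready(session_ready, timeout_seconds=5.0):
    print('Session started')
else:
    print('Timed out waiting for SessionStarted')
```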
I don't know what kind of message loop the GUI might have. In this example I am using a Python Queue to send the tick data from the worker thread to the main thread (Ctrl-C will generate an exception and end the loop). Another alternative is to 'post' messages to the main thread's message queue. The processEvent callback can be used to decide when to fire an event to the main thread: e.g. the Bloomberg message contains (inconsistently named) last-update times for each field, and these could be used to determine whether the message's data has changed since the last tick. You would then only signal new data ticks, cutting down the amount of screen updating in the GUI.
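To make the "only signal what changed" idea concrete, here is a minimal sketch in plain Python. Note that it compares the field values themselves rather than the last-update time fields mentioned above, which is a simpler substitute. Both `TickFilter` and `drain_latest` are hypothetical names, the prices are invented, and the tick shape `(ticker, {field: value})` is the one used in the Queue example.

```python
from queue import Queue, Empty


class TickFilter:
    """Remember the last value seen per (ticker, field) and report only changes."""

    def __init__(self):
        self._last = {}  # (ticker, field) -> last value seen

    def changed(self, ticker, values):
        """Return the subset of `values` that differs from what was last seen."""
        fresh = {}
        for field, value in values.items():
            key = (ticker, field)
            if key not in self._last or self._last[key] != value:
                self._last[key] = value
                fresh[field] = value
        return fresh


def drain_latest(q):
    """Empty the queue without blocking, keeping the newest value per (ticker, field)."""
    latest = {}
    while True:
        try:
            ticker, values = q.get_nowait()
        except Empty:
            return latest
        for field, value in values.items():
            latest[(ticker, field)] = value


# Worker-thread side: only forward fields whose value actually moved.
flt = TickFilter()
first = flt.changed('EUR Curncy', {'BID': 1.0850, 'ASK': 1.0852})
second = flt.changed('EUR Curncy', {'BID': 1.0850, 'ASK': 1.0853})
print(first)   # both fields are new
print(second)  # only ASK changed

# GUI side: collapse a backlog into one dictionary lookup per (ticker, field).
q = Queue()
q.put(('EUR Curncy', {'BID': 1.0850}))
q.put(('EUR Curncy', {'BID': 1.0851}))
q.put(('RXA Comdty', {'ASK': 131.42}))
latest = drain_latest(q)
print(latest[('EUR Curncy', 'BID')])
```

The dictionary keyed by `(ticker, field)` gives the direct lookup asked for in the question, and draining the whole queue on each GUI refresh means a burst of ticks costs one screen update instead of one per message.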