bloombergblpapi

Unable to get streaming intraday bar data with bloomberg api wrapper blp


from blp import blp

with blp.BlpStream(setDefaultSubscriptionService="//blp/mktbar") as bs:
    bs.subscribe({"/ticker/ES1 Index": {"fields": "LAST_PRICE","options": "interval=1"},
                 })
    
    n = 0
    for ev in bs.events(timeout=120):
        print(ev['correlationIds'],[ev['element']["MarketDataEvents"][fld] for fld in ["PRICE_LAST_TIME_RT","LAST_PRICE"]])
        # or just print(ev)   
        n += 1
        if n > 3:
            break

Expecting OHLC bars, and since not specifying start and end times should be the current trading session.

Always times out; Does not seem to be receiving any events. Tried bar_size instead of interval, as well as different combinations of the ticker, and specifying times and not at all.

Some docs and comments seem to indicate that //blp/refdata should be used, instead of //blp/mktbar. But neither work for me


Solution

  • If I use the OP's code, it fails at the subscribe() call, telling me the session has not been started. Even if it had worked, there are subsequent problems.

    This code works for me, with an explicit session start, and gives an output every minute (using bar_size rather than interval):

    from blp import blp
    
    ohlc = {'OPEN':None, 'HIGH': None, 'LOW': None, 'CLOSE':None}
    
    def updateOhlc(flds):
        for k in ohlc.keys():
             if k in flds.keys():
                 ohlc[k] = flds[k]
    
    bs = blp.BlpStream(setDefaultSubscriptionService = '//blp/mktbar')
    
    bs.session.start()
    
    ticker = '/ticker/SPX Index'
    
    subs = bs.subscribe({ticker: {'fields': 'LAST_PRICE','options': 'bar_size=1'},})
    
    if subs[ticker]:
        n = 0
        for ev in bs.events(timeout=120):
            if n>=9:
                break
    
            msgType = ev['messageType']
            elt = ev['element']
            fields = elt[msgType]
    
            if msgType == 'MarketBarStart':
                if n==0:
                    updateOhlc(fields)
    
                print(fields['TIME'],ohlc)
                n+=1
    
            updateOhlc(fields)
    

    with output:

    18:04:00 {'OPEN': 4005.29, 'HIGH': 4006.42, 'LOW': 4005.24, 'CLOSE': 4006.42}
    18:05:00 {'OPEN': 4006.5, 'HIGH': 4007.96, 'LOW': 4006.5, 'CLOSE': 4007.95}
    18:06:00 {'OPEN': 4008.02, 'HIGH': 4008.74, 'LOW': 4007.66, 'CLOSE': 4008.38}
    18:07:00 {'OPEN': 4008.36, 'HIGH': 4009.46, 'LOW': 4008.36, 'CLOSE': 4009.46}
    18:08:00 {'OPEN': 4009.56, 'HIGH': 4009.56, 'LOW': 4008.15, 'CLOSE': 4008.6}
    18:09:00 {'OPEN': 4008.62, 'HIGH': 4010.1, 'LOW': 4008.41, 'CLOSE': 4010.07}
    18:10:00 {'OPEN': 4010.04, 'HIGH': 4010.73, 'LOW': 4009.97, 'CLOSE': 4010.19}
    18:11:00 {'OPEN': 4010.23, 'HIGH': 4010.25, 'LOW': 4009.32, 'CLOSE': 4010.25}
    18:12:00 {'OPEN': 4010.29, 'HIGH': 4010.66, 'LOW': 4009.88, 'CLOSE': 4009.92}
    

    The stream pumps different types of event, but the MarketBarStart only occurs at the one-minute interval (whereas there may be intermediate MarketBarUpdate events). Other event types may not contain all the fields eg: there may be no 'OPEN' key in the dictionary. The code keeps a running dictionary of the latest values.

    The subscribe() function returns a dictionary (with tickers as keys) which tells you whether the subscription was successful.

    The code for the blp wrapper is available here, while the workings of the underlying API can be found here on page 41.

    EDIT: This alternative code replicates what blp is doing, but instead uses the underlying blpapi provided by Bloomberg. It has some generic event handling, so you can see all the different events that the Bloomberg server is sending back.

    Note that the timeout parameter is in milli-seconds (not seconds), and this might actually be the cause of the OP's issue, as they have only a 120 ms wait. In my tests it can take maybe 6 seconds before the first subscription data comes through: but the OP is only allowing 3 x 120ms before giving up. Setting timeout=0 will cause the code to block until it gets a response.

    As you can see from the output with timeout=2000, there are still occasional timed-out waits, as Bloomberg hasn't sent any new subscription data.

    from blpapi import Session, Event, SubscriptionList, SubscriptionPreprocessMode
    
    evtTypes = {
        Event.ADMIN: 'ADMIN',
        Event.AUTHORIZATION_STATUS: "AUTHORIZATION_STATUS",
        Event.PARTIAL_RESPONSE: 'PARTIAL_RESPONSE',
        Event.REQUEST: 'REQUEST',
        Event.REQUEST_STATUS: 'REQUEST_STATUS',
        Event.RESOLUTION_STATUS: 'RESOLUTION_STATUS',
        Event.RESPONSE: 'RESPONSE',
        Event.SERVICE_STATUS: 'SERVICE_STATUS',
        Event.SESSION_STATUS: 'SESSION_STATUS',
        Event.SUBSCRIPTION_DATA: 'SUBSCRIPTION_DATA',
        Event.SUBSCRIPTION_STATUS: 'SUBSCRIPTION_STATUS',
        Event.TIMEOUT: 'TIMEOUT',
        Event.TOKEN_STATUS: 'TOKEN_STATUS',
        Event.TOPIC_STATUS: 'TOPIC_STATUS',
        Event.UNKNOWN: 'UNKNOWN' }
    
    def handleEvent(evt):
        global nBarEvents
    
        eventType = evt.eventType()
        if eventType == Event.SUBSCRIPTION_DATA:
            nBarEvents+=1
            if nBarEvents > 2: 
                return False
    
            for msg in evt:
                msgType = msg.messageType()
                print(evtTypes[eventType],':',msgType)
                for elt in msg:
                    print('   ',str(elt.name()) + ":" + elt.getValueAsString()) 
        else:
            print(evtTypes[eventType])
    
        return True #False to terminate event loop
    
    session = Session()
    session.start()
    
    serviceName = '//blp/mktbar'
    
    ticker = 'ES1 Index'
    field = 'LAST_PRICE'
    options = 'bar_size=1'
    
    subs = SubscriptionList()
    subs.add(serviceName + '/' + ticker,field,options)
    
    res = session.subscribe(subs,mode=SubscriptionPreprocessMode.RETURN_INDIVIDUAL_ERRORS)
    
    if len(res) == 0: #Successful subscription
        nBarEvents = 0
        bContinue = True
        while bContinue:
            bContinue = handleEvent(session.nextEvent(timeout=2000))
    
    session.stop()
    

    With this output:

    SESSION_STATUS
    SESSION_STATUS
    SERVICE_STATUS
    SUBSCRIPTION_STATUS
    TIMEOUT
    TIMEOUT
    TIMEOUT
    SUBSCRIPTION_DATA : MarketBarStart
        TIME:11:35:00
        OPEN:4013.750000
        HIGH:4014.000000
        LOW:4013.750000
        CLOSE:4013.750000
        NUMBER_OF_TICKS:6
        VOLUME:34
        VALUE:136471.000000
        DATE_TIME:2023-02-23T11:35:00.000+00:00
    TIMEOUT
    SUBSCRIPTION_DATA : MarketBarUpdate
        TIME:11:35:00
        OPEN:4013.750000
        HIGH:4014.000000
        LOW:4013.750000
        CLOSE:4014.000000
        NUMBER_OF_TICKS:7
        VOLUME:49
        VALUE:196681.000000
        DATE_TIME:2023-02-23T11:35:00.000+00:00
    TIMEOUT