pythonpandasinteractive-brokersib-api

Interactive Brokers Python Multiple Symbol Request


I was able to piece together a script from IB's documentation/examples and forums on this site. I am getting the output I want for a single symbol, however, if I use a list of stocks, I cannot figure out a way to pass the ticker symbol through to the DF output file. My workaround was to create a dictionary that uses the list sequence (see below) however the output from IB's API changes slightly each time rendering the symbols mostly pointless. The list I am using below normally has 20+ names but may change, i cut it down to make it easier to view.

@Brian/and or other developers, if there is a way to create either a unique ID/sequence for each symbol call and stamp it to data that is brought back, i can then utilize a dictionary to apply the symbol. In the other forum, you passed in a line where n_id = n_id +1, if that can be applied and is linked to each specific call which is done in order of the list, then that could work?

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
from datetime import timedelta
import datetime

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] #Initialize variable to store candle

    def historicalData(self, reqId, bar):
        #print(f'Time: {bar.date} Close: {bar.close} Volume: {bar.volume}',reqId)
        self.data.append([bar.date, bar.close, bar.volume, reqId])
     
def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

#Start the socket in a thread
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

time.sleep(1) #Sleep interval to allow time for connection to server

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND"
    app.reqHistoricalData(1, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])

 time.sleep(5) #sleep to allow enough time for data to be returned

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','reqId'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 

df['Count'] = df.groupby('DateTime').cumcount()+1
sym_dict = {1:'SPY',2:'MSFT',3:'GOOG',4:'AAPL',5:'QQQ',6:'IWM',7:'TSLA'}

df['Ticker'] = df['Count'].map(sym_dict)

print(df)

#edit, adding in @Brian's detail:

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import time
from datetime import timedelta
import datetime

start = datetime.datetime.utcnow()

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] 

def error(self, reqId, errorCode, errorString):
    print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

def historicalData(self, reqId, bar):
    self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
    print("HistoricalData. ReqId:", sym_dict[reqId], "BarData.", bar)
 
# include this callback to track progress and maybe disconnectwhen all are finished
def historicalDataEnd(self, reqId: int, start: str, end: str):
    print("finished", sym_dict[reqId])

def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

# you should wait for nextValidId instead of sleeping, what if it takes more than 1 second? @john: how do i do this?
time.sleep(5) @john: how do i do this? wait for nextValidId?

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

reqId = 1
sym_dict = {}
for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    sym_dict[reqId] = sym
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND" # you may need this for msft
    app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
    reqId += 1
    time.sleep(5)

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 
df = df.set_index(['sym','DateTime']).sort_index()
print(df)
app.disconnect()

Solution

  • You just need to maintain a dict of reqId and symbol.

    I'm not sure that one DataFrame is the best way to store your data but if you do then set a multi index. Decide how much data you want and how you're going to store it on disk and then decide on a data structure. I suggest csv for speed or sqlite for simplicity. Pandas can handle either.

    I deleted your comments and added some of my own.

    from ibapi.client import EClient
    from ibapi.wrapper import EWrapper
    from ibapi.contract import Contract
    import pandas as pd
    import threading
    import time
    from datetime import timedelta
    import datetime
    
    # I added this code to get fake data, works wtihout tws running
    from ibapi.common import BarData
    from random import random
    start = datetime.datetime.utcnow()
    def fake_data(reqId, ib):
        last = reqId*10
        for i in range(60, 0, -10):
            bar = BarData();
            bar.date = start - timedelta(minutes=i)
            last += random() - 0.5
            bar.close = last
            bar.volume = reqId * 1000
            ib.historicalData(reqId, bar)
        ib.historicalDataEnd(reqId,"","")
        
    class IBapi(EWrapper, EClient):
        def __init__(self):
            EClient.__init__(self, self)
            self.data = [] 
    
        #always include this for important messages, also turn on api logging in TWS/IBG    
        def error(self, reqId, errorCode, errorString):
            print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
    
        def historicalData(self, reqId, bar):
            self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
         
        # include this callback to track progress and maybe disconnectwhen all are finished
        def historicalDataEnd(self, reqId: int, start: str, end: str):
            print("finished", sym_dict[reqId])
            
    def run_loop():
        app.run()
    
    app = IBapi()
    app.connect('127.0.0.1', 7496, 123)
    
    # threading is needed only if you plan to interact after run is called
    # this is a good way if you use a ui like jupyter
    api_thread = threading.Thread(target=run_loop, daemon=True)
    api_thread.start()
    
    # you should wait for nextValidId instead of sleeping, what if it takes more than 1 second?
    time.sleep(1)
    
    symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']
    
    reqId = 1
    sym_dict = {}
    for sym in symbols:
        contract = Contract()
        contract.symbol = str(sym) 
        sym_dict[reqId] = sym
        contract.secType = "STK"
        contract.exchange = "SMART"
        contract.currency = "USD"
        #contract.primaryExchange = "ISLAND" # you may need this for msft
        #app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
        fake_data(reqId, app)
        reqId += 1
        #now you need to sleep(10) to make sure you don't get a pacing error for too many requests
        
    # don't sleep, use historicalDataEnd to know when finished
    time.sleep(5)
    
    df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
    df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s')
    
    #make an index and sort
    df = df.set_index(['sym','DateTime']).sort_index()
    # now you can use the indexes
    print(df.loc[("SPY","2021")])
    
    #don't forget to disconnect somewhere or the clientId will still be in use