Nov-05-2025, 03:08 PM (This post was last modified: Nov-05-2025, 03:08 PM by compuman145.)
Good afternoon all,
I've got a really odd issue and I simply don't know where it is, I've been working with pulling tickers from MongoDB and then piping that into an async def to then pull live ticker data.
To note, this code works perfectly, other than the fact when it gets to around 500 - 530, it stops. There are no errors that it shows, there are no errors in the IB gateway and I'm not running out of resources either as I checked both VMs.
It stops at 520+ or so regardless of batch size. I set the delay to 1 second and the batch size to 50 tickers, and again it hit the 500s and just stopped. I also tried a much longer sleep with only 40 tickers per batch, and that failed too.
To note, I checked nasdaq_tickers and there are 1300 objects in there so it's not that :(
The code is running on a Windows 11 VM; Mongo and Postgres are both on Ubuntu 24.04. Please ignore my terrible authentication methods — I'll be getting Vault in soon :)
Thanks
Comps
I've got a really odd issue and I simply don't know where it is, I've been working with pulling tickers from MongoDB and then piping that into an async def to then pull live ticker data.
To note, this code works perfectly, other than the fact when it gets to around 500 - 530, it stops. There are no errors that it shows, there are no errors in the IB gateway and I'm not running out of resources either as I checked both VMs.
import motor.motor_asyncio import asyncio import logging from ib_async import IB, Stock import asyncpg import signal # -------------------- Logging -------------------- logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # -------------------- MongoDB Fetch -------------------- async def fetch_tickers(): client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://192.168.1.152:27017") db = client["tickers"] collection = db["nasdaq_tickers"] tickers = [] async for doc in collection.find({}): tickers.append({ "symbol": doc.get("symbol"), "currency": "USD", "exchange": "NASDAQ" }) return tickers # -------------------- PostgreSQL -------------------- async def setup_db(): try: return await asyncpg.create_pool( user="raggy_sql", password='"', database="raggy_sql", host="192.168.1.152", port=5432, min_size=1, max_size=10, timeout=30 ) except Exception as e: logger.error(f"Failed to connect to DB: {e}") raise async def insert_tick(pool, batch): if not batch: return try: async with pool.acquire() as conn: await conn.executemany(""" INSERT INTO ib_realtime_bars( symbol, marketDataType, mintick, bid, bidsize, ask, asksize, last, lastsize, volume, close, marketprice ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12 ) """, [ ( d["symbol"], d["marketDataType"], d["minTick"], d["bid"], d["bidSize"], d["ask"], d["askSize"], d["last"], d["lastSize"], d["volume"], d["close"], d["marketPrice"] ) for d in batch ]) except Exception as e: logger.error(f"Failed batch insert: {e}") # -------------------- Tick Processor -------------------- async def tick_worker(queue, pool): while True: try: tick = await queue.get() await insert_tick(pool, [tick]) # wrap in list for executemany queue.task_done() except asyncio.CancelledError: break # -------------------- IB Data -------------------- MAX_CONCURRENT_TICKERS = 1 # lowered to 1 per second async def process_ticker_batch(pool, ib, contracts, 
queue): ticker_objs = [ib.reqMktData(c) for c in contracts] await asyncio.sleep(1) for contract, ticker in zip(contracts, ticker_objs): if ticker.last is not None: tick_data = { "symbol": contract.symbol, "marketDataType": getattr(ticker, "marketDataType", None), "minTick": getattr(ticker, "minTick", None), "bid": getattr(ticker, "bid", None), "bidSize": getattr(ticker, "bidSize", None), "ask": getattr(ticker, "ask", None), "askSize": getattr(ticker, "askSize", None), "last": ticker.last, "lastSize": getattr(ticker, "lastSize", None), "volume": getattr(ticker, "volume", None), "close": ticker.last, "marketPrice": ticker.last, } await queue.put(tick_data) # cancel market data to rotate cleanly for c in contracts: ib.cancelMktData(c) # -------------------- Main -------------------- async def main(): ib = IB() pool = await setup_db() tick_queue = asyncio.Queue() # Start worker worker_task = asyncio.create_task(tick_worker(tick_queue, pool)) # Graceful shutdown stop_event = asyncio.Event() def shutdown(): stop_event.set() for sig in (signal.SIGINT, signal.SIGTERM): signal.signal(sig, lambda s, f: shutdown()) # Connect IB while True: try: await ib.connectAsync("127.0.0.1", 4002, clientId=1) logger.info("Connected to IB API") break except Exception as e: logger.warning(f"IB connect failed: {e}, retrying in 5s...") await asyncio.sleep(5) # Fetch tickers from MongoDB logger.info("Fetching tickers from MongoDB...") tickers = await fetch_tickers() logger.info(f"Fetched {len(tickers)} tickers. 
Example: {tickers[:]}") # Create contracts contracts = [Stock(t["symbol"], t["exchange"], t["currency"]) for t in tickers] contracts = await ib.qualifyContractsAsync(*contracts) logger.info(f"Qualified {len(contracts)} contracts") # Rotating batch loop async def rotating_batches(): while not stop_event.is_set(): for i in range(0, len(contracts), MAX_CONCURRENT_TICKERS): batch = contracts[i:i + MAX_CONCURRENT_TICKERS] await process_ticker_batch(pool, ib, batch, tick_queue) await asyncio.sleep(1) rotator_task = asyncio.create_task(rotating_batches()) # Wait for shutdown await stop_event.wait() logger.info("Shutting down...") rotator_task.cancel() await asyncio.gather(rotator_task, return_exceptions=True) worker_task.cancel() await asyncio.gather(worker_task, return_exceptions=True) await pool.close() ib.disconnect() logger.info("Disconnected from IB and DB") if __name__ == "__main__": asyncio.run(main())I then discussed it with IB, It thought that maybe I can't pull too many tickers, not the case, I then asked if I needed to wait for the closure of the ticker request, nope not that either. They said as long as it was only 50 per second, it wouldn't matter. So I thought ok I'll set my concurrents to 1, this does take affect and took a lot longer but none the less, 538 and boom it stopped.It stops at 520+ or so on any batch number, I set it to 1 second on the delay and 50 on the tickers and it again, hit 5** odd and again just stopped. I also did a much longer sleep and only 40 tickers and that failed too.
To note, I checked nasdaq_tickers and there are 1300 objects in there so it's not that :(
The code is running on a windows 11 VM, mongo, postgres is all on ubuntu 24.04. Please ignore my terrible authentication methods, I'll be getting vault in soon :)
Thanks
Comps
