Python Forum
Code runs perfectly and just stops with zero errors
#1
Good afternoon all,

I've got a really odd issue and I simply can't work out where it is. I've been pulling tickers from MongoDB and then piping them into an async def that pulls live ticker data.

To note, this code works perfectly, except that when it gets to around 500 - 530, it stops. It shows no errors, there are no errors in the IB Gateway, and I'm not running out of resources either, as I checked both VMs.

import motor.motor_asyncio
import asyncio
import logging
from ib_async import IB, Stock
import asyncpg
import signal

# -------------------- Logging --------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# -------------------- MongoDB Fetch --------------------
async def fetch_tickers():
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://192.168.1.152:27017")
    db = client["tickers"]
    collection = db["nasdaq_tickers"]
    tickers = []
    async for doc in collection.find({}):
        tickers.append({
            "symbol": doc.get("symbol"),
            "currency": "USD",
            "exchange": "NASDAQ"
        })
    return tickers

# -------------------- PostgreSQL --------------------
async def setup_db():
    try:
        return await asyncpg.create_pool(
            user="raggy_sql",
            password='"',
            database="raggy_sql",
            host="192.168.1.152",
            port=5432,
            min_size=1,
            max_size=10,
            timeout=30
        )
    except Exception as e:
        logger.error(f"Failed to connect to DB: {e}")
        raise

async def insert_tick(pool, batch):
    if not batch:
        return
    try:
        async with pool.acquire() as conn:
            await conn.executemany("""
                INSERT INTO ib_realtime_bars(
                    symbol, marketDataType, mintick, bid, bidsize, ask, asksize,
                    last, lastsize, volume, close, marketprice
                ) VALUES (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
                )
            """, [
                (
                    d["symbol"], d["marketDataType"], d["minTick"], d["bid"],
                    d["bidSize"], d["ask"], d["askSize"], d["last"],
                    d["lastSize"], d["volume"], d["close"], d["marketPrice"]
                )
                for d in batch
            ])
    except Exception as e:
        logger.error(f"Failed batch insert: {e}")

# -------------------- Tick Processor --------------------
async def tick_worker(queue, pool):
    while True:
        try:
            tick = await queue.get()
            await insert_tick(pool, [tick])  # wrap in list for executemany
            queue.task_done()
        except asyncio.CancelledError:
            break

# -------------------- IB Data --------------------
MAX_CONCURRENT_TICKERS = 1  # lowered to 1 per second

async def process_ticker_batch(pool, ib, contracts, queue):
    ticker_objs = [ib.reqMktData(c) for c in contracts]
    await asyncio.sleep(1)
    for contract, ticker in zip(contracts, ticker_objs):
        if ticker.last is not None:
            tick_data = {
                "symbol": contract.symbol,
                "marketDataType": getattr(ticker, "marketDataType", None),
                "minTick": getattr(ticker, "minTick", None),
                "bid": getattr(ticker, "bid", None),
                "bidSize": getattr(ticker, "bidSize", None),
                "ask": getattr(ticker, "ask", None),
                "askSize": getattr(ticker, "askSize", None),
                "last": ticker.last,
                "lastSize": getattr(ticker, "lastSize", None),
                "volume": getattr(ticker, "volume", None),
                "close": ticker.last,
                "marketPrice": ticker.last,
            }
            await queue.put(tick_data)
    # cancel market data to rotate cleanly
    for c in contracts:
        ib.cancelMktData(c)

# -------------------- Main --------------------
async def main():
    ib = IB()
    pool = await setup_db()
    tick_queue = asyncio.Queue()

    # Start worker
    worker_task = asyncio.create_task(tick_worker(tick_queue, pool))

    # Graceful shutdown
    stop_event = asyncio.Event()

    def shutdown():
        stop_event.set()

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda s, f: shutdown())

    # Connect IB
    while True:
        try:
            await ib.connectAsync("127.0.0.1", 4002, clientId=1)
            logger.info("Connected to IB API")
            break
        except Exception as e:
            logger.warning(f"IB connect failed: {e}, retrying in 5s...")
            await asyncio.sleep(5)

    # Fetch tickers from MongoDB
    logger.info("Fetching tickers from MongoDB...")
    tickers = await fetch_tickers()
    logger.info(f"Fetched {len(tickers)} tickers. Example: {tickers[:]}")

    # Create contracts
    contracts = [Stock(t["symbol"], t["exchange"], t["currency"]) for t in tickers]
    contracts = await ib.qualifyContractsAsync(*contracts)
    logger.info(f"Qualified {len(contracts)} contracts")

    # Rotating batch loop
    async def rotating_batches():
        while not stop_event.is_set():
            for i in range(0, len(contracts), MAX_CONCURRENT_TICKERS):
                batch = contracts[i:i + MAX_CONCURRENT_TICKERS]
                await process_ticker_batch(pool, ib, batch, tick_queue)
                await asyncio.sleep(1)

    rotator_task = asyncio.create_task(rotating_batches())

    # Wait for shutdown
    await stop_event.wait()
    logger.info("Shutting down...")
    rotator_task.cancel()
    await asyncio.gather(rotator_task, return_exceptions=True)
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    await pool.close()
    ib.disconnect()
    logger.info("Disconnected from IB and DB")

if __name__ == "__main__":
    asyncio.run(main())
I then discussed it with IB. They thought that maybe I was pulling too many tickers, but that's not the case. I then asked if I needed to wait for the ticker request to close; nope, not that either. They said that as long as it was only 50 per second, it wouldn't matter. So I thought, OK, I'll set my concurrents to 1. This did take effect and took a lot longer, but nonetheless, at 538, boom, it stopped.
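
For readers who want to enforce a request ceiling like the 50 per second mentioned above explicitly, rather than relying on batch size plus a fixed sleep, here is a minimal sketch using only asyncio. The `Pacer` class and the `demo` function are hypothetical names made up for illustration; nothing here is part of ib_async, and the 50 per second figure is simply what the poster reports being told.

```python
import asyncio
import time


class Pacer:
    """Hypothetical helper: spaces calls so at most max_per_second start each second."""

    def __init__(self, max_per_second: float) -> None:
        self._interval = 1.0 / max_per_second
        self._next_slot = 0.0          # monotonic time of the next free slot
        self._lock = asyncio.Lock()    # serialises callers so slots are handed out in order

    async def wait(self) -> None:
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            if delay > 0:
                await asyncio.sleep(delay)
                now = time.monotonic()
            # Reserve the following slot one interval after this one.
            self._next_slot = max(now, self._next_slot) + self._interval


async def demo() -> float:
    """Make 10 paced 'requests' at 50 per second and return the elapsed time."""
    pacer = Pacer(max_per_second=50)
    start = time.monotonic()
    for _ in range(10):
        await pacer.wait()
        # A real script would issue its market data request here.
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"10 paced calls took {asyncio.run(demo()):.3f}s")
```

In a script like the one above, a call to `await pacer.wait()` would go immediately before each request. This only controls pacing on the client side; it says nothing about any other limits the gateway may apply.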

It stops at 520+ or so with any batch size. I set the delay to 1 second and the tickers to 50, and again it hit 5xx or so and just stopped. I also tried a much longer sleep with only 40 tickers, and that failed too.
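
One general way an asyncio script can stop without printing anything: a task started with `asyncio.create_task()` stores any exception it raises, and nothing is shown until something awaits the task or asks for its result. In the code above, `main()` waits on `stop_event` while the rotator runs as a background task, and the final `asyncio.gather(..., return_exceptions=True)` returns exceptions rather than raising them. Whether that is what happened in this thread is not known, since the fix was never posted. The sketch below only shows how a done-callback can make such a failure visible; `flaky_rotator`, `describe_failure`, and `log_task_result` are hypothetical names, and the failure at item 530 is simulated.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("task_watch")


def describe_failure(task: asyncio.Task) -> Optional[str]:
    """Return a short description if a finished task died with an exception."""
    if task.cancelled():
        return None
    exc = task.exception()
    if exc is None:
        return None
    return f"{task.get_name()} crashed: {exc!r}"


def log_task_result(task: asyncio.Task) -> None:
    """Done-callback: log the failure instead of letting it sit unseen in the task."""
    message = describe_failure(task)
    if message is not None:
        logger.error(message)


async def flaky_rotator(fail_at: int) -> None:
    """Hypothetical stand-in for a long-running loop that fails part-way through."""
    for i in range(1000):
        if i == fail_at:
            raise RuntimeError(f"simulated failure at item {i}")
        await asyncio.sleep(0)


async def demo() -> Optional[str]:
    task = asyncio.create_task(flaky_rotator(530), name="rotator")
    task.add_done_callback(log_task_result)
    await asyncio.wait([task])  # waits for completion without re-raising
    return describe_failure(task)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(demo()))
```

Attaching the same kind of callback to both the worker task and the rotator task would at least rule this case in or out.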

To note, I checked nasdaq_tickers and there are 1300 objects in there, so it's not that :(

The code is running on a Windows 11 VM; MongoDB and PostgreSQL are both on Ubuntu 24.04. Please ignore my terrible authentication methods, I'll be getting Vault in soon :)

Thanks
Comps
#2
Please delete this, I found my issue.

Thanks
Comps
#3
(Nov-05-2025, 03:45 PM)compuman145 Wrote: Please delete this, I found my issue.

Thanks
Comps
That's good. But for other people who may face a similar problem in the future, it would be very kind of you to briefly describe what your issue was and how you fixed it.

Regards, noisefloor

