Skip to content

Commit 92db88c

Browse files
author
Shlomi Kushchi
authored
Merge pull request #135 from alpacahq/add_data_time_frames
Add another streaming source: minute aggs
2 parents 068bca8 + 6940461 commit 92db88c

File tree

3 files changed

+75
-27
lines changed

3 files changed

+75
-27
lines changed

alpaca_backtrader_api/alpacadata.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from backtrader.feed import DataBase
77
from backtrader import date2num, num2date
88
from backtrader.utils.py3 import queue, with_metaclass
9+
import backtrader as bt
910

1011
from alpaca_backtrader_api import alpacastore
1112

@@ -155,7 +156,13 @@ def islive(self):
155156
def __init__(self, **kwargs):
156157
self.o = self._store(**kwargs)
157158
self._candleFormat = 'bidask' if self.p.bidask else 'midpoint'
159+
self._timeframe = self.p.timeframe
158160
self.do_qcheck(True, 0)
161+
if self._timeframe not in [bt.TimeFrame.Ticks,
162+
bt.TimeFrame.Minutes,
163+
bt.TimeFrame.Days]:
164+
raise Exception(f'Unsupported time frame: '
165+
f'{bt.TimeFrame.TName(self._timeframe)}')
159166

160167
def setenvironment(self, env):
161168
"""
@@ -224,7 +231,9 @@ def _st_start(self, instart=True, tmout=None):
224231

225232
self._state = self._ST_HISTORBACK
226233
return True
227-
self.qlive = self.o.streaming_prices(self.p.dataname, tmout=tmout)
234+
self.qlive = self.o.streaming_prices(self.p.dataname,
235+
self.p.timeframe,
236+
tmout=tmout)
228237
if instart:
229238
self._statelivereconn = self.p.backfill_start
230239
else:
@@ -299,8 +308,13 @@ def _load(self):
299308
if self._laststatus != self.LIVE:
300309
if self.qlive.qsize() <= 1: # very short live queue
301310
self.put_notification(self.LIVE)
302-
303-
ret = self._load_tick(msg)
311+
if self.p.timeframe == bt.TimeFrame.Ticks:
312+
ret = self._load_tick(msg)
313+
elif self.p.timeframe == bt.TimeFrame.Minutes:
314+
ret = self._load_agg(msg)
315+
else:
316+
# might want to act differently in the future
317+
ret = self._load_agg(msg)
304318
if ret:
305319
return True
306320

@@ -410,6 +424,21 @@ def _load_tick(self, msg):
410424

411425
return True
412426

427+
def _load_agg(self, msg):
428+
dtobj = datetime.utcfromtimestamp(int(msg['time']))
429+
dt = date2num(dtobj)
430+
if dt <= self.lines.datetime[-1]:
431+
return False # time already seen
432+
self.lines.datetime[0] = dt
433+
self.lines.open[0] = msg['open']
434+
self.lines.high[0] = msg['high']
435+
self.lines.low[0] = msg['low']
436+
self.lines.close[0] = msg['close']
437+
self.lines.volume[0] = msg['volume']
438+
self.lines.openinterest[0] = 0.0
439+
440+
return True
441+
413442
def _load_history(self, msg):
414443
dtobj = msg['time'].to_pydatetime()
415444
dt = date2num(dtobj)

alpaca_backtrader_api/alpacastore.py

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
unicode_literals)
33
import os
44
import collections
5+
import time
56
from enum import Enum
67
import traceback
78

@@ -91,10 +92,17 @@ def _request(self,
9192

9293

9394
class Granularity(Enum):
95+
Ticks = "ticks"
9496
Daily = "day"
9597
Minute = "minute"
9698

9799

100+
class StreamingMethod(Enum):
101+
AccountUpdate = 'account_update'
102+
Quote = "quote"
103+
MinuteAgg = "minute_agg"
104+
105+
98106
class Streamer:
99107
conn = None
100108

@@ -104,7 +112,7 @@ def __init__(
104112
api_key='',
105113
api_secret='',
106114
instrument='',
107-
method='',
115+
method: StreamingMethod = StreamingMethod.AccountUpdate,
108116
base_url='',
109117
data_url='',
110118
data_stream='',
@@ -126,19 +134,23 @@ def __init__(
126134
self.q = q
127135
self.conn.on('authenticated')(self.on_auth)
128136
self.conn.on(r'Q.*')(self.on_quotes)
137+
self.conn.on(r'AM.*')(self.on_agg_min)
138+
self.conn.on(r'A.*')(self.on_agg_min)
129139
self.conn.on(r'account_updates')(self.on_account)
130140
self.conn.on(r'trade_updates')(self.on_trade)
131141

132142
def run(self):
133143
channels = []
134-
if not self.method:
144+
if self.method == StreamingMethod.AccountUpdate:
135145
channels = ['trade_updates'] # 'account_updates'
136146
else:
137147
if self.data_stream == 'polygon':
138-
maps = {"quote": "Q."}
148+
maps = {"quote": "Q.",
149+
"minute_agg": "AM."}
139150
elif self.data_stream == 'alpacadatav1':
140-
maps = {"quote": "alpacadatav1/Q."}
141-
channels = [maps[self.method] + self.instrument]
151+
maps = {"quote": "alpacadatav1/Q.",
152+
"minute_agg": "alpacadatav1/AM."}
153+
channels = [maps[self.method.value] + self.instrument]
142154

143155
loop = asyncio.new_event_loop()
144156
asyncio.set_event_loop(loop)
@@ -159,7 +171,8 @@ async def on_agg_sec(self, conn, subject, msg):
159171
self.q.put(msg)
160172

161173
async def on_agg_min(self, conn, subject, msg):
162-
self.q.put(msg)
174+
msg._raw['time'] = msg.end.to_pydatetime().timestamp()
175+
self.q.put(msg._raw)
163176

164177
async def on_account(self, conn, stream, msg):
165178
self.q.put(msg)
@@ -308,6 +321,8 @@ def get_positions(self):
308321
return positions
309322

310323
def get_granularity(self, timeframe, compression) -> Granularity:
324+
if timeframe == bt.TimeFrame.Ticks:
325+
return Granularity.Ticks
311326
if timeframe == bt.TimeFrame.Minutes:
312327
return Granularity.Minute
313328
elif timeframe == bt.TimeFrame.Days:
@@ -440,7 +455,7 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend,
440455
dates may or may not be specified by the user.
441456
when they do, they probably don't include NY timezone data
442457
also, when granularity is minute, we want to make sure we get data when
443-
market is opened. so if it doesn't - let's get set end date to be last
458+
market is opened. so if it doesn't - let's set end date to be last
444459
known minute with opened market.
445460
this method takes care of all these issues.
446461
:param dtbegin:
@@ -451,18 +466,9 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend,
451466
if not dtend:
452467
dtend = pd.Timestamp('now', tz=NY)
453468
else:
454-
dtend = pd.Timestamp(pytz.timezone(NY).localize(dtend))
469+
dtend = pd.Timestamp(pytz.timezone('UTC').localize(dtend))
455470
if granularity == Granularity.Minute:
456471
calendar = trading_calendars.get_calendar(name='NYSE')
457-
if pd.Timestamp('now', tz=NY).date() == dtend.date():
458-
if calendar.is_open_on_minute(dtend):
459-
# we execute during market open, we don't want today's data
460-
# we will receive it through the websocket
461-
dtend = dtend.replace(hour=15,
462-
minute=59,
463-
second=0,
464-
microsecond=0)
465-
dtend -= timedelta(days=1)
466472
while not calendar.is_open_on_minute(dtend):
467473
dtend = dtend.replace(hour=15,
468474
minute=59,
@@ -479,7 +485,7 @@ def _make_sure_dates_are_initialized_properly(self, dtbegin, dtend,
479485
# if we start the script during market hours we could get this
480486
# situation. this resolves that.
481487
dtbegin -= timedelta(days=1)
482-
return dtbegin, dtend
488+
return dtbegin, dtend.astimezone(NY)
483489

484490
def get_aggs_from_polygon(self,
485491
dataname,
@@ -518,15 +524,15 @@ def _clear_out_of_market_hours(df):
518524
segment_start = dtbegin
519525
segment_end = segment_start + timedelta(weeks=2) if \
520526
dtend - dtbegin >= timedelta(weeks=2) else dtend
521-
while cdl.empty or cdl.index[-1] < dtend:
527+
while cdl.empty or cdl.index[-1] < dtend.replace(second=0):
522528
# we want to collect data until the last row is later than
523529
# the requested dtend. we don't force it to contain dtend
524530
# because it might be missing, or we may be resampling (so
525531
# again, it will be missing)
526532
response = self.oapi.polygon.historic_agg_v2(
527533
dataname,
528534
compression,
529-
granularity,
535+
'minute',
530536
_from=self.iso_date(segment_start.isoformat()),
531537
to=self.iso_date(segment_end.isoformat()))
532538
# No result from the server, most likely error
@@ -585,9 +591,12 @@ def _iterate_api_calls():
585591
timeframe = "5Min"
586592
elif granularity == 'minute' and compression == 15:
587593
timeframe = "15Min"
594+
elif granularity == 'ticks':
595+
timeframe = "minute"
588596
else:
589597
timeframe = granularity
590598
r = self.oapi.get_barset(dataname,
599+
'minute' if timeframe == 'ticks' else
591600
timeframe,
592601
limit=1000,
593602
end=curr.isoformat()
@@ -687,22 +696,31 @@ def _resample(df):
687696
response = response[~response.index.duplicated()]
688697
return response
689698

690-
def streaming_prices(self, dataname, tmout=None):
699+
def streaming_prices(self, dataname, timeframe, tmout=None):
691700
q = queue.Queue()
692-
kwargs = {'q': q, 'dataname': dataname, 'tmout': tmout}
701+
kwargs = {'q': q,
702+
'dataname': dataname,
703+
'timeframe': timeframe,
704+
'tmout': tmout}
693705
t = threading.Thread(target=self._t_streaming_prices, kwargs=kwargs)
694706
t.daemon = True
695707
t.start()
696708
return q
697709

698-
def _t_streaming_prices(self, dataname, q, tmout):
710+
def _t_streaming_prices(self, dataname, timeframe, q, tmout):
699711
if tmout is not None:
700712
_time.sleep(tmout)
713+
714+
if timeframe == bt.TimeFrame.Ticks:
715+
method = StreamingMethod.Quote
716+
elif timeframe == bt.TimeFrame.Minutes:
717+
method = StreamingMethod.MinuteAgg
718+
701719
streamer = Streamer(q,
702720
api_key=self.p.key_id,
703721
api_secret=self.p.secret_key,
704722
instrument=dataname,
705-
method='quote',
723+
method=method,
706724
base_url=self.p.base_url,
707725
data_url=os.environ.get("DATA_PROXY_WS", ''),
708726
data_stream='polygon' if self.p.usePolygon else

requirements/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
backtrader==1.9.76.123
22
alpaca-trade-api==0.51.0
33
trading_calendars==2.1.1
4+

0 commit comments

Comments
 (0)