@@ -159,40 +159,9 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
159159attempts := 0
160160
161161for {
162- // If this isn't the first attempt, we're retrying
163- if attempts > 0 {
164- if attempts > maxRetries {
165- eventChan <- ProviderEvent {
166- Type : EventError ,
167- Error : errors .New ("maximum retry attempts reached for rate limit (429)" ),
168- }
169- return
170- }
171-
172- // Inform user we're retrying with attempt number
173- eventChan <- ProviderEvent {
174- Type : EventWarning ,
175- Info : fmt .Sprintf ("[Retrying due to rate limit... attempt %d of %d]" , attempts , maxRetries ),
176- }
177-
178- // Calculate backoff with exponential backoff and jitter
179- backoffMs := 2000 * (1 << (attempts - 1 )) // 2s, 4s, 8s, 16s, 32s
180- jitterMs := int (float64 (backoffMs ) * 0.2 )
181- totalBackoffMs := backoffMs + jitterMs
182-
183- // Sleep with backoff, respecting context cancellation
184- select {
185- case <- ctx .Done ():
186- eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
187- return
188- case <- time .After (time .Duration (totalBackoffMs ) * time .Millisecond ):
189- // Continue with retry
190- }
191- }
192162
193163attempts ++
194164
195- // Create new streaming request
196165stream := a .client .Messages .NewStreaming (
197166ctx ,
198167anthropic.MessageNewParams {
@@ -213,11 +182,8 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
213182},
214183)
215184
216- // Process stream events
217185accumulatedMessage := anthropic.Message {}
218- streamSuccess := false
219186
220- // Process the stream until completion or error
221187for stream .Next () {
222188event := stream .Current ()
223189err := accumulatedMessage .Accumulate (event )
@@ -247,7 +213,6 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
247213eventChan <- ProviderEvent {Type : EventContentStop }
248214
249215case anthropic.MessageStopEvent :
250- streamSuccess = true
251216content := ""
252217for _ , block := range accumulatedMessage .Content {
253218if text , ok := block .AsAny ().(anthropic.TextBlock ); ok {
@@ -270,51 +235,59 @@ func (a *anthropicProvider) StreamResponse(ctx context.Context, messages []messa
270235}
271236}
272237
273- // If the stream completed successfully, we're done
274- if streamSuccess {
238+ err := stream . Err ()
239+ if err == nil {
275240return
276241}
277242
278- // Check for stream errors
279- err := stream .Err ()
280- if err != nil {
281- var apierr * anthropic.Error
282- if errors .As (err , & apierr ) {
283- if apierr .StatusCode == 429 || apierr .StatusCode == 529 {
284- // Check for Retry-After header
285- if retryAfterValues := apierr .Response .Header .Values ("Retry-After" ); len (retryAfterValues ) > 0 {
286- // Parse the retry after value (seconds)
287- var retryAfterSec int
288- if _ , err := fmt .Sscanf (retryAfterValues [0 ], "%d" , & retryAfterSec ); err == nil {
289- retryMs := retryAfterSec * 1000
290-
291- // Inform user of retry with specific wait time
292- eventChan <- ProviderEvent {
293- Type : EventWarning ,
294- Info : fmt .Sprintf ("[Rate limited: waiting %d seconds as specified by API]" , retryAfterSec ),
295- }
296-
297- // Sleep respecting context cancellation
298- select {
299- case <- ctx .Done ():
300- eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
301- return
302- case <- time .After (time .Duration (retryMs ) * time .Millisecond ):
303- // Continue with retry after specified delay
304- continue
305- }
306- }
307- }
243+ var apierr * anthropic.Error
244+ if ! errors .As (err , & apierr ) {
245+ eventChan <- ProviderEvent {Type : EventError , Error : err }
246+ return
247+ }
308248
309- // Fall back to exponential backoff if Retry-After parsing failed
310- continue
249+ if apierr .StatusCode != 429 && apierr .StatusCode != 529 {
250+ eventChan <- ProviderEvent {Type : EventError , Error : err }
251+ return
252+ }
253+
254+ if attempts > maxRetries {
255+ eventChan <- ProviderEvent {
256+ Type : EventError ,
257+ Error : errors .New ("maximum retry attempts reached for rate limit (429)" ),
258+ }
259+ return
260+ }
261+
262+ retryMs := 0
263+ retryAfterValues := apierr .Response .Header .Values ("Retry-After" )
264+ if len (retryAfterValues ) > 0 {
265+ var retryAfterSec int
266+ if _ , err := fmt .Sscanf (retryAfterValues [0 ], "%d" , & retryAfterSec ); err == nil {
267+ retryMs = retryAfterSec * 1000
268+ eventChan <- ProviderEvent {
269+ Type : EventWarning ,
270+ Info : fmt .Sprintf ("[Rate limited: waiting %d seconds as specified by API]" , retryAfterSec ),
311271}
312272}
273+ } else {
274+ eventChan <- ProviderEvent {
275+ Type : EventWarning ,
276+ Info : fmt .Sprintf ("[Retrying due to rate limit... attempt %d of %d]" , attempts , maxRetries ),
277+ }
313278
314- // For non-rate limit errors, report and exit
315- eventChan <- ProviderEvent {Type : EventError , Error : err }
279+ backoffMs := 2000 * (1 << (attempts - 1 ))
280+ jitterMs := int (float64 (backoffMs ) * 0.2 )
281+ retryMs = backoffMs + jitterMs
282+ }
283+ select {
284+ case <- ctx .Done ():
285+ eventChan <- ProviderEvent {Type : EventError , Error : ctx .Err ()}
316286return
287+ case <- time .After (time .Duration (retryMs ) * time .Millisecond ):
288+ continue
317289}
290+
318291}
319292}()
320293
@@ -412,7 +385,6 @@ func (a *anthropicProvider) convertToAnthropicMessages(messages []message.Messag
412385blocks = append (blocks , anthropic .ContentBlockParamOfRequestToolUseBlock (toolCall .ID , inputMap , toolCall .Name ))
413386}
414387
415- // Skip empty assistant messages completely
416388if len (blocks ) > 0 {
417389anthropicMessages = append (anthropicMessages , anthropic .NewAssistantMessage (blocks ... ))
418390}
0 commit comments