11package fluent
22
33import (
4+ "context"
45"encoding/json"
56"errors"
67"fmt"
@@ -85,15 +86,24 @@ type msgToSend struct {
8586type Fluent struct {
8687Config
8788
88- dialer dialer
89+ dialer dialer
90+ // stopRunning is used in async mode to signal to run() and connectOrRetryAsync()
91+ // they should abort.
8992stopRunning chan bool
90- pending chan * msgToSend
91- wg sync.WaitGroup
93+ // stopAsyncConnect is used by connectOrRetryAsync() to signal to
94+ // connectOrRetry() it should abort.
95+ stopAsyncConnect chan bool
96+ pending chan * msgToSend
97+ wg sync.WaitGroup
9298
9399muconn sync.Mutex
94100conn net.Conn
95101}
96102
// dialer is the minimal dialing capability the logger needs: opening a
// net.Conn to the given network/address while honouring ctx. *net.Dialer
// satisfies it.
type dialer interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
106+
97107// New creates a new Logger.
98108func New (config Config ) (* Fluent , error ) {
99109if config .Timeout == 0 {
@@ -104,10 +114,6 @@ func New(config Config) (*Fluent, error) {
104114})
105115}
106116
107- type dialer interface {
108- Dial (string , string ) (net.Conn , error )
109- }
110-
111117func newWithDialer (config Config , d dialer ) (f * Fluent , err error ) {
112118if config .FluentNetwork == "" {
113119config .FluentNetwork = defaultNetwork
@@ -143,9 +149,11 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
143149
144150if config .Async {
145151f = & Fluent {
146- Config : config ,
147- dialer : d ,
148- pending : make (chan * msgToSend , config .BufferLimit ),
152+ Config : config ,
153+ dialer : d ,
154+ stopRunning : make (chan bool ),
155+ stopAsyncConnect : make (chan bool ),
156+ pending : make (chan * msgToSend , config .BufferLimit ),
149157}
150158f .wg .Add (1 )
151159go f .run ()
@@ -154,7 +162,7 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
154162Config : config ,
155163dialer : d ,
156164}
157- err = f .connect ()
165+ err = f .connect (context . Background () )
158166}
159167return
160168}
@@ -331,7 +339,11 @@ func (f *Fluent) Close() (err error) {
331339close (f .pending )
332340f .wg .Wait ()
333341}
334- f .close (f .conn )
342+
343+ f .muconn .Lock ()
344+ f .close ()
345+ f .muconn .Unlock ()
346+
335347return
336348}
337349
@@ -346,35 +358,105 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
346358}
347359
348360// close closes the connection.
349- func (f * Fluent ) close (c net.Conn ) {
350- f .muconn .Lock ()
351- if f .conn != nil && f .conn == c {
361+ func (f * Fluent ) close () {
362+ if f .conn != nil {
352363f .conn .Close ()
353364f .conn = nil
354365}
355- f .muconn .Unlock ()
356366}
357367
358368// connect establishes a new connection using the specified transport.
359- func (f * Fluent ) connect () (err error ) {
369+ func (f * Fluent ) connect (ctx context. Context ) (err error ) {
360370switch f .Config .FluentNetwork {
361371case "tcp" :
362- f .conn , err = f .dialer .Dial (
372+ f .conn , err = f .dialer .DialContext ( ctx ,
363373f .Config .FluentNetwork ,
364374f .Config .FluentHost + ":" + strconv .Itoa (f .Config .FluentPort ))
365375case "unix" :
366- f .conn , err = f .dialer .Dial (
376+ f .conn , err = f .dialer .DialContext ( ctx ,
367377f .Config .FluentNetwork ,
368378f .Config .FluentSocketPath )
369379default :
370380err = NewErrUnknownNetwork (f .Config .FluentNetwork )
371381}
382+
372383return err
373384}
374385
386+ var errIsClosing = errors .New ("fluent logger is closing, aborting async connect" )
387+
388+ func (f * Fluent ) connectOrRetry (ctx context.Context ) error {
389+ // Use a Time channel instead of time.Sleep() to avoid blocking this
390+ // goroutine during eventually way too much time (because of the exponential
391+ // back-off retry).
392+ waiter := time .After (time .Duration (0 ))
393+ for i := 0 ; i < f .Config .MaxRetry ; i ++ {
394+ select {
395+ case <- waiter :
396+ err := f .connect (ctx )
397+ if err == nil {
398+ return nil
399+ }
400+
401+ if _ , ok := err .(* ErrUnknownNetwork ); ok {
402+ // No need to retry on unknown network error. Thus false is passed
403+ // to ready channel to let the other end drain the message queue.
404+ return err
405+ }
406+
407+ waitTime := f .Config .RetryWait * e (defaultReconnectWaitIncreRate , float64 (i - 1 ))
408+ if waitTime > f .Config .MaxRetryWait {
409+ waitTime = f .Config .MaxRetryWait
410+ }
411+
412+ waiter = time .After (time .Duration (waitTime ) * time .Millisecond )
413+ case <- f .stopAsyncConnect :
414+ return errIsClosing
415+ }
416+ }
417+
418+ return fmt .Errorf ("could not connect to fluentd after %d retries" , f .Config .MaxRetry )
419+ }
420+
421+ // connectOrRetryAsync returns an error when it fails to connect to fluentd or
422+ // when Close() has been called.
423+ func (f * Fluent ) connectOrRetryAsync (ctx context.Context ) error {
424+ ctx , cancelDialing := context .WithCancel (ctx )
425+ errCh := make (chan error )
426+
427+ f .wg .Add (1 )
428+ go func (ctx context.Context , errCh chan <- error ) {
429+ defer f .wg .Done ()
430+ errCh <- f .connectOrRetry (ctx )
431+ }(ctx , errCh )
432+
433+ for {
434+ select {
435+ case _ , ok := <- f .stopRunning :
436+ // If f.stopRunning is closed before we got something on errCh,
437+ // we need to wait a bit more.
438+ if ! ok {
439+ break
440+ }
441+
442+ // Stop any connection dialing and then tell connectOrRetry to stop
443+ // trying to dial the connection. This has to be done in this
444+ // specifc order to make sure connectOrRetry() is not blocking on
445+ // the connection dialing.
446+ cancelDialing ()
447+ f .stopAsyncConnect <- true
448+ case err := <- errCh :
449+ return err
450+ }
451+ }
452+ }
453+
454+ // run is the goroutine used to unqueue and write logs in async mode. That
455+ // goroutine is meant to run during the whole life of the Fluent logger.
375456func (f * Fluent ) run () {
376457drainEvents := false
377458var emitEventDrainMsg sync.Once
459+
378460for {
379461select {
380462case entry , ok := <- f .pending :
@@ -387,16 +469,16 @@ func (f *Fluent) run() {
387469continue
388470}
389471err := f .write (entry )
390- if err != nil {
472+ if err == errIsClosing {
473+ drainEvents = true
474+ } else if err != nil {
475+ // TODO: log failing message?
391476fmt .Fprintf (os .Stderr , "[%s] Unable to send logs to fluentd, reconnecting...\n " , time .Now ().Format (time .RFC3339 ))
392477}
393- }
394- select {
395478case stopRunning , ok := <- f .stopRunning :
396479if stopRunning || ! ok {
397480drainEvents = true
398481}
399- default :
400482}
401483}
402484}
@@ -406,62 +488,64 @@ func e(x, y float64) int {
406488}
407489
408490func (f * Fluent ) write (msg * msgToSend ) error {
409- var c net.Conn
410- for i := 0 ; i < f .Config .MaxRetry ; i ++ {
411- c = f .conn
412- // Connect if needed
413- if c == nil {
414- f .muconn .Lock ()
415- if f .conn == nil {
416- err := f .connect ()
417- if err != nil {
418- f .muconn .Unlock ()
419-
420- if _ , ok := err .(* ErrUnknownNetwork ); ok {
421- // do not retry on unknown network error
422- break
423- }
424- waitTime := f .Config .RetryWait * e (defaultReconnectWaitIncreRate , float64 (i - 1 ))
425- if waitTime > f .Config .MaxRetryWait {
426- waitTime = f .Config .MaxRetryWait
427- }
428- time .Sleep (time .Duration (waitTime ) * time .Millisecond )
429- continue
430- }
491+ // This function is used to ensure muconn is properly locked and unlocked
492+ // between each retry. This gives the importunity to other goroutines to
493+ // lock it (e.g. to close the connection).
494+ writer := func () (bool , error ) {
495+ f .muconn .Lock ()
496+ defer f .muconn .Unlock ()
497+
498+ if f .conn == nil {
499+ var err error
500+ if f .Config .Async {
501+ err = f .connectOrRetryAsync (context .Background ())
502+ } else {
503+ err = f .connectOrRetry (context .Background ())
504+ }
505+
506+ if err != nil {
507+ return false , err
431508}
432- c = f .conn
433- f .muconn .Unlock ()
434509}
435510
436- // We're connected, write msg
437511t := f .Config .WriteTimeout
438512if time .Duration (0 ) < t {
439- c .SetWriteDeadline (time .Now ().Add (t ))
513+ f . conn .SetWriteDeadline (time .Now ().Add (t ))
440514} else {
441- c .SetWriteDeadline (time.Time {})
515+ f . conn .SetWriteDeadline (time.Time {})
442516}
443- _ , err := c .Write (msg .data )
517+
518+ _ , err := f .conn .Write (msg .data )
444519if err != nil {
445- f .close (c )
446- } else {
447- // Acknowledgment check
448- if msg .ack != "" {
449- resp := & AckResp {}
450- if f .Config .MarshalAsJSON {
451- dec := json .NewDecoder (c )
452- err = dec .Decode (resp )
453- } else {
454- r := msgp .NewReader (c )
455- err = resp .DecodeMsg (r )
456- }
457- if err != nil || resp .Ack != msg .ack {
458- f .close (c )
459- continue
460- }
520+ f .close ()
521+ return true , err
522+ }
523+
524+ // Acknowledgment check
525+ if msg .ack != "" {
526+ resp := & AckResp {}
527+ if f .Config .MarshalAsJSON {
528+ dec := json .NewDecoder (f .conn )
529+ err = dec .Decode (resp )
530+ } else {
531+ r := msgp .NewReader (f .conn )
532+ err = resp .DecodeMsg (r )
461533}
534+
535+ if err != nil || resp .Ack != msg .ack {
536+ f .close ()
537+ return true , err
538+ }
539+ }
540+
541+ return false , nil
542+ }
543+
544+ for i := 0 ; i < f .Config .MaxRetry ; i ++ {
545+ if retry , err := writer (); ! retry {
462546return err
463547}
464548}
465549
466- return fmt .Errorf ("fluent#write: failed to reconnect, max retry: %v " , f .Config .MaxRetry )
550+ return fmt .Errorf ("fluent#write: failed to write after %d attempts " , f .Config .MaxRetry )
467551}
0 commit comments