Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ func init() {
})
}

// DefaultConfiguration contains the default configuration initalized from
// DefaultConfiguration contains the default configuration initialized from
// environment variables.
var DefaultConfiguration Configuration

// Configuration AMQP configuration settings, this settings are set using the
// envinroment varabiles.
// environment variables.
type Configuration struct {
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
Expand Down Expand Up @@ -325,7 +325,7 @@ func (q *Queue) Publish(j *queue.Job) (err error) {
}

// PublishDelayed publishes the given Job with a given delay. Delayed messages
// wont go into the buried queue if they fail.
// will not go into the buried queue if they fail.
func (q *Queue) PublishDelayed(j *queue.Job, delay time.Duration) error {
if j == nil || j.Size() == 0 {
return queue.ErrEmptyJob.New()
Expand Down Expand Up @@ -519,16 +519,14 @@ func (q *Queue) Transaction(txcb queue.TxCallback) error {
return ch.TxCommit()
}

// Implements Queue. The advertisedWindow value will be the exact
// number of undelivered jobs in transit, not just the minium.
// Consume implements the Queue interface. The advertisedWindow value
// is the maximum number of unacknowledged jobs
func (q *Queue) Consume(advertisedWindow int) (queue.JobIter, error) {
ch, err := q.conn.connection().Channel()
if err != nil {
return nil, ErrOpenChannel.New(err)
}

// enforce prefetching only one job, if this is removed the whole queue
// will be consumed.
if err := ch.Qos(advertisedWindow, 0, false); err != nil {
return nil, err
}
Expand Down Expand Up @@ -603,7 +601,7 @@ type Acknowledger struct {
id uint64
}

// Ack signals ackwoledgement.
// Ack signals acknowledgement.
func (a *Acknowledger) Ack() error {
return a.ack.Ack(a.id, false)
}
Expand Down
14 changes: 7 additions & 7 deletions common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (

// Broker represents a message broker.
type Broker interface {
// Queue returns a Queue from the with the given name.
// Queue returns a Queue from the Broker with the given name.
Queue(string) (Queue, error)
// Close closes the connection of the Broker.
Close() error
Expand All @@ -47,7 +47,7 @@ type RepublishConditionFunc func(job *Job) bool
// RepublishConditions alias of a list RepublishConditionFunc
type RepublishConditions []RepublishConditionFunc

// Comply checks if the Job fit in any of the defined conditions.
// Comply checks if the Job matches any of the defined conditions.
func (c RepublishConditions) Comply(job *Job) bool {
if len(c) == 0 {
return true
Expand All @@ -66,23 +66,23 @@ func (c RepublishConditions) Comply(job *Job) bool {
type Queue interface {
// Publish publishes the given Job to the queue.
Publish(*Job) error
// Publish publishes the given Job to the queue with a given delay.
// PublishDelayed publishes the given Job to the queue with a given delay.
PublishDelayed(*Job, time.Duration) error
// Transaction executes the passed TxCallback inside a transaction.
Transaction(TxCallback) error
// Consume returns a JobIter for the queue. It receives the minimum
// number of undelivered jobs the iterator will allow at any given
// Consume returns a JobIter for the queue. It receives the maximum
// number of unacknowledged jobs the iterator will allow at any given
// time (see the Acknowledger interface).
Consume(advertisedWindow int) (JobIter, error)
// RepublishBuried republish to the main queue those jobs complying
// RepublishBuried republishes to the main queue those jobs complying
// one of the conditions, leaving the rest of them in the buried queue.
RepublishBuried(conditions ...RepublishConditionFunc) error
}

// JobIter represents an iterator over a set of Jobs.
type JobIter interface {
// Next returns the next Job in the iterator. It should block until
// the job becomes available or while too many undelivered jobs has
// a new job becomes available or while too many undelivered jobs have
// been already returned (see the argument to Queue.Consume). Returns
// ErrAlreadyClosed if the iterator is closed.
Next() (*Job, error)
Expand Down
3 changes: 2 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Job struct {
Timestamp time.Time
// Retries is the number of times this job can be processed before being rejected.
Retries int32
// ErrorType is the kind of error that made the job failing.
// ErrorType is the kind of error that made the job fail.
ErrorType string
// ContentType of the job
ContentType string
Expand Down Expand Up @@ -81,6 +81,7 @@ func (j *Job) Decode(payload interface{}) error {
return decode(msgpackContentType, j.Raw, &payload)
}

// ErrCantAck is the error returned when the Job does not come from a queue
var ErrCantAck = errors.NewKind("can't acknowledge this message, it does not come from a queue")

// Ack is called when the job is finished.
Expand Down
4 changes: 2 additions & 2 deletions memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func init() {
})
}

// Broker is a in-memory implementation of brocker.
// Broker is a in-memory implementation of Broker.
type Broker struct {
queues map[string]queue.Queue
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func (q *Queue) PublishDelayed(j *queue.Job, delay time.Duration) error {
return nil
}

// RepublishBuried implement the Queue interface.
// RepublishBuried implements the Queue interface.
func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) error {
for _, job := range q.buriedJobs {
if queue.RepublishConditions(conditions).Comply(job) {
Expand Down
6 changes: 3 additions & 3 deletions register.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ var (
ErrUnsupportedProtocol = errors.NewKind("unsupported protocol: %s")
// ErrMalformedURI is the error returned when a Broker does not know
// how to parse a given URI.
ErrMalformedURI = errors.NewKind("malformed conection URI: %s")
ErrMalformedURI = errors.NewKind("malformed connection URI: %s")

register = make(map[string]BrokerBuilder, 0)
)

// BrokerBuilder function that instanciates a new brocked based on the given uri.
// BrokerBuilder instantiates a new Broker based on the given uri.
type BrokerBuilder func(uri string) (Broker, error)

// Register registers a new BrokerBuilder to be used by NewBroker, this function
Expand All @@ -30,7 +30,7 @@ func Register(name string, b BrokerBuilder) {
// NewBroker creates a new Broker based on the given URI. In order to register
// different implementations the package should be imported, example:
//
// import _ "gopkg.in/src-d/go-queue.v1/amqp"
// import _ "gopkg.in/src-d/go-queue.v1/amqp"
func NewBroker(uri string) (Broker, error) {
url, err := url.Parse(uri)
if err != nil {
Expand Down