Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ import (
// chars found in table or column name.
var ErrInvalidMsg = errors.New("invalid message")

const (
defaultBufferCapacity = 128 * 1024
defaultFileNameLimit = 127
)

type tlsMode int64

const (
Expand All @@ -62,18 +67,19 @@ const (
// Each sender corresponds to a single TCP connection. A sender
// should not be called concurrently by multiple goroutines.
type LineSender struct {
address string
tlsMode tlsMode
keyId string // Erased once auth is done.
key string // Erased once auth is done.
bufCap int
conn net.Conn
buf *buffer
lastMsgPos int
lastErr error
hasTable bool
hasTags bool
hasFields bool
address string
tlsMode tlsMode
keyId string // Erased once auth is done.
key string // Erased once auth is done.
bufCap int
fileNameLimit int
conn net.Conn
buf *buffer
lastMsgPos int
lastErr error
hasTable bool
hasTags bool
hasFields bool
}

// LineSenderOption defines line sender option.
Expand Down Expand Up @@ -126,6 +132,18 @@ func WithBufferCapacity(capacity int) LineSenderOption {
}
}

// WithFileNameLimit sets maximum file name length in chars
// allowed by the server. Affects maximum table and column name
// lengths accepted by the sender. Should be set to the same value
// as on the server. Defaults to 127.
func WithFileNameLimit(limit int) LineSenderOption {
return func(s *LineSender) {
if limit > 0 {
s.fileNameLimit = limit
}
}
}

// NewLineSender creates new InfluxDB Line Protocol (ILP) sender. Each
// sender corresponds to a single TCP connection. Sender should
// not be called concurrently by multiple goroutines.
Expand All @@ -138,9 +156,10 @@ func NewLineSender(ctx context.Context, opts ...LineSenderOption) (*LineSender,
)

s := &LineSender{
address: "127.0.0.1:9009",
bufCap: 128 * 1024,
tlsMode: noTls,
address: "127.0.0.1:9009",
bufCap: defaultBufferCapacity,
fileNameLimit: defaultFileNameLimit,
tlsMode: noTls,
}
for _, opt := range opts {
opt(s)
Expand Down Expand Up @@ -375,6 +394,11 @@ func (s *LineSender) writeTableName(str string) error {
if str == "" {
return fmt.Errorf("table name cannot be empty: %w", ErrInvalidMsg)
}
// We use string length in bytes as an approximation. That's to
// avoid calculating the number of runes.
if len(str) > s.fileNameLimit {
return fmt.Errorf("table name length exceeds the limit: %w", ErrInvalidMsg)
}
// Since we're interested in ASCII chars, it's fine to iterate
// through bytes instead of runes.
for i := 0; i < len(str); i++ {
Expand Down Expand Up @@ -470,6 +494,11 @@ func (s *LineSender) writeColumnName(str string) error {
if str == "" {
return fmt.Errorf("column name cannot be empty: %w", ErrInvalidMsg)
}
// We use string length in bytes as an approximation. That's to
// avoid calculating the number of runes.
if len(str) > s.fileNameLimit {
return fmt.Errorf("column name length exceeds the limit: %w", ErrInvalidMsg)
}
// Since we're interested in ASCII chars, it's fine to iterate
// through bytes instead of runes.
for i := 0; i < len(str); i++ {
Expand Down
46 changes: 46 additions & 0 deletions sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,52 @@ func TestFloat64Serialization(t *testing.T) {
}
}

func TestErrorOnLengthyNames(t *testing.T) {
const nameLimit = 42

var (
lengthyStr = strings.Repeat("a", nameLimit+1)
ctx = context.Background()
)

testCases := []struct {
name string
writerFn writerFn
expectedErrMsg string
}{
{
"lengthy table name",
func(s *qdb.LineSender) error {
return s.Table(lengthyStr).StringColumn("str_col", "foo").AtNow(ctx)
},
"table name length exceeds the limit",
},
{
"lengthy column name",
func(s *qdb.LineSender) error {
return s.Table(testTable).StringColumn(lengthyStr, "foo").AtNow(ctx)
},
"column name length exceeds the limit",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
srv, err := newTestServer(readAndDiscard)
assert.NoError(t, err)

sender, err := qdb.NewLineSender(ctx, qdb.WithAddress(srv.addr), qdb.WithFileNameLimit(nameLimit))
assert.NoError(t, err)

err = tc.writerFn(sender)
assert.ErrorContains(t, err, tc.expectedErrMsg)

sender.Close()
srv.close()
})
}
}

func TestErrorOnMissingTableCall(t *testing.T) {
ctx := context.Background()

Expand Down