Skip to content

Commit 631401e

Browse files
authored
add log for cloud fetch speed (#281)
## Description
Add logs for the CloudFetch download speed. Relevant ticket: https://databricks.atlassian.net/browse/XTA-11037
## Testing
Verified that the logs are printed when the log level is set to INFO.
1 parent 529d69c commit 631401e

File tree

4 files changed

+81
-42
lines changed

4 files changed

+81
-42
lines changed

connector_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ func TestNewConnector(t *testing.T) {
4343
WithSkipTLSHostVerify(),
4444
)
4545
expectedCloudFetchConfig := config.CloudFetchConfig{
46-
UseCloudFetch: true,
47-
MaxDownloadThreads: 15,
48-
MaxFilesInMemory: 10,
49-
MinTimeToExpiry: 0 * time.Second,
46+
UseCloudFetch: true,
47+
MaxDownloadThreads: 15,
48+
MaxFilesInMemory: 10,
49+
MinTimeToExpiry: 0 * time.Second,
50+
CloudFetchSpeedThresholdMbps: 0.1,
5051
}
5152
expectedUserConfig := config.UserConfig{
5253
Host: host,
@@ -89,10 +90,11 @@ func TestNewConnector(t *testing.T) {
8990
WithHTTPPath(httpPath),
9091
)
9192
expectedCloudFetchConfig := config.CloudFetchConfig{
92-
UseCloudFetch: true,
93-
MaxDownloadThreads: 10,
94-
MaxFilesInMemory: 10,
95-
MinTimeToExpiry: 0 * time.Second,
93+
UseCloudFetch: true,
94+
MaxDownloadThreads: 10,
95+
MaxFilesInMemory: 10,
96+
MinTimeToExpiry: 0 * time.Second,
97+
CloudFetchSpeedThresholdMbps: 0.1,
9698
}
9799
expectedUserConfig := config.UserConfig{
98100
Host: host,
@@ -130,10 +132,11 @@ func TestNewConnector(t *testing.T) {
130132
WithRetries(-1, 0, 0),
131133
)
132134
expectedCloudFetchConfig := config.CloudFetchConfig{
133-
UseCloudFetch: true,
134-
MaxDownloadThreads: 10,
135-
MaxFilesInMemory: 10,
136-
MinTimeToExpiry: 0 * time.Second,
135+
UseCloudFetch: true,
136+
MaxDownloadThreads: 10,
137+
MaxFilesInMemory: 10,
138+
MinTimeToExpiry: 0 * time.Second,
139+
CloudFetchSpeedThresholdMbps: 0.1,
137140
}
138141
expectedUserConfig := config.UserConfig{
139142
Host: host,

internal/config/config.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -464,10 +464,11 @@ func (arrowConfig ArrowConfig) DeepCopy() ArrowConfig {
464464
}
465465

466466
// CloudFetchConfig holds the CloudFetch settings. WithDefaults fills in unset
// values and DeepCopy returns a field-by-field copy.
type CloudFetchConfig struct {
467-
UseCloudFetch bool
468-
MaxDownloadThreads int
469-
MaxFilesInMemory int
470-
MinTimeToExpiry time.Duration
467+
UseCloudFetch bool
468+
MaxDownloadThreads int
469+
MaxFilesInMemory int
470+
MinTimeToExpiry time.Duration
471+
CloudFetchSpeedThresholdMbps float64 // Minimum download speed below which a WARN is logged (default: 0.1). Despite the "Mbps" name, the measured speed it is compared with is bytes/(1024*1024)/second, i.e. megabytes per second.
471472
}
472473

473474
func (cfg CloudFetchConfig) WithDefaults() CloudFetchConfig {
@@ -485,14 +486,19 @@ func (cfg CloudFetchConfig) WithDefaults() CloudFetchConfig {
485486
cfg.MinTimeToExpiry = 0 * time.Second
486487
}
487488

489+
if cfg.CloudFetchSpeedThresholdMbps <= 0 {
490+
cfg.CloudFetchSpeedThresholdMbps = 0.1
491+
}
492+
488493
return cfg
489494
}
490495

491496
// DeepCopy returns a new CloudFetchConfig whose fields are each copied from
// cfg, including CloudFetchSpeedThresholdMbps.
func (cfg CloudFetchConfig) DeepCopy() CloudFetchConfig {
492497
return CloudFetchConfig{
493-
UseCloudFetch: cfg.UseCloudFetch,
494-
MaxDownloadThreads: cfg.MaxDownloadThreads,
495-
MaxFilesInMemory: cfg.MaxFilesInMemory,
496-
MinTimeToExpiry: cfg.MinTimeToExpiry,
498+
UseCloudFetch: cfg.UseCloudFetch,
499+
MaxDownloadThreads: cfg.MaxDownloadThreads,
500+
MaxFilesInMemory: cfg.MaxFilesInMemory,
501+
MinTimeToExpiry: cfg.MinTimeToExpiry,
502+
CloudFetchSpeedThresholdMbps: cfg.CloudFetchSpeedThresholdMbps,
497503
}
498504
}

internal/config/config_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,10 @@ func TestParseConfig(t *testing.T) {
247247
RetryWaitMin: 1 * time.Second,
248248
RetryWaitMax: 30 * time.Second,
249249
CloudFetchConfig: CloudFetchConfig{
250-
UseCloudFetch: true,
251-
MaxDownloadThreads: 10,
252-
MaxFilesInMemory: 10,
250+
UseCloudFetch: true,
251+
MaxDownloadThreads: 10,
252+
MaxFilesInMemory: 10,
253+
CloudFetchSpeedThresholdMbps: 0.1,
253254
},
254255
},
255256
wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b",
@@ -271,9 +272,10 @@ func TestParseConfig(t *testing.T) {
271272
RetryWaitMin: 1 * time.Second,
272273
RetryWaitMax: 30 * time.Second,
273274
CloudFetchConfig: CloudFetchConfig{
274-
UseCloudFetch: true,
275-
MaxDownloadThreads: 15,
276-
MaxFilesInMemory: 10,
275+
UseCloudFetch: true,
276+
MaxDownloadThreads: 15,
277+
MaxFilesInMemory: 10,
278+
CloudFetchSpeedThresholdMbps: 0.1,
277279
},
278280
},
279281
wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123b",
@@ -299,9 +301,10 @@ func TestParseConfig(t *testing.T) {
299301
RetryWaitMin: 1 * time.Second,
300302
RetryWaitMax: 30 * time.Second,
301303
CloudFetchConfig: CloudFetchConfig{
302-
UseCloudFetch: true,
303-
MaxDownloadThreads: 15,
304-
MaxFilesInMemory: 10,
304+
UseCloudFetch: true,
305+
MaxDownloadThreads: 15,
306+
MaxFilesInMemory: 10,
307+
CloudFetchSpeedThresholdMbps: 0.1,
305308
},
306309
},
307310
wantURL: "https://example.cloud.databricks.com:8000/sql/1.0/endpoints/12346a5b5b0e123a",

internal/rows/arrowbased/batchloader.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"io"
8+
"strings"
89
"time"
910

1011
"github.com/databricks/databricks-sql-go/internal/config"
@@ -154,12 +155,13 @@ func (bi *cloudIPCStreamIterator) Next() (io.Reader, error) {
154155

155156
cancelCtx, cancelFn := context.WithCancel(bi.ctx)
156157
task := &cloudFetchDownloadTask{
157-
ctx: cancelCtx,
158-
cancel: cancelFn,
159-
useLz4Compression: bi.cfg.UseLz4Compression,
160-
link: link,
161-
resultChan: make(chan cloudFetchDownloadTaskResult),
162-
minTimeToExpiry: bi.cfg.MinTimeToExpiry,
158+
ctx: cancelCtx,
159+
cancel: cancelFn,
160+
useLz4Compression: bi.cfg.UseLz4Compression,
161+
link: link,
162+
resultChan: make(chan cloudFetchDownloadTaskResult),
163+
minTimeToExpiry: bi.cfg.MinTimeToExpiry,
164+
speedThresholdMbps: bi.cfg.CloudFetchSpeedThresholdMbps,
163165
}
164166
task.Run()
165167
bi.downloadTasks.Enqueue(task)
@@ -201,12 +203,13 @@ type cloudFetchDownloadTaskResult struct {
201203
}
202204

203205
// cloudFetchDownloadTask describes the download of one CloudFetch result
// link. The iterator's Next method builds it, calls Run and enqueues it; Run
// passes the fields below to fetchBatchBytes and reports on resultChan.
type cloudFetchDownloadTask struct {
204-
ctx context.Context
205-
cancel context.CancelFunc
206-
useLz4Compression bool
207-
minTimeToExpiry time.Duration
208-
link *cli_service.TSparkArrowResultLink
209-
resultChan chan cloudFetchDownloadTaskResult
206+
ctx context.Context // cancellable context derived from the iterator's context
207+
cancel context.CancelFunc // cancel function paired with ctx
208+
useLz4Compression bool // copied from the iterator config's UseLz4Compression
209+
minTimeToExpiry time.Duration // passed to fetchBatchBytes for its link-expiry check
210+
link *cli_service.TSparkArrowResultLink // result link to download
211+
resultChan chan cloudFetchDownloadTaskResult // unbuffered channel on which Run sends the result or error
212+
speedThresholdMbps float64 // passed to fetchBatchBytes; speeds below it trigger a WARN log
210213
}
211214

212215
func (cft *cloudFetchDownloadTask) GetResult() (io.Reader, error) {
@@ -249,7 +252,7 @@ func (cft *cloudFetchDownloadTask) Run() {
249252
cft.link.StartRowOffset,
250253
cft.link.RowCount,
251254
)
252-
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry)
255+
data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps)
253256
if err != nil {
254257
cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
255258
return
@@ -273,10 +276,30 @@ func (cft *cloudFetchDownloadTask) Run() {
273276
}()
274277
}
275278

279+
// logCloudFetchSpeed calculates and logs download speed metrics
280+
func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
281+
if contentLength > 0 && duration.Seconds() > 0 {
282+
// Extract base URL (up to first ?)
283+
baseURL := fullURL
284+
if idx := strings.Index(baseURL, "?"); idx != -1 {
285+
baseURL = baseURL[:idx]
286+
}
287+
288+
speedMbps := float64(contentLength) / (1024 * 1024) / duration.Seconds()
289+
290+
logger.Info().Msgf("CloudFetch: Result File Download speed from cloud storage %s %.4f Mbps", baseURL, speedMbps)
291+
292+
if speedMbps < speedThresholdMbps {
293+
logger.Warn().Msgf("CloudFetch: Results download is slower than threshold speed of %.4f Mbps", speedThresholdMbps)
294+
}
295+
}
296+
}
297+
276298
func fetchBatchBytes(
277299
ctx context.Context,
278300
link *cli_service.TSparkArrowResultLink,
279301
minTimeToExpiry time.Duration,
302+
speedThresholdMbps float64,
280303
) (io.ReadCloser, error) {
281304
if isLinkExpired(link.ExpiryTime, minTimeToExpiry) {
282305
return nil, errors.New(dbsqlerr.ErrLinkExpired)
@@ -294,6 +317,7 @@ func fetchBatchBytes(
294317
}
295318
}
296319

320+
startTime := time.Now()
297321
client := http.DefaultClient
298322
res, err := client.Do(req)
299323
if err != nil {
@@ -304,6 +328,9 @@ func fetchBatchBytes(
304328
return nil, dbsqlerrint.NewDriverError(ctx, msg, err)
305329
}
306330

331+
// Log download speed metrics
332+
logCloudFetchSpeed(link.FileLink, res.ContentLength, time.Since(startTime), speedThresholdMbps)
333+
307334
return res.Body, nil
308335
}
309336

0 commit comments

Comments
 (0)