Skip to content

Commit 73e8594

Browse files
committed
feat: Add Arrow IPC stream iterator for direct access to Arrow binary format
## Summary

This PR adds support for accessing raw Arrow IPC (Inter-Process Communication) streams directly, providing an alternative to the existing parsed Arrow record interface. This allows users to work with the Arrow binary format without the overhead of parsing, enabling use cases like streaming data to other systems or custom processing pipelines.

## Changes

- **Refactored internal batch iterator** to expose IPC streams:
  - Renamed `BatchIterator` → `IPCStreamIterator` (returns `io.Reader`)
  - Updated implementations to return raw IPC streams instead of parsed batches
  - Created backward-compatible `BatchIterator` wrapper
- **Added public API** in `rows` package:
  - New `GetArrowIPCStreams()` method on `Rows` interface
  - New `ArrowIPCStreamIterator` interface with methods:
    - `Next() (io.Reader, error)` — returns raw IPC stream
    - `HasNext() bool`
    - `Close()`
    - `SchemaBytes() ([]byte, error)`
- **Updated all row scanner implementations** to support IPC streams
- **Added example** demonstrating IPC stream usage

## Benefits

- **Performance**: Avoid parsing overhead when forwarding Arrow data
- **Flexibility**: Direct access to Arrow binary format for custom processing
- **Compatibility**: Easier integration with other Arrow-based systems
- **Memory efficiency**: Process streams without loading all records into memory

## Testing

- All existing tests pass
- Backward compatibility maintained through wrapper pattern
- Example provided in `examples/ipcstreams/`

## Usage Example

```go
rows, err := db.QueryContext(ctx, "SELECT * FROM table")
ipcStreams, err := rows.(dbsqlrows.Rows).GetArrowIPCStreams(ctx)
defer ipcStreams.Close()

for ipcStreams.HasNext() {
    reader, err := ipcStreams.Next()
    // Process raw Arrow IPC stream
}
```

Signed-off-by: Jade Wang <jade.wang@databricks.com>
1 parent 746c05d commit 73e8594

File tree

10 files changed

+455
-65
lines changed

10 files changed

+455
-65
lines changed

examples/ipcstreams/main.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"database/sql/driver"
7+
"io"
8+
"log"
9+
"os"
10+
"strconv"
11+
"time"
12+
13+
"github.com/apache/arrow/go/v12/arrow/ipc"
14+
dbsql "github.com/databricks/databricks-sql-go"
15+
dbsqlrows "github.com/databricks/databricks-sql-go/rows"
16+
"github.com/joho/godotenv"
17+
)
18+
19+
func main() {
20+
// Load environment variables from .env file if it exists
21+
// This will not override existing environment variables
22+
_ = godotenv.Load()
23+
24+
port, err := strconv.Atoi(os.Getenv("DATABRICKS_PORT"))
25+
if err != nil {
26+
log.Fatal(err.Error())
27+
}
28+
29+
connector, err := dbsql.NewConnector(
30+
dbsql.WithServerHostname(os.Getenv("DATABRICKS_HOST")),
31+
dbsql.WithPort(port),
32+
dbsql.WithHTTPPath(os.Getenv("DATABRICKS_HTTPPATH")),
33+
dbsql.WithAccessToken(os.Getenv("DATABRICKS_ACCESSTOKEN")),
34+
)
35+
if err != nil {
36+
log.Fatal(err)
37+
}
38+
39+
db := sql.OpenDB(connector)
40+
defer db.Close()
41+
42+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
43+
defer cancel()
44+
45+
conn, _ := db.Conn(ctx)
46+
defer conn.Close()
47+
48+
query := `SELECT * FROM samples.nyctaxi.trips LIMIT 1000`
49+
50+
var rows driver.Rows
51+
err = conn.Raw(func(d interface{}) error {
52+
var err error
53+
rows, err = d.(driver.QueryerContext).QueryContext(ctx, query, nil)
54+
return err
55+
})
56+
57+
if err != nil {
58+
log.Fatal("Failed to execute query: ", err)
59+
}
60+
defer rows.Close()
61+
62+
// Get the IPC stream iterator
63+
ipcStreams, err := rows.(dbsqlrows.Rows).GetArrowIPCStreams(ctx)
64+
if err != nil {
65+
log.Fatal("Failed to get IPC streams: ", err)
66+
}
67+
defer ipcStreams.Close()
68+
69+
// Get the schema bytes
70+
schemaBytes, err := ipcStreams.SchemaBytes()
71+
if err != nil {
72+
log.Fatal("Failed to get schema bytes: ", err)
73+
}
74+
log.Printf("Schema bytes length: %d", len(schemaBytes))
75+
76+
// Process IPC streams
77+
streamCount := 0
78+
recordCount := 0
79+
80+
for ipcStreams.HasNext() {
81+
// Get the next IPC stream
82+
reader, err := ipcStreams.Next()
83+
if err != nil {
84+
if err == io.EOF {
85+
break
86+
}
87+
log.Fatal("Failed to get next IPC stream: ", err)
88+
}
89+
90+
streamCount++
91+
92+
// Create an IPC reader for this stream
93+
ipcReader, err := ipc.NewReader(reader)
94+
if err != nil {
95+
log.Fatal("Failed to create IPC reader: ", err)
96+
}
97+
98+
// Process records in the stream
99+
for ipcReader.Next() {
100+
record := ipcReader.Record()
101+
recordCount++
102+
log.Printf("Stream %d, Record %d: %d rows, %d columns",
103+
streamCount, recordCount, record.NumRows(), record.NumCols())
104+
105+
// Don't forget to release the record when done
106+
record.Release()
107+
}
108+
109+
if err := ipcReader.Err(); err != nil {
110+
log.Fatal("IPC reader error: ", err)
111+
}
112+
113+
ipcReader.Release()
114+
}
115+
116+
log.Printf("Processed %d IPC streams with %d total records", streamCount, recordCount)
117+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package arrowbased
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/databricks/databricks-sql-go/internal/cli_service"
8+
"github.com/databricks/databricks-sql-go/internal/config"
9+
dbsqlerrint "github.com/databricks/databricks-sql-go/internal/errors"
10+
"github.com/databricks/databricks-sql-go/internal/rows/rowscanner"
11+
"github.com/databricks/databricks-sql-go/rows"
12+
)
13+
14+
// NewArrowIPCStreamIterator creates a new iterator for Arrow IPC streams
15+
func NewArrowIPCStreamIterator(ctx context.Context, rpi rowscanner.ResultPageIterator, ipcIterator IPCStreamIterator, arrowSchemaBytes []byte, cfg config.Config) rows.ArrowIPCStreamIterator {
16+
return &arrowIPCStreamIterator{
17+
cfg: cfg,
18+
ipcStreamIterator: ipcIterator,
19+
resultPageIterator: rpi,
20+
ctx: ctx,
21+
arrowSchemaBytes: arrowSchemaBytes,
22+
}
23+
}
24+
25+
// arrowIPCStreamIterator implements rows.ArrowIPCStreamIterator
26+
type arrowIPCStreamIterator struct {
27+
ctx context.Context
28+
cfg config.Config
29+
ipcStreamIterator IPCStreamIterator
30+
resultPageIterator rowscanner.ResultPageIterator
31+
isFinished bool
32+
arrowSchemaBytes []byte
33+
}
34+
35+
var _ rows.ArrowIPCStreamIterator = (*arrowIPCStreamIterator)(nil)
36+
37+
// Next retrieves the next Arrow IPC stream
38+
func (ri *arrowIPCStreamIterator) Next() (io.Reader, error) {
39+
if !ri.HasNext() {
40+
return nil, io.EOF
41+
}
42+
43+
if ri.ipcStreamIterator != nil && ri.ipcStreamIterator.HasNext() {
44+
return ri.ipcStreamIterator.Next()
45+
}
46+
47+
// If there is no iterator, or we have exhausted the current iterator, try to load more data
48+
if err := ri.fetchNextData(); err != nil {
49+
return nil, err
50+
}
51+
52+
// Try again after fetching new data
53+
if ri.ipcStreamIterator != nil && ri.ipcStreamIterator.HasNext() {
54+
return ri.ipcStreamIterator.Next()
55+
}
56+
57+
return nil, io.EOF
58+
}
59+
60+
// HasNext returns true if there are more streams available
61+
func (ri *arrowIPCStreamIterator) HasNext() bool {
62+
if ri.isFinished {
63+
return false
64+
}
65+
66+
if ri.ipcStreamIterator != nil && ri.ipcStreamIterator.HasNext() {
67+
return true
68+
}
69+
70+
if ri.resultPageIterator == nil || !ri.resultPageIterator.HasNext() {
71+
return false
72+
}
73+
74+
return true
75+
}
76+
77+
// Close releases resources
78+
func (ri *arrowIPCStreamIterator) Close() {
79+
if ri.ipcStreamIterator != nil {
80+
ri.ipcStreamIterator.Close()
81+
ri.ipcStreamIterator = nil
82+
}
83+
ri.isFinished = true
84+
}
85+
86+
// SchemaBytes returns the Arrow schema bytes
87+
func (ri *arrowIPCStreamIterator) SchemaBytes() ([]byte, error) {
88+
return ri.arrowSchemaBytes, nil
89+
}
90+
91+
// fetchNextData loads the next page of data
92+
func (ri *arrowIPCStreamIterator) fetchNextData() error {
93+
if ri.isFinished {
94+
return io.EOF
95+
}
96+
97+
// First close any existing iterator
98+
if ri.ipcStreamIterator != nil {
99+
ri.ipcStreamIterator.Close()
100+
ri.ipcStreamIterator = nil
101+
}
102+
103+
if ri.resultPageIterator == nil || !ri.resultPageIterator.HasNext() {
104+
ri.isFinished = true
105+
return io.EOF
106+
}
107+
108+
// Get the next page of the result set
109+
resp, err := ri.resultPageIterator.Next()
110+
if err != nil {
111+
ri.isFinished = true
112+
return err
113+
}
114+
115+
// Check the result format
116+
resultFormat := resp.ResultSetMetadata.GetResultFormat()
117+
if resultFormat != cli_service.TSparkRowSetType_ARROW_BASED_SET && resultFormat != cli_service.TSparkRowSetType_URL_BASED_SET {
118+
return dbsqlerrint.NewDriverError(ri.ctx, errArrowRowsNotArrowFormat, nil)
119+
}
120+
121+
// Update schema if this is the first fetch
122+
if ri.arrowSchemaBytes == nil && resp.ResultSetMetadata != nil && resp.ResultSetMetadata.ArrowSchema != nil {
123+
ri.arrowSchemaBytes = resp.ResultSetMetadata.ArrowSchema
124+
}
125+
126+
// Create new iterator from the fetched data
127+
bi, err := ri.newIPCStreamIterator(resp)
128+
if err != nil {
129+
ri.isFinished = true
130+
return err
131+
}
132+
133+
ri.ipcStreamIterator = bi
134+
return nil
135+
}
136+
137+
// Create a new IPC stream iterator from a page of the result set
138+
func (ri *arrowIPCStreamIterator) newIPCStreamIterator(fr *cli_service.TFetchResultsResp) (IPCStreamIterator, error) {
139+
rowSet := fr.Results
140+
if len(rowSet.ResultLinks) > 0 {
141+
return NewCloudIPCStreamIterator(ri.ctx, rowSet.ResultLinks, rowSet.StartRowOffset, &ri.cfg)
142+
} else {
143+
return NewLocalIPCStreamIterator(ri.ctx, rowSet.ArrowBatches, rowSet.StartRowOffset, ri.arrowSchemaBytes, &ri.cfg)
144+
}
145+
}

internal/rows/arrowbased/arrowRows.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,20 @@ func (ars *arrowRowScanner) GetArrowBatches(ctx context.Context, cfg config.Conf
329329
return ri, nil
330330
}
331331

332+
func (ars *arrowRowScanner) GetArrowIPCStreams(ctx context.Context, cfg config.Config, rpi rowscanner.ResultPageIterator) (dbsqlrows.ArrowIPCStreamIterator, error) {
333+
// Get the underlying IPC stream iterator from the batch iterator
334+
var ipcIterator IPCStreamIterator
335+
if ars.batchIterator != nil {
336+
// If we have a batch iterator, extract its IPC stream iterator
337+
if wrapper, ok := ars.batchIterator.(*batchIterator); ok {
338+
ipcIterator = wrapper.ipcIterator
339+
}
340+
}
341+
342+
ri := NewArrowIPCStreamIterator(ctx, rpi, ipcIterator, ars.arrowSchemaBytes, cfg)
343+
return ri, nil
344+
}
345+
332346
// getArrowSchemaBytes returns the serialized schema in ipc format
333347
func getArrowSchemaBytes(schema *arrow.Schema, ctx context.Context) ([]byte, dbsqlerr.DBError) {
334348
if schema == nil {

internal/rows/arrowbased/arrowRows_test.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,10 +1041,8 @@ func TestArrowRowScanner(t *testing.T) {
10411041
ars := d.(*arrowRowScanner)
10421042
assert.Equal(t, int64(53940), ars.NRows())
10431043

1044-
bi, ok := ars.batchIterator.(*localBatchIterator)
1045-
assert.True(t, ok)
1046-
fbi := &batchIteratorWrapper{
1047-
bi: bi,
1044+
fbi := &testBatchIteratorWrapper{
1045+
bi: ars.batchIterator,
10481046
}
10491047

10501048
ars.batchIterator = fbi
@@ -1674,26 +1672,26 @@ func (fbi *fakeBatchIterator) Close() {
16741672
fbi.lastReadBatch = nil
16751673
}
16761674

1677-
type batchIteratorWrapper struct {
1675+
type testBatchIteratorWrapper struct {
16781676
bi BatchIterator
16791677
callCount int
16801678
lastLoadedBatch SparkArrowBatch
16811679
}
16821680

1683-
var _ BatchIterator = (*batchIteratorWrapper)(nil)
1681+
var _ BatchIterator = (*testBatchIteratorWrapper)(nil)
16841682

1685-
func (biw *batchIteratorWrapper) Next() (SparkArrowBatch, error) {
1683+
func (biw *testBatchIteratorWrapper) Next() (SparkArrowBatch, error) {
16861684
biw.callCount += 1
16871685
batch, err := biw.bi.Next()
16881686
biw.lastLoadedBatch = batch
16891687
return batch, err
16901688
}
16911689

1692-
func (biw *batchIteratorWrapper) HasNext() bool {
1690+
func (biw *testBatchIteratorWrapper) HasNext() bool {
16931691
return biw.bi.HasNext()
16941692
}
16951693

1696-
func (biw *batchIteratorWrapper) Close() {
1694+
func (biw *testBatchIteratorWrapper) Close() {
16971695
biw.bi.Close()
16981696
}
16991697

0 commit comments

Comments
 (0)