@andrefurlan-db andrefurlan-db commented Nov 22, 2022

PoC of one approach to supporting true async APIs.

Goal

The goal is to support server restart while a query is running.

It is not a goal to support parallelism or non-blocking calls. Those can already be achieved with goroutines and the standard sql API.

API

Function and type names are not final.
sql.OpenDB(connector) works the same. No change. This is the synchronous API.
dbsql.OpenDB(connector) is new. This is where the asynchronous APIs live.

So the user has to opt in explicitly to use the asynchronous APIs.

dbsql.OpenDB() returns the `DatabricksDB` interface.

type DatabricksDB interface {
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, Execution, error)
	CancelExecution(ctx context.Context, exc Execution) error
	GetExecutionRows(ctx context.Context, exc Execution) (*sql.Rows, error)
	CheckExecution(ctx context.Context, exc Execution) (Execution, error)
	// .... other relevant sql.DB APIs
	Conn(ctx context.Context) (*sql.Conn, error)
	Close() error
	Stats() sql.DBStats
	SetMaxOpenConns(n int)
	Driver() driver.Driver
	// ...
}
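
The interface passes an Execution value around, and the usage in this description reads exc.Status and calls exc.Status.Terminal(). A minimal sketch of what such a value could look like follows. Only ExecutionFinished and Terminal() are named in this PR; the other status names, the string representation, and the ID field are assumptions made for illustration.

```go
package main

import "fmt"

// ExecutionStatus is a hypothetical status type. The PR only names
// ExecutionFinished and a Terminal method; everything else is assumed.
type ExecutionStatus string

const (
	ExecutionRunning  ExecutionStatus = "running"  // assumed name
	ExecutionFinished ExecutionStatus = "finished" // named in the PR
	ExecutionFailed   ExecutionStatus = "failed"   // assumed name
	ExecutionCanceled ExecutionStatus = "canceled" // assumed name
)

// Terminal reports whether the status can no longer change,
// meaning the caller can stop polling.
func (s ExecutionStatus) Terminal() bool {
	switch s {
	case ExecutionFinished, ExecutionFailed, ExecutionCanceled:
		return true
	}
	return false
}

// Execution is a plain-data handle to a running query.
// The ID field is an assumption; the real type carries more data.
type Execution struct {
	ID     string
	Status ExecutionStatus
}

func main() {
	exc := Execution{ID: "op-1", Status: ExecutionRunning}
	fmt.Println(exc.Status.Terminal())

	exc.Status = ExecutionFinished
	fmt.Println(exc.Status.Terminal())
}
```

Keeping the handle as plain data is what would let a caller store it and resume polling after a restart.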

The APIs support DirectResults, which means that calls are synchronous up to 5s, then become asynchronous. The sql.Rows and sql.Result APIs remain exactly the same.

Usage

A typical workflow would be something like:

rs, exc, err := db.QueryContext(ogCtx, `select * from dummy`)
if err != nil {
	log.Fatal(err)
}
defer rs.Close()

// ... save exc in cache/db for continuing process in case server restarts

if exc.Status == dbsql.ExecutionFinished {
	var res string
	for rs.Next() {
		err := rs.Scan(&res)
		// ... do something with the result
	}
}

// ... poll until status is terminal
for {
	if exc.Status.Terminal() {
		break
	} else {
		exc, err = db.CheckExecution(ogCtx, exc)
		if err != nil {
			log.Fatal(err)
		}
	}
	// must poll at least every 10 minutes
	time.Sleep(time.Second)
}

// ... get results
if exc.Status == dbsql.ExecutionFinished {
	rs, err = db.GetExecutionRows(ogCtx, exc)
	if err != nil {
		log.Fatal(err)
	}
	var res string
	for rs.Next() {
		err := rs.Scan(&res)
		// ... do something with the result
	}
} else {
	// ... handle error cases
}
Signed-off-by: Andre Furlan <andre.furlan@databricks.com>
@andrefurlan-db andrefurlan-db changed the title Async [poc] Async support Nov 22, 2022
Comment on lines +144 to +148
excId, ok := ctx.Value(driverctx.ExecutionContextKey).(*Execution)
if !ok {
	return nil
}
return excId
Contributor

Nit: Since it's the Execution pointer and not Id being returned, perhaps use exc instead of excId

@andrefurlan-db

One idea for improvement would be rs, exc, err = db.CheckExecution(ogCtx, exc). This would take a single call and make the loop simpler.

@aldld aldld left a comment


Took a quick look over, mainly focused on the interface. Looking forward to this!

Authenticator string //TODO for oauth

RunAsync bool // TODO
RunAsync bool
Contributor

Is there a particular reason why this is set at the config level, rather than on a per-query level? Or rather, is there any reason not to provide different methods for callers to use depending on whether they want the sync vs. async behaviour?


@andrefurlan-db andrefurlan-db Nov 29, 2022


No, it was just the fastest way to get the PoC working :-). I have to think about the methods in the interface; maybe it does make sense to do as you say.

// i := v
// go func() {
// _, exc, err := db.QueryContext(ogCtx, fmt.Sprintf("select %s", i))
rs, exc, err := db.QueryContext(ogCtx, `SELECT id FROM RANGE(100) ORDER BY RANDOM() + 2 asc`)
Contributor

To make sure I'm understanding the semantics of this correctly:

  1. If the query has completed (direct results), then rs will be set and the caller can fetch the results
  2. Otherwise, the caller can use exc to get the operation ID(?), and use that to poll for query status?
Contributor Author

Hi @aldld, that's the idea... there is more to the execution than the ID, but it is just data that can be saved and used later.
I am not very happy yet with how results are checked, with separate methods to query, check, and get results. There must be a simpler way to merge check and get results into the same call. See the example I added so you have an idea of what I am talking about.
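
Since the execution is just data, saving it and restoring it after a restart could be as simple as a JSON round trip. A minimal sketch, assuming a hypothetical Execution struct; the field names, JSON tags, and the saveExecution and loadExecution helpers are illustrative and not taken from the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Execution is a hypothetical plain-data handle. The real type carries more
// than an ID; these fields are assumptions for illustration.
type Execution struct {
	ID     string `json:"id"`
	Status string `json:"status"`
}

// saveExecution serializes the handle so it can be written to a cache or
// database before the server restarts.
func saveExecution(exc Execution) ([]byte, error) {
	return json.Marshal(exc)
}

// loadExecution restores a previously saved handle.
func loadExecution(data []byte) (Execution, error) {
	var exc Execution
	err := json.Unmarshal(data, &exc)
	return exc, err
}

func main() {
	data, err := saveExecution(Execution{ID: "op-1", Status: "running"})
	if err != nil {
		panic(err)
	}

	// After a restart, the stored bytes are enough to resume polling.
	restored, err := loadExecution(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.ID, restored.Status)
}
```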

