
Commit 93c3e1c

[SDP] root storage location of pipeline metadata
1 parent 795225d

9 files changed: +54 -7 lines changed
docs/declarative-pipelines/FlowSystemMetadata.md

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@

# FlowSystemMetadata

## latestCheckpointLocation { #latestCheckpointLocation }

```scala
latestCheckpointLocation: String
```

`latestCheckpointLocation`...FIXME

---

`latestCheckpointLocation` is used when:

* `FlowPlanner` is requested to [plan a StreamingFlow](FlowPlanner.md#plan)
* `State` is requested to [reset a flow](State.md#reset)
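
While `latestCheckpointLocation` itself is still a FIXME above, here is a hedged sketch of one plausible implementation: picking the highest-numbered subdirectory of a flow's checkpoints directory. The helper name, numbering scheme, and layout are assumptions for illustration, not Spark's actual code.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical helper (NOT Spark's actual implementation): assumes
// checkpoints are numbered subdirectories (0, 1, 2, ...) under a
// flow's checkpoints directory and returns the highest-numbered one.
def latestCheckpointLocation(spark: SparkSession, flowCheckpointsDir: String): String = {
  val dir = new Path(flowCheckpointsDir)
  val fs = dir.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val versions =
    if (fs.exists(dir))
      fs.listStatus(dir).toSeq
        .map(_.getPath.getName)
        .flatMap(name => scala.util.Try(name.toLong).toOption)
    else Seq.empty[Long]
  val latest = if (versions.isEmpty) 0L else versions.max
  new Path(dir, latest.toString).toString
}
```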

docs/declarative-pipelines/PipelineUpdateContext.md

Lines changed: 14 additions & 2 deletions

@@ -2,9 +2,9 @@

`PipelineUpdateContext` is an [abstraction](#contract) of [pipeline update contexts](#implementations) that can [refreshTables](#refreshTables) (_among other things_).

-## Contract (Subset) { #contract }
+## Contract (Subset)

-### refreshTables { #refreshTables }
+### refreshTables Table Filter { #refreshTables }

```scala
refreshTables: TableFilter
```

@@ -15,6 +15,18 @@ Used when:

* `DatasetManager` is requested to [constructFullRefreshSet](DatasetManager.md#constructFullRefreshSet)
* `PipelineUpdateContext` is requested to [refreshFlows](PipelineUpdateContext.md#refreshFlows)

+### Root Storage Location { #storageRoot }
+
+```scala
+storageRoot: String
+```
+
+The root storage location of pipeline metadata (e.g., checkpoints for streaming flows).
+
+Used when:
+
+* `FlowSystemMetadata` is requested to [flowCheckpointsDirOpt](FlowSystemMetadata.md#flowCheckpointsDirOpt)
+

### Unresolved Dataflow Graph { #unresolvedGraph }
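
To make the `storageRoot`-to-checkpoints relationship concrete, a minimal sketch of deriving a per-flow checkpoints directory from the root storage location follows. The `_checkpoints` segment and the helper name are illustrative assumptions, not the actual layout used by `FlowSystemMetadata`.

```scala
import org.apache.hadoop.fs.Path

// Illustrative only: one plausible layout of pipeline metadata under
// storageRoot (the "_checkpoints" directory name is an assumption).
def flowCheckpointsDir(storageRoot: String, flowName: String): String =
  new Path(new Path(storageRoot, "_checkpoints"), flowName).toString

// flowCheckpointsDir("s3://bucket/pipelines/demo", "events_flow")
// => "s3://bucket/pipelines/demo/_checkpoints/events_flow"
```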

docs/declarative-pipelines/PipelineUpdateContextImpl.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@

* <span id="eventCallback"> `PipelineEvent` Callback (`PipelineEvent => Unit`)
* <span id="refreshTables"> `TableFilter` of the tables to be refreshed (default: `AllTables`)
* <span id="fullRefreshTables"> `TableFilter` of the tables to be fully refreshed (default: `NoTables`)
+* <span id="storageRoot"> [storageRoot](PipelineUpdateContext.md#storageRoot)

`PipelineUpdateContextImpl` is created when:
docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ handlePipelinesCommand(

| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) ||
| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset) |
| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_flow) |
-| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline.start_run](spark_connect_pipeline.md#start_run) |
+| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline](spark_connect_pipeline.md#start_run) |
| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_sql) |

`handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
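
The hunk cuts off above. As a minimal, self-contained sketch of the dispatch-or-reject pattern the table describes — match on the command type, handle the supported ones, throw `UnsupportedOperationException` otherwise — consider the toy model below; the ADT and names are hypothetical, not Spark's actual protobuf types.

```scala
// Toy model of the handlePipelinesCommand dispatch pattern.
sealed trait PipelinesCommand
case object StartRun extends PipelinesCommand
case class DefineDataset(name: String) extends PipelinesCommand
case class DefineFlow(name: String) extends PipelinesCommand
case object SomeFutureCommand extends PipelinesCommand // not handled below

def handlePipelinesCommand(cmd: PipelinesCommand): Unit = cmd match {
  case StartRun            => println("starting a pipeline run")
  case DefineDataset(name) => println(s"registering dataset $name")
  case DefineFlow(name)    => println(s"registering flow $name")
  case other               => throw new UnsupportedOperationException(s"$other not supported")
}

// handlePipelinesCommand(SomeFutureCommand) throws UnsupportedOperationException.
```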
docs/declarative-pipelines/State.md

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@

# State

`State` is...FIXME

docs/declarative-pipelines/StreamingFlow.md

Lines changed: 3 additions & 1 deletion
@@ -2,6 +2,8 @@

`StreamingFlow` is a [ResolvedFlow](ResolvedFlow.md) that may or may not be [append](#mustBeAppend).

+`StreamingFlow` represents an [UnresolvedFlow](UnresolvedFlow.md) with a [streaming dataframe](FlowFunctionResult.md#dataFrame) in a dataflow graph.
+
`StreamingFlow` is [planned for execution](FlowPlanner.md#plan) as a [StreamingTableWrite](StreamingTableWrite.md) (assuming that the [Output](DataflowGraph.md#output) of [this flow](#flow)'s [destination](ResolutionCompletedFlow.md#destinationIdentifier) is a [Table](Table.md)).

## Creating Instance

@@ -14,7 +16,7 @@

`StreamingFlow` is created when:

-* `FlowResolver` is requested to [convertResolvedToTypedFlow](FlowResolver.md#convertResolvedToTypedFlow) (for [UnresolvedFlow](UnresolvedFlow.md)s with their results being streaming dataframes)
+* `FlowResolver` is requested to [convertResolvedToTypedFlow](FlowResolver.md#convertResolvedToTypedFlow) (for an [UnresolvedFlow](UnresolvedFlow.md) with a [streaming dataframe](FlowFunctionResult.md#dataFrame))

### mustBeAppend Flag { #mustBeAppend }
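
The streaming-vs-batch distinction that `FlowResolver` draws when typing flows is observable on any `DataFrame` via the standard `isStreaming` flag. A minimal, runnable sketch (the session setup is for illustration only):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("isStreaming-demo").getOrCreate()

// A batch DataFrame: would not become a StreamingFlow.
val batchDF: DataFrame = spark.range(5).toDF("id")
assert(!batchDF.isStreaming)

// A streaming DataFrame: the kind of flow-function result that
// convertResolvedToTypedFlow turns into a StreamingFlow.
val streamingDF: DataFrame = spark.readStream.format("rate").load()
assert(streamingDF.isStreaming)
```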

docs/declarative-pipelines/StreamingFlowExecution.md

Lines changed: 11 additions & 1 deletion
@@ -2,7 +2,17 @@

`StreamingFlowExecution` is an [extension](#contract) of the [FlowExecution](FlowExecution.md) abstraction for [streaming flow executions](#implementations) that process data statefully using [Spark Structured Streaming]({{ book.structured_streaming }}).

-## Contract
+## Contract (Subset)
+
+### Checkpoint Location { #checkpointPath }
+
+```scala
+checkpointPath: String
+```
+
+Used when:
+
+* `StreamingTableWrite` is requested to [start a streaming query](StreamingTableWrite.md#startStream)

### Execute Streaming Query { #startStream }
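
For intuition, a hedged sketch of what starting such a streaming query can look like in plain Structured Streaming: `checkpointLocation` and `toTable` are real public APIs, but the paths, table name, and session setup are made up, and this is not `StreamingTableWrite`'s actual code.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("checkpoint-demo").getOrCreate()

// The checkpoint location pins the query's progress and state so a
// restarted flow resumes where it left off.
val query = spark.readStream.format("rate").load()
  .writeStream
  .option("checkpointLocation", "/tmp/pipelines/demo/_checkpoints/events_flow/0")
  .toTable("events")

query.awaitTermination(5000) // let the query run briefly...
query.stop()                 // ...then shut it down
```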

docs/declarative-pipelines/StreamingTableWrite.md

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ When [executed](#startStream), `StreamingTableWrite` starts a streaming query to

* <span id="flow"> [ResolvedFlow](StreamingFlowExecution.md#flow)
* <span id="graph"> [DataflowGraph](DataflowGraph.md)
* <span id="updateContext"> [PipelineUpdateContext](FlowExecution.md#updateContext)
-* <span id="checkpointPath"> [Checkpoint Path](StreamingFlowExecution.md#checkpointPath)
+* <span id="checkpointPath"> [Checkpoint Location](StreamingFlowExecution.md#checkpointPath)
* <span id="trigger"> [Streaming Trigger](StreamingFlowExecution.md#trigger)
* <span id="destination"> [Output table](FlowExecution.md#destination)
* <span id="sqlConf"> [SQL Configuration](StreamingFlowExecution.md#sqlConf)

docs/declarative-pipelines/index.md

Lines changed: 4 additions & 1 deletion
@@ -53,13 +53,16 @@ The following fields are supported:

Field Name | Description
-|-
`name` (required) | &nbsp;
+`storage` (required) | The root storage location of pipeline metadata (e.g., checkpoints for streaming flows).<br>[SPARK-53751 Explicit Checkpoint Location]({{ spark.jira }}/SPARK-53751)
`catalog` | The default catalog to register datasets into.<br>Unless specified, [PipelinesHandler](PipelinesHandler.md#createDataflowGraph) falls back to the current catalog.
`database` | The default database to register datasets into.<br>Unless specified, [PipelinesHandler](PipelinesHandler.md#createDataflowGraph) falls back to the current database.
`schema` | Alias of `database`. Used unless `database` is defined.
-`storage` | ⚠️ does not seem to be used
`configuration` | SparkSession configs<br>Spark Pipelines runtime uses the configs to build a new `SparkSession` when executing `run`.<br>[spark.sql.connect.serverStacktrace.enabled]({{ book.spark_connect }}/configuration-properties/#spark.sql.connect.serverStacktrace.enabled) is hardcoded to be always `false`.
`libraries` | `glob`s of `include`s with transformations in [SQL](#sql) and [Python](#python-decorators)

+??? info
+    The pipeline spec is resolved in `pyspark/pipelines/cli.py::unpack_pipeline_spec`.
+

```yaml
name: hello-spark-pipelines
catalog: default_catalog
