Commit df31a24

[SDP] SinkWrite Flow Execution
1 parent e69b32a commit df31a24

5 files changed: +75 −13 lines changed


docs/declarative-pipelines/DataflowGraph.md

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 # DataflowGraph
 
-`DataflowGraph` is a [GraphRegistrationContext](GraphRegistrationContext.md) with [tables](#tables), [views](#views) and [flows](#flows) fully-qualified, resolved and de-duplicated.
+`DataflowGraph` is a [GraphRegistrationContext](GraphRegistrationContext.md) with [tables](#tables), [sinks](#sinks), [views](#views) and [flows](#flows) fully-qualified, resolved and de-duplicated.
 
 ## Creating Instance
 
docs/declarative-pipelines/FlowPlanner.md

Lines changed: 8 additions & 5 deletions
@@ -31,12 +31,15 @@ plan(
   flow: ResolvedFlow): FlowExecution
 ```
 
-`plan` creates a [FlowExecution](FlowExecution.md) (for the given [ResolvedFlow](ResolvedFlow.md)) as follows:
+`plan` [looks up the output](DataflowGraph.md#output) for the [destination identifier](ResolutionCompletedFlow.md#destinationIdentifier) of the given [ResolvedFlow](ResolvedFlow.md) in this [DataflowGraph](#graph).
 
-FlowExecution | ResolvedFlow
--|-
-[BatchTableWrite](BatchTableWrite.md) | [CompleteFlow](CompleteFlow.md)
-[StreamingTableWrite](StreamingTableWrite.md) | [StreamingFlow](StreamingFlow.md)
+`plan` creates a [FlowExecution](FlowExecution.md) (for the given [ResolvedFlow](ResolvedFlow.md) and the [Output](Output.md)) as follows:
+
+FlowExecution | ResolvedFlow | Output
+-|-|-
+[BatchTableWrite](BatchTableWrite.md) | [CompleteFlow](CompleteFlow.md) | [Table](Table.md)
+[SinkWrite](SinkWrite.md) | [StreamingFlow](StreamingFlow.md) | [Sink](Sink.md)
+[StreamingTableWrite](StreamingTableWrite.md) | [StreamingFlow](StreamingFlow.md) | [Table](Table.md)
 
 ---
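The new three-way dispatch can be sketched as plain Scala. The stand-in types below are illustrative only, not the actual Spark Declarative Pipelines classes:

```scala
// Illustrative sketch of FlowPlanner.plan's dispatch on (ResolvedFlow, Output).
// Stand-in types; the real classes live in Spark Declarative Pipelines.
sealed trait ResolvedFlow
case object CompleteFlow extends ResolvedFlow
case object StreamingFlow extends ResolvedFlow

sealed trait Output
case object TableOutput extends Output
case object SinkOutput extends Output

// Returns the name of the FlowExecution that would be created.
def plan(flow: ResolvedFlow, output: Output): String = (flow, output) match {
  case (CompleteFlow, TableOutput)  => "BatchTableWrite"
  case (StreamingFlow, SinkOutput)  => "SinkWrite"
  case (StreamingFlow, TableOutput) => "StreamingTableWrite"
  case (f, o) => sys.error(s"Unsupported flow/output combination: $f / $o")
}
```

Note that `SinkWrite` is only ever paired with a `StreamingFlow`; there is no batch write to a sink in the table above.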

docs/declarative-pipelines/SinkWrite.md

Lines changed: 52 additions & 2 deletions

@@ -1,3 +1,53 @@
-# SinkWrite
+---
+title: SinkWrite
+---
 
-`SinkWrite` is...FIXME
+# SinkWrite Flow Execution
+
+`SinkWrite` is a [StreamingFlowExecution](StreamingFlowExecution.md) that writes a streaming `DataFrame` to a [Sink](#destination).
+
+`SinkWrite` represents a [StreamingFlow](StreamingFlow.md) with a [Sink](Sink.md) as the [output destination](ResolutionCompletedFlow.md#destinationIdentifier) at execution time.
+
+When [executed](#startStream), `SinkWrite` starts a streaming query to append new rows to the [output sink](#destination).
+
+## Creating Instance
+
+`SinkWrite` takes the following to be created:
+
+* <span id="identifier"> [TableIdentifier](FlowExecution.md#identifier)
+* <span id="flow"> [ResolvedFlow](StreamingFlowExecution.md#flow)
+* <span id="graph"> [DataflowGraph](DataflowGraph.md)
+* <span id="updateContext"> [PipelineUpdateContext](FlowExecution.md#updateContext)
+* <span id="checkpointPath"> [Checkpoint Location](StreamingFlowExecution.md#checkpointPath)
+* <span id="trigger"> [Streaming Trigger](StreamingFlowExecution.md#trigger)
+* <span id="destination"> [Destination](FlowExecution.md#destination) ([Sink](Sink.md))
+* <span id="sqlConf"> [SQL Configuration](StreamingFlowExecution.md#sqlConf)
+
+`SinkWrite` is created when:
+
+* `FlowPlanner` is requested to [plan a ResolvedFlow](FlowPlanner.md#plan)
+
+## Start Streaming Query { #startStream }
+
+??? note "StreamingFlowExecution"
+
+    ```scala
+    startStream(): StreamingQuery
+    ```
+
+    `startStream` is part of the [StreamingFlowExecution](StreamingFlowExecution.md#startStream) abstraction.
+
+`startStream` builds the logical query plan of this [flow](#flow)'s structured query (requesting the [DataflowGraph](#graph) to [reanalyze](DataflowGraph.md#reanalyzeFlow) this [flow](#flow)).
+
+`startStream` creates a `DataStreamWriter` ([Spark Structured Streaming]({{ book.structured_streaming }}/DataStreamWriter/)) with the following:
+
+`DataStreamWriter`'s Property | Value
+-|-
+`queryName` | This [displayName](FlowExecution.md#displayName)
+`checkpointLocation` option | This [checkpoint path](#checkpointPath)
+`trigger` | This [streaming trigger](#trigger)
+`outputMode` | [Append]({{ book.structured_streaming }}/OutputMode/#append) (always)
+`format` | The [format](Sink.md#format) of this [output sink](#destination)
+`options` | The [options](Sink.md#options) of this [output sink](#destination)
+
+In the end, `startStream` starts the streaming write query to the [output sink](#destination).
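The property table above corresponds to a fluent `DataStreamWriter` builder chain. A minimal sketch (not the actual `SinkWrite` code; `df`, `displayName`, `checkpointPath`, `trigger`, `sinkFormat` and `sinkOptions` are assumed to come from the flow and sink being executed):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

// Sketch of the DataStreamWriter configuration described in the table above.
// All parameters are assumptions standing in for SinkWrite's fields.
def startStreamSketch(
    df: DataFrame,                       // reanalyzed flow's streaming DataFrame
    displayName: String,                 // FlowExecution.displayName
    checkpointPath: String,              // checkpoint location
    trigger: Trigger,                    // streaming trigger
    sinkFormat: String,                  // Sink.format
    sinkOptions: Map[String, String]     // Sink.options
): StreamingQuery =
  df.writeStream
    .queryName(displayName)
    .option("checkpointLocation", checkpointPath)
    .trigger(trigger)
    .outputMode(OutputMode.Append())     // always Append
    .format(sinkFormat)
    .options(sinkOptions)
    .start()                             // starts the streaming write query
```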

docs/declarative-pipelines/StreamingFlowExecution.md

Lines changed: 7 additions & 2 deletions
@@ -14,14 +14,15 @@ Used when:
 
 * `StreamingTableWrite` is requested to [start a streaming query](StreamingTableWrite.md#startStream)
 
-### Execute Streaming Query { #startStream }
+### Start Streaming Query { #startStream }
 
 ```scala
 startStream(): StreamingQuery
 ```
 
 See:
 
+* [SinkWrite](SinkWrite.md#startStream)
 * [StreamingTableWrite](StreamingTableWrite.md#startStream)
 
 Used when:

@@ -34,8 +35,11 @@ Used when:
 trigger: Trigger
 ```
 
+`Trigger` ([Structured Streaming]({{ book.structured_streaming }}/Trigger))
+
 See:
 
+* [SinkWrite](SinkWrite.md#trigger)
 * [StreamingTableWrite](StreamingTableWrite.md#trigger)
 
 Used when:

@@ -45,6 +49,7 @@ Used when:
 
 ## Implementations
 
+* [SinkWrite](SinkWrite.md)
 * [StreamingTableWrite](StreamingTableWrite.md)
 
 ## executeInternal { #executeInternal }

@@ -60,7 +65,7 @@ Used when:
 `executeInternal` prints out the following INFO message to the logs:
 
 ```text
-Starting [identifier] with checkpoint location [checkpointPath]"
+Starting [identifier] with checkpoint location [checkpointPath]
 ```
 
 `executeInternal` [starts the stream](#startStream) (with this [SparkSession](FlowExecution.md#spark) and [sqlConf](#sqlConf)).
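The INFO message is a plain interpolation of the flow identifier and the checkpoint location (the fix above drops a stray trailing quote). As a sketch, with an illustrative helper name:

```scala
// Illustrative helper showing the shape of executeInternal's INFO message.
def startingMessage(identifier: String, checkpointPath: String): String =
  s"Starting $identifier with checkpoint location $checkpointPath"
```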

docs/declarative-pipelines/StreamingTableWrite.md

Lines changed: 7 additions & 3 deletions
@@ -1,6 +1,10 @@
-# StreamingTableWrite
+---
+title: StreamingTableWrite
+---
 
-`StreamingTableWrite` is a [StreamingFlowExecution](StreamingFlowExecution.md).
+# StreamingTableWrite Flow Execution
+
+`StreamingTableWrite` is a [StreamingFlowExecution](StreamingFlowExecution.md) that writes a streaming `DataFrame` to a [Table](#destination).
 
 When [executed](#startStream), `StreamingTableWrite` starts a streaming query to append new rows to an [output table](#destination).
 
@@ -14,7 +18,7 @@ When [executed](#startStream), `StreamingTableWrite` starts a streaming query to
 * <span id="updateContext"> [PipelineUpdateContext](FlowExecution.md#updateContext)
 * <span id="checkpointPath"> [Checkpoint Location](StreamingFlowExecution.md#checkpointPath)
 * <span id="trigger"> [Streaming Trigger](StreamingFlowExecution.md#trigger)
-* <span id="destination"> [Output table](FlowExecution.md#destination)
+* <span id="destination"> [Destination](FlowExecution.md#destination) ([Table](Table.md))
 * <span id="sqlConf"> [SQL Configuration](StreamingFlowExecution.md#sqlConf)
 
 `StreamingTableWrite` is created when:
