
Commit e07af21

HADOOP-13327 adding open/closed state to output stream, java.io.OutputStream defining actions when output streams are closed.
Change-Id: Ia921e8d2f9af53e8e6655a806e79fcc1dba49d1d
1 parent b7ab6c6

2 files changed: +129, -40 lines

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

Lines changed: 10 additions & 4 deletions
@@ -580,10 +580,12 @@ and MAY be a `RuntimeException` or subclass. For instance, HDFS may raise a `Inv

    FS' where :
       FS'.Files'[p] == []
-      ancestors(p) is-subset-of FS'.Directories'
+      and ancestors(p) is-subset-of FS'.Directories'

    result = FSDataOutputStream

+A zero byte file must exist at the end of the specified path, visible to all
+
The updated (valid) FileSystem must contains all the parent directories of the path, as created by `mkdirs(parent(p))`.

The result is `FSDataOutputStream`, which through its operations may generate new filesystem states with updated values of
@@ -599,8 +601,12 @@ The result is `FSDataOutputStream`, which through its operations may generate ne

* S3N, S3A, Swift and potentially other Object Stores do not currently change the FS state
until the output stream `close()` operation is completed.
-This MAY be a bug, as it allows >1 client to create a file with `overwrite==false`,
-and potentially confuse file/directory logic
+This is a significant difference between the behavior of object stores
+and that of filesystems, as it allows >1 client to create a file with `overwrite==false`,
+and potentially confuse file/directory logic. In particular, using create() to acquire
+an exclusive lock on a file (whoever creates the file without an error is considered
+the holder of the lock) is not a valid algorithm when working with object stores.
+

* The Local FileSystem raises a `FileNotFoundException` when trying to create a file over
a directory, hence it is listed as an exception that MAY be raised when
@@ -620,7 +626,7 @@ Implementations MAY throw `UnsupportedOperationException`.

#### Postconditions

-    FS
+    FS' = FS
    result = FSDataOutputStream

Return: `FSDataOutputStream`, which can update the entry `FS.Files[p]`
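As an illustrative aside (not part of the diff itself): a minimal Java sketch of the create()-as-lock pattern that the amended object-store note warns against. `CreateAsLock` and its method are hypothetical names; `FileSystem.create(Path, boolean)` is the standard Hadoop API.

```java
// Aside: NOT part of the commit. The create()-as-lock anti-pattern: unsafe on
// object stores that defer the existence check until close().
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateAsLock {
  /** Try to "acquire a lock" by creating lockFile with overwrite == false. */
  public static boolean tryAcquire(FileSystem fs, Path lockFile) {
    try {
      // On HDFS or a local filesystem this fails if the file already exists.
      // On an object store the check may not happen until close(), so two
      // concurrent callers can both return true.
      fs.create(lockFile, false).close();
      return true;
    } catch (IOException alreadyExistsOrError) {
      return false;
    }
  }
}
```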

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/outputstream.md

Lines changed: 119 additions & 36 deletions
@@ -35,6 +35,35 @@ In a Hadoop filesystem, that destination is the data under a path in the filesys
An output stream consists of a buffer `buf: List[byte]`


+## Model
+
+For this specification, the output stream can be viewed as a cached byte array
+alongside the filesystem. After data is flushed, read operations on the filesystem
+should be in sync with the data
+
+    Stream = (path, isOpen, cache)
+
+when opening a new file, the initial state of the cache is empty
+
+    Stream = (path, true, [])
+
+When appending to a file, that is, the output stream is created using
+`FileSystem.append(path, buffersize, progress)`, then the initial state
+of the cache is the current contents of the file
+
+
+    Stream = (path, true, data(FS, path))
+
+After a call to `close()`, the stream is closed for all operations other
+than `close()`; they MAY fail with `IOException` or `RuntimeException` instances.
+
+    Stream = (path, false, [])
+
+The `close()` operation must become a no-op. That is: follow-on attempts to
+persist data after a failure MUST NOT be made. (Alternatively: users of the
+API MUST NOT expect failing `close()` attempts to succeed if retried.)
+
+
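As an illustrative aside (not part of the diff itself): a minimal sketch of how an implementation might track the `isOpen` state of this model, rejecting writes on a closed stream and keeping `close()` idempotent. The class is hypothetical; only `java.io.OutputStream` is assumed.

```java
// Aside: NOT part of the commit. Skeleton of the (path, isOpen, cache) model:
// writes are rejected once the stream is closed, and close() is idempotent.
import java.io.IOException;
import java.io.OutputStream;

abstract class ModelledOutputStream extends OutputStream {
  private boolean isOpen = true;                 // Stream.isOpen

  /** Persist the cached bytes to the destination; may throw on failure. */
  protected abstract void flushCache() throws IOException;

  protected final void checkOpen() throws IOException {
    if (!isOpen) {
      throw new IOException("stream is closed"); // Stream.isOpen else raise IOException
    }
  }

  @Override
  public void write(int data) throws IOException {
    checkOpen();
    // append (data & 0xff) to the cache (omitted in this sketch)
  }

  @Override
  public void close() throws IOException {
    if (!isOpen) {
      return;                                    // follow-on close() calls are no-ops
    }
    isOpen = false;                              // closed even if the final flush fails
    flushCache();
  }
}
```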

Data written to an `FSDataOutputStream` implementation

@@ -52,18 +81,23 @@ public class FSDataOutputStream extends DataOutputStream, FilterOutputStream
1. `OutputStream.flush()` flushes data to the destination. There
are no strict persistence requirements.
1. `Syncable.hflush()` synchronously sends all local data to the destination
-filesystem. After returning to the caller, the data MUST be visible to other readers.
+filesystem. After returning to the caller, the data MUST be visible to other readers;
+it MAY be durable. That is: it does not have to be persisted, merely guaranteed
+to be consistently visible to all clients attempting to open a new stream reading
+data at the path.
1. `Syncable.hsync()` MUST flush the data and persist data to the underlying durable
storage.
-1. `close()` MUST flush out all remaining data in the buffers, and persist it.
+1. `close()` The first call to `close()` MUST flush out all remaining data in
+the buffers, and persist it.


## Concurrency

1. The outcome of more than one process writing to the same file is undefined.

1. An input stream opened to read a file *before the file was opened for writing*
-MAY fetch updated data. Because of buffering and caching, this is not a requirement
+MAY fetch data updated by writes to an OutputStream.
+Because of buffering and caching, this is not a requirement
—and if a input stream does pick up updated data, the point at
which the updated data is read is undefined. This surfaces in object stores
where a `seek()` call which closes and re-opens the connection may pick up
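As an illustrative aside (not part of the diff itself): the amended `hflush()`/`hsync()`/`close()` semantics exercised through the standard `FSDataOutputStream` API; the path and configuration are placeholders.

```java
// Aside: NOT part of the commit. hflush() makes data visible to new readers,
// hsync() also makes it durable, and the first close() flushes and persists
// whatever remains in the buffers.
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FlushModes {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/flush-modes.txt");              // placeholder path
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write("record-1\n".getBytes(StandardCharsets.UTF_8));
      out.hflush();   // visible to readers opening the file now; durability optional
      out.write("record-2\n".getBytes(StandardCharsets.UTF_8));
      out.hsync();    // flushed and persisted to durable storage
    }                 // close(): flush and persist any remaining buffered data
  }
}
```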
@@ -88,13 +122,16 @@ model permits the output stream to `close()`'d while awaiting an acknowledgement
from datanode or namenode writes in an `hsync()` operation.


-## Consistency
+## Consistency and Visibility

There is no requirement for the data to be immediately visible to other applications
—not until a specific call to flush buffers or persist it to the underlying storage
medium are made.

-
+1. If an output stream is created with `FileSystem.create(path)`, with overwrite=true
+and there is an existing file at the path, that is `exists(FS, path)` holds,
+then the existing data is immediately unavailable; the data at the end of the
+path MUST consist of an empty byte sequence `[]`, with consistent metadata.
1. The metadata of a file (`length(FS, path)` in particular) SHOULD be consistent
with the contents of the file after `flush()` and `sync()`.
HDFS does not do this except when the write crosses a block boundary; to do
@@ -105,8 +142,13 @@ operation.
1. If the filesystem supports modification timestamps, this
timestamp MAY be updated while a file is being written, especially after a
`Syncable.hsync()` call. The timestamps MUST be updated after the file is closed.
-1. After the contents of an input stream have been persisted (`hflush()/hsync()`)
+1. After the contents of an output stream have been persisted (`hflush()/hsync()`)
all new `open(FS, Path)` operations MUST return the updated data.
+1. After `close()` has been invoked on an output stream returned from a call
+to `FileSystem.create(Path,...)`, a call to `getFileStatus(path)` MUST return the
+final metadata of the written file, including length and modification time.
+The metadata of the file returned in any of the FileSystem `list` operations
+MUST be consistent with this metadata.
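As an illustrative aside (not part of the diff itself): a sketch of the post-`close()` guarantee added above, checking that `getFileStatus()` reports the final length; the path and configuration are placeholders.

```java
// Aside: NOT part of the commit. After close(), getFileStatus() must report the
// final, consistent metadata of the written file.
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CloseVisibility {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path("/tmp/close-visibility.txt");          // placeholder path
    byte[] data = "hello, world\n".getBytes(StandardCharsets.UTF_8);
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(data);
    }
    // Required after close(): length (and modification time) are final.
    long len = fs.getFileStatus(path).getLen();
    System.out.println("expected " + data.length + ", got " + len);
  }
}
```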

### HDFS

@@ -134,9 +176,9 @@ recorded in the metadata.
1. If `read() != -1`, there is new data.

This algorithm works for filesystems which are consistent with metadata and
-data, as well as HDFS. What is important to know is that even if the length
-of the file is declared to be 0, in HDFS this can mean
-"there is data —it just that the metadata is not up to date"
+data, as well as HDFS. What is important to know is that even if
+`getFileStatus(path).getLen()==0` holds,
+in HDFS there may be data, but the metadata is not up to date.
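As an illustrative aside (not part of the diff itself): the probe described above, sketched with the standard `FileSystem` and `FSDataInputStream` calls; the class and method names are hypothetical.

```java
// Aside: NOT part of the commit. Even if getFileStatus(path).getLen() == 0, HDFS may
// already hold data; seeking to the recorded length and reading reveals whether
// more data exists than the metadata reports.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ProbeForNewData {
  static boolean hasDataBeyondRecordedLength(FileSystem fs, Path path) throws IOException {
    long recorded = fs.getFileStatus(path).getLen();  // metadata may lag the data
    try (FSDataInputStream in = fs.open(path)) {
      in.seek(recorded);                              // position at the recorded EOF
      return in.read() != -1;                         // -1: nothing past the metadata length
    }
  }
}
```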

### Local Filesystem, `file:`

@@ -153,6 +195,7 @@ That is, `sync()` and `hsync()` cannot be guaranteed to persist the data current
buffered locally.

For anyone thinking "this is a violation of this specification" —they are correct.
+The local filesystem is intended for testing, rather than production use.

### Object Stores

@@ -163,32 +206,40 @@ blocks of data, synchronously or asynchronously.

Accordingly, they tend not to implement the `Syncable` interface.
However, Exception: Azure's `PageBlobOutputStream` does implement `hsync()`,
-blocking until ongoing writes complete.
+blocking until write operations being executed in separate threads have completed.

+Equally importantly:

-## Model
+1. The object may not be visible at the end of the path until the final `close()`
+is called; this holds for `getFileStatus()`, `open()` and all FileSystem list operations.
+1. Any existing data at the end of a path `p`, may remain visible until the final
+`close()` operation overwrites this data.
+1. The check for existing data in a `create()` call with overwrite=false, may
+take place in the `create()` call itself, in the `close()` call prior to/during
+the write, or at some point in between. Except in the special case that the
+object store supports an atomic PUT operation, the check for existence of
+existing data and the subsequent creation of data at the path contains a race
+condition: other clients may create data at the path between the existence check
+and the subsequent write.

-For this specification, the output stream can be viewed as a cached byte array
-alongside the filesystem. After data is flushed, read operations on the filesystem
-should be in sync with the data

-    (path, cache)

-when opening a new file, the initial state of the cache is empty

-    (path, [])
+### <a name="write(data)"></a>`write(int data)`

-When appending to a file, that is the output stream is created using
-`FileSystem.append(path, buffersize, progress)`, then the initial state
-of the cache is the current contents of the file
+#### Preconditions

+    Stream.isOpen else raise IOException

-    (path, data(FS, path))
+#### Postconditions

+The cache has the lower 8 bits of the data argument appended to it.

-### <a name="write(byte)"></a>`write(byte)`
+    Stream'.cache = Stream.cache + [data & 0xff]

-    cache' = cache + [byte]
+There may be an explicit limit on the size of cached data, or an implicit
+limit based on the available capacity of the destination filesystem.
+When a limit is reached, `write()` MUST fail with an `IOException`.
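As an illustrative aside (not part of the diff itself): the `data & 0xff` postcondition matches the `java.io.OutputStream.write(int)` contract, which keeps only the low 8 bits of the argument, as this self-contained snippet shows.

```java
// Aside: NOT part of the commit. Demonstrates the write(int) postcondition
// Stream'.cache = Stream.cache + [data & 0xff]: only the low byte is appended.
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class WriteLowByte {
  public static void main(String[] args) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0x241);   // 0x241 & 0xff == 0x41 ('A')
    out.write('B');     // 0x42 ('B')
    System.out.println(new String(out.toByteArray(), StandardCharsets.US_ASCII)); // "AB"
  }
}
```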

### <a name="write(buffer,offset,len)"></a>`write(byte[] buffer, int offset, int len)`

@@ -197,16 +248,24 @@ of the cache is the current contents of the file

The preconditions are all defined in `OutputStream.write()`

+    Stream.isOpen else raise IOException
    buffer != null else raise NullPointerException
    offset >= 0 else raise IndexOutOfBoundsException
    len >= 0 else raise IndexOutOfBoundsException
    offset < buffer.length else raise IndexOutOfBoundsException
    offset + len < buffer.length else raise IndexOutOfBoundsException


+There may be an explicit limit on the size of cached data, or an implicit
+limit based on the available capacity of the destination filesystem.
+When a limit is reached, `write()` MUST fail with an `IOException`.
+
+After the operation has returned, the buffer may be re-used. The outcome
+of updates to the buffer while the `write()` operation is in progress is undefined.
+
#### Postconditions

-    cache' = cache + buffer[offset...offset+len]
+    Stream'.cache = Stream.cache + buffer[offset...offset+len]


### <a name="write(buffer)"></a>`write(byte[] buffer)`
@@ -218,21 +277,24 @@ This is required to be the equivalent of

#### Preconditions

-With the offset of 0 and the length known to be that of the buffer, the
-preconditions can be simplified to
+    Stream.isOpen else raise IOException
+

+With the offset of 0 and the length known to be that of the buffer, the
+other preconditions of `write(byte[] buffer, int offset, int len)`
+can be simplified to

    buffer != null else raise NullPointerException

#### Postconditions

The postconditions become

-    cache' = cache + buffer[0...buffer.length]
+    Stream'.cache = Stream.cache + buffer[0...buffer.length]

Which is equivalent to

-    cache' = cache + buffer
+    Stream'.cache = Stream.cache + buffer

### <a name="flush()"></a>`flush()`

@@ -244,9 +306,14 @@ It explicitly precludes any guarantees about durability.

#### Preconditions

+    Stream.isOpen else raise IOException
+
#### Postconditions


+    FS' = FS where data(path) == cache
+
+
### <a name="close"></a>`close()`

The `close()` operation completes the write. It is expected to block
@@ -260,15 +327,17 @@ updated).

Any locking/leaseholding mechanism is also required to release its lock/lease.

-The are two non-requirements of the `close()` operation; code use

-The `close()` call MAY fail during its operation. This is clearly an erroneous
-outcome, but it is possible.
+    FS' = FS where data(path) == cache
+    Stream'.isOpen = false
+

-1. Callers of the API MUST expect this and SHOULD code appropriately. Catching
-and swallowing exceptions, while common, is not always the ideal solution.
-1. Even after a failure, `close()` should place the stream into a closed state,
-where future calls to `close()` are ignored, and calls to other methods
+The `close()` call MAY fail during its operation.
+
+1. Callers of the API MUST expect some calls to fail and SHOULD code appropriately.
+Catching and swallowing exceptions, while common, is not always the ideal solution.
+1. Even after a failure, `close()` MUST place the stream into a closed state.
+Follow-on calls to `close()` are ignored, and calls to other methods
rejected. That is: caller's cannot be expected to call `close()` repeatedly
until it succeeds.
1. The duration of the `call()` operation is undefined. Operations which rely
@@ -304,6 +373,13 @@ which implement `Syncable`.
Flush out the data in client's user buffer. After the return of
this call, new readers will see the data.

+#### Preconditions
+
+    Stream.isOpen else raise IOException
+
+#### Postconditions
+
+
    FS' = FS where data(path) == cache

It's not clear whether this operation is expected to be blocking, that is,
@@ -319,6 +395,13 @@ all the way to the disk device (but the disk may have it in its cache).
That is, it is a requirement for the underlying FS To save all the data to
the disk hardware itself, where it is expected to be durable.

+#### Preconditions
+
+    Stream.isOpen else raise IOException
+
+#### Postconditions
+
+
    FS' = FS where data(path) == cache

The reference implementation, `DFSOutputStream` will block
@@ -348,5 +431,5 @@ environments.

### <a name="Syncable.hflush"></a>`Syncable.hflush()`

-Deprecated: replaced by `hflush()`
+Deprecated: replaced by `hsync()`
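As an illustrative aside (not part of the diff itself): caller-side handling that follows the amended `close()` requirements; expect `close()` to fail, let the failure surface rather than swallowing it, and never retry it, since even a failed `close()` leaves the stream closed. Names and paths are placeholders.

```java
// Aside: NOT part of the commit. Sketch of handling close() per the amended spec:
// expect failure, surface it, and do not call close() again.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RobustClose {
  public static void writeFully(FileSystem fs, Path path, byte[] data) throws IOException {
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(data);
      // close() runs here: it flushes and persists the remaining data, may block,
      // and may throw. A failure propagates to the caller; the stream is already
      // in the closed state, so retrying close() would achieve nothing.
    }
  }
}
```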