Skip to content

Commit 62e3c68

Browse files
authored
Merge pull request #14 from klauspost/add-io-pipe
Add io.Pipe compatible interface
2 parents 2fc0b61 + 79542fd commit 62e3c68

File tree

4 files changed

+580
-1
lines changed

4 files changed

+580
-1
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,42 @@ func readWebsite(url string) io.ReadCloser {
104104
return ring.ReadCloser()
105105
}
106106
```
107+
108+
# io.Pipe replacement
109+
110+
The ring buffer can be used as a compatible, but *asynchronous* replacement of `io.Pipe`.
111+
112+
That means that Reads and Writes will go to the ring buffer.
113+
Writes will complete as long as the data fits within the ring buffer.
114+
115+
Reads will attempt to satisfy reads with data from the ring buffer.
116+
The read will only block if the ring buffer is empty.
117+
118+
In the common case, where the Read and Write side can run concurrently,
119+
it is safe to replace `io.Pipe()` with `(*Ringbuffer).Pipe()`.
120+
121+
Compare the following to the [io.Pipe example](https://pkg.go.dev/io#example-Pipe):
122+
123+
```go
124+
func main() {
125+
// Create pipe from a 4KB ring buffer.
126+
r, w := ringbuffer.New(4 << 10).Pipe()
127+
128+
go func() {
129+
fmt.Fprint(w, "some io.Reader stream to be read\n")
130+
w.Close()
131+
}()
132+
133+
if _, err := io.Copy(os.Stdout, r); err != nil {
134+
log.Fatal(err)
135+
}
136+
}
137+
```
138+
139+
When creating the pipe, the ring buffer is internally switched to blocking mode.
140+
141+
Error reporting on Close and CloseWithError functions is similar to `io.Pipe`.
142+
143+
It is possible to use the original ring buffer alongside the pipe functions.
144+
So for example it is possible to "seed" the ring buffer with data,
145+
so reads can complete at once.

example_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package ringbuffer
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
"io"
6+
"log"
7+
"os"
8+
)
49

510
func ExampleRingBuffer() {
611
rb := New(1024)
@@ -15,3 +20,20 @@ func ExampleRingBuffer() {
1520
// 1020
1621
// abcd
1722
}
23+
24+
func ExampleRingBuffer_Pipe() {
25+
// Create pipe from a 4KB ring buffer.
26+
r, w := New(4 << 10).Pipe()
27+
28+
go func() {
29+
fmt.Fprint(w, "some io.Reader stream to be read\n")
30+
w.Close()
31+
}()
32+
33+
if _, err := io.Copy(os.Stdout, r); err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
// Output:
38+
// some io.Reader stream to be read
39+
}

pipe.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2019 smallnest. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package ringbuffer
6+
7+
import "io"
8+
9+
// Pipe creates an asynchronous in-memory pipe compatible with io.Pipe
10+
// It can be used to connect code expecting an [io.Reader]
11+
// with code expecting an [io.Writer].
12+
//
13+
// Reads and Writes will go to the ring buffer.
14+
// Writes will complete as long as the data fits within the ring buffer.
15+
// Reads will attempt to satisfy reads with data from the ring buffer.
16+
// Only if the ring buffer is empty will the read block.
17+
//
18+
// It is safe (and intended) to call Read and Write in parallel with each other or with Close.
19+
func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter) {
20+
r.SetBlocking(true)
21+
pr := PipeReader{pipe: r}
22+
return &pr, &PipeWriter{pipe: r}
23+
}
24+
25+
// A PipeReader is the read half of a pipe.
26+
type PipeReader struct{ pipe *RingBuffer }
27+
28+
// Read implements the standard Read interface:
29+
// it reads data from the pipe, blocking until a writer
30+
// arrives or the write end is closed.
31+
// If the write end is closed with an error, that error is
32+
// returned as err; otherwise err is io.EOF.
33+
func (r *PipeReader) Read(data []byte) (n int, err error) {
34+
return r.pipe.Read(data)
35+
}
36+
37+
// Close closes the reader; subsequent writes to the
38+
// write half of the pipe will return the error [io.ErrClosedPipe].
39+
func (r *PipeReader) Close() error {
40+
r.pipe.setErr(io.ErrClosedPipe, false)
41+
return nil
42+
}
43+
44+
// CloseWithError closes the reader; subsequent writes
45+
// to the write half of the pipe will return the error err.
46+
//
47+
// CloseWithError never overwrites the previous error if it exists
48+
// and always returns nil.
49+
func (r *PipeReader) CloseWithError(err error) error {
50+
if err == nil {
51+
return r.Close()
52+
}
53+
r.pipe.setErr(err, false)
54+
return nil
55+
}
56+
57+
// A PipeWriter is the write half of a pipe.
58+
type PipeWriter struct{ pipe *RingBuffer }
59+
60+
// Write implements the standard Write interface:
61+
// it writes data to the pipe.
62+
// The Write will block until all data has been written to the ring buffer.
63+
// If the read end is closed with an error, that err is
64+
// returned as err; otherwise err is [io.ErrClosedPipe].
65+
func (w *PipeWriter) Write(data []byte) (n int, err error) {
66+
if n, err = w.pipe.Write(data); err == ErrWriteOnClosed {
67+
// Replace error.
68+
err = io.ErrClosedPipe
69+
}
70+
return n, err
71+
}
72+
73+
// Close closes the writer; subsequent reads from the
74+
// read half of the pipe will return no bytes and EOF.
75+
func (w *PipeWriter) Close() error {
76+
w.pipe.setErr(io.EOF, false)
77+
return nil
78+
}
79+
80+
// CloseWithError closes the writer; subsequent reads from the
81+
// read half of the pipe will return no bytes and the error err,
82+
// or EOF if err is nil.
83+
//
84+
// CloseWithError never overwrites the previous error if it exists
85+
// and always returns nil.
86+
func (w *PipeWriter) CloseWithError(err error) error {
87+
if err == nil {
88+
return w.Close()
89+
}
90+
w.pipe.setErr(err, false)
91+
return nil
92+
}

0 commit comments

Comments
 (0)