Skip to content

Commit 79542fd

Browse files
committed
Add io.Pipe compatible interface
The ring buffer can be used as a compatible, but *asynchronous* replacement of `io.Pipe`. That means that Reads and Writes will go to the ring buffer. Writes will complete as long as the data fits within the ring buffer. Reads will attempt to satisfy reads with data from the ring buffer. Only if the ring buffer is empty will the read block. In the common case, where the Read and Write side can run concurrently, it is safe to replace `io.Pipe()` with `(*Ringbuffer).Pipe()`.
1 parent 2fc0b61 commit 79542fd

File tree

4 files changed

+580
-1
lines changed

4 files changed

+580
-1
lines changed

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,42 @@ func readWebsite(url string) io.ReadCloser {
104104
return ring.ReadCloser()
105105
}
106106
```
107+
108+
# io.Pipe replacement
109+
110+
The ring buffer can be used as a compatible, but *asynchronous* replacement of `io.Pipe`.
111+
112+
That means that Reads and Writes will go to the ring buffer.
113+
Writes will complete as long as the data fits within the ring buffer.
114+
115+
Reads will attempt to satisfy reads with data from the ring buffer.
116+
The read will only block if the ring buffer is empty.
117+
118+
In the common case, where the Read and Write side can run concurrently,
119+
it is safe to replace `io.Pipe()` with `(*Ringbuffer).Pipe()`.
120+
121+
Compare the following to the [io.Pipe example](https://pkg.go.dev/io#example-Pipe):
122+
123+
```go
124+
func main() {
125+
// Create pipe from a 4KB ring buffer.
126+
r, w := ringbuffer.New(4 << 10).Pipe()
127+
128+
go func() {
129+
fmt.Fprint(w, "some io.Reader stream to be read\n")
130+
w.Close()
131+
}()
132+
133+
if _, err := io.Copy(os.Stdout, r); err != nil {
134+
log.Fatal(err)
135+
}
136+
}
137+
```
138+
139+
When creating the pipe, the ring buffer is internally switched to blocking mode.
140+
141+
Error reporting on Close and CloseWithError functions is similar to `io.Pipe`.
142+
143+
It is possible to use the original ring buffer alongside the pipe functions.
144+
So for example it is possible to "seed" the ring buffer with data,
145+
so reads can complete at once.

example_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package ringbuffer
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
"io"
6+
"log"
7+
"os"
8+
)
49

510
func ExampleRingBuffer() {
611
rb := New(1024)
@@ -15,3 +20,20 @@ func ExampleRingBuffer() {
1520
// 1020
1621
// abcd
1722
}
23+
24+
func ExampleRingBuffer_Pipe() {
25+
// Create pipe from a 4KB ring buffer.
26+
r, w := New(4 << 10).Pipe()
27+
28+
go func() {
29+
fmt.Fprint(w, "some io.Reader stream to be read\n")
30+
w.Close()
31+
}()
32+
33+
if _, err := io.Copy(os.Stdout, r); err != nil {
34+
log.Fatal(err)
35+
}
36+
37+
// Output:
38+
// some io.Reader stream to be read
39+
}

pipe.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright 2019 smallnest. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package ringbuffer
6+
7+
import "io"
8+
9+
// Pipe creates an asynchronous in-memory pipe compatible with io.Pipe
10+
// It can be used to connect code expecting an [io.Reader]
11+
// with code expecting an [io.Writer].
12+
//
13+
// Reads and Writes will go to the ring buffer.
14+
// Writes will complete as long as the data fits within the ring buffer.
15+
// Reads will attempt to satisfy reads with data from the ring buffer.
16+
// Only if the ring buffer is empty will the read block.
17+
//
18+
// It is safe (and intended) to call Read and Write in parallel with each other or with Close.
19+
func (r *RingBuffer) Pipe() (*PipeReader, *PipeWriter) {
20+
r.SetBlocking(true)
21+
pr := PipeReader{pipe: r}
22+
return &pr, &PipeWriter{pipe: r}
23+
}
24+
25+
// A PipeReader is the read half of a pipe.
26+
type PipeReader struct{ pipe *RingBuffer }
27+
28+
// Read implements the standard Read interface:
29+
// it reads data from the pipe, blocking until a writer
30+
// arrives or the write end is closed.
31+
// If the write end is closed with an error, that error is
32+
// returned as err; otherwise err is io.EOF.
33+
func (r *PipeReader) Read(data []byte) (n int, err error) {
34+
return r.pipe.Read(data)
35+
}
36+
37+
// Close closes the reader; subsequent writes to the
38+
// write half of the pipe will return the error [io.ErrClosedPipe].
39+
func (r *PipeReader) Close() error {
40+
r.pipe.setErr(io.ErrClosedPipe, false)
41+
return nil
42+
}
43+
44+
// CloseWithError closes the reader; subsequent writes
45+
// to the write half of the pipe will return the error err.
46+
//
47+
// CloseWithError never overwrites the previous error if it exists
48+
// and always returns nil.
49+
func (r *PipeReader) CloseWithError(err error) error {
50+
if err == nil {
51+
return r.Close()
52+
}
53+
r.pipe.setErr(err, false)
54+
return nil
55+
}
56+
57+
// A PipeWriter is the write half of a pipe.
58+
type PipeWriter struct{ pipe *RingBuffer }
59+
60+
// Write implements the standard Write interface:
61+
// it writes data to the pipe.
62+
// The Write will block until all data has been written to the ring buffer.
63+
// If the read end is closed with an error, that err is
64+
// returned as err; otherwise err is [io.ErrClosedPipe].
65+
func (w *PipeWriter) Write(data []byte) (n int, err error) {
66+
if n, err = w.pipe.Write(data); err == ErrWriteOnClosed {
67+
// Replace error.
68+
err = io.ErrClosedPipe
69+
}
70+
return n, err
71+
}
72+
73+
// Close closes the writer; subsequent reads from the
74+
// read half of the pipe will return no bytes and EOF.
75+
func (w *PipeWriter) Close() error {
76+
w.pipe.setErr(io.EOF, false)
77+
return nil
78+
}
79+
80+
// CloseWithError closes the writer; subsequent reads from the
81+
// read half of the pipe will return no bytes and the error err,
82+
// or EOF if err is nil.
83+
//
84+
// CloseWithError never overwrites the previous error if it exists
85+
// and always returns nil.
86+
func (w *PipeWriter) CloseWithError(err error) error {
87+
if err == nil {
88+
return w.Close()
89+
}
90+
w.pipe.setErr(err, false)
91+
return nil
92+
}

0 commit comments

Comments
 (0)