Skip to content

Commit 0449a03

Browse files
committed
Added data.FullReader and moved to low level library
Added ability to get readers from Base
1 parent 0574bbe commit 0449a03

File tree

4 files changed

+172
-16
lines changed

4 files changed

+172
-16
lines changed

internal/data/fullreader.go

Lines changed: 0 additions & 5 deletions
This file was deleted.

squashfs/base.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import (
44
"errors"
55
"io"
66

7-
"github.com/CalebQ42/squashfs/internal/data"
7+
"github.com/CalebQ42/squashfs/internal/decompress"
88
"github.com/CalebQ42/squashfs/internal/metadata"
99
"github.com/CalebQ42/squashfs/internal/toreader"
10+
"github.com/CalebQ42/squashfs/squashfs/data"
1011
"github.com/CalebQ42/squashfs/squashfs/directory"
1112
"github.com/CalebQ42/squashfs/squashfs/inode"
1213
)
@@ -50,9 +51,9 @@ func (b *Base) IsRegular() bool {
5051
return b.Inode.Type == inode.Fil || b.Inode.Type == inode.EFil
5152
}
5253

53-
func (b *Base) GetRegFileReaders(r *Reader) (io.ReadCloser, error) {
54+
func (b *Base) GetRegFileReaders(r *Reader) (*data.Reader, *data.FullReader, error) {
5455
if !b.IsRegular() {
55-
return nil, errors.New("not a regular file")
56+
return nil, nil, errors.New("not a regular file")
5657
}
5758
var blockStart uint64
5859
var fragIndex uint32
@@ -69,19 +70,26 @@ func (b *Base) GetRegFileReaders(r *Reader) (io.ReadCloser, error) {
6970
fragOffset = b.Inode.Data.(inode.EFile).FragOffset
7071
sizes = b.Inode.Data.(inode.EFile).BlockSizes
7172
}
72-
var frag *data.Reader
73-
if fragIndex != 0xFFFFFFFF {
73+
frag := func(rdr io.ReaderAt, d decompress.Decompressor) (*data.Reader, error) {
7474
ent, err := r.fragEntry(fragIndex)
7575
if err != nil {
7676
return nil, err
7777
}
78-
frag = data.NewReader(toreader.NewReader(r.r, int64(ent.start)), r.d, []uint32{ent.size})
78+
frag := data.NewReader(toreader.NewReader(r.r, int64(ent.start)), r.d, []uint32{ent.size})
7979
frag.Read(make([]byte, fragOffset))
80+
return frag, nil
8081
}
81-
out := data.NewReader(toreader.NewReader(r.r, int64(blockStart)), r.d, sizes)
82-
if frag != nil {
83-
out.AddFrag(out)
82+
outRdr := data.NewReader(toreader.NewReader(r.r, int64(blockStart)), r.d, sizes)
83+
if fragIndex != 0xffffffff {
84+
f, err := frag(r.r, r.d)
85+
if err != nil {
86+
return nil, nil, err
87+
}
88+
outRdr.AddFrag(f)
89+
}
90+
outFull := data.NewFullReader(r.r, int64(blockStart), r.d, sizes)
91+
if fragIndex != 0xffffffff {
92+
outFull.AddFrag(frag)
8493
}
85-
//TODO: implement and add full reader
86-
return out, nil
94+
return outRdr, outFull, nil
8795
}

squashfs/data/fullreader.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package data
2+
3+
import (
4+
"encoding/binary"
5+
"errors"
6+
"io"
7+
"sync"
8+
9+
"github.com/CalebQ42/squashfs/internal/decompress"
10+
"github.com/CalebQ42/squashfs/internal/toreader"
11+
)
12+
13+
type FragReaderConstructor func(io.ReaderAt, decompress.Decompressor) (*Reader, error)
14+
15+
type FullReader struct {
16+
r io.ReaderAt
17+
d decompress.Decompressor
18+
frag FragReaderConstructor
19+
retPool *sync.Pool
20+
sizes []uint32
21+
initialOffset int64
22+
goroutineLimit uint16
23+
}
24+
25+
func NewFullReader(r io.ReaderAt, initialOffset int64, d decompress.Decompressor, sizes []uint32) *FullReader {
26+
return &FullReader{
27+
r: r,
28+
d: d,
29+
sizes: sizes,
30+
initialOffset: initialOffset,
31+
goroutineLimit: 10,
32+
retPool: &sync.Pool{
33+
New: func() any {
34+
return &retValue{}
35+
},
36+
},
37+
}
38+
}
39+
40+
func (r *FullReader) AddFrag(frag FragReaderConstructor) {
41+
r.frag = frag
42+
}
43+
44+
func (r *FullReader) SetGoroutineLimit(limit uint16) {
45+
r.goroutineLimit = limit
46+
}
47+
48+
// retValue carries the result of reading/decompressing one data block,
// tagged with the block's index so the consumer can restore ordering.
// Instances are recycled through FullReader.retPool.
type retValue struct {
	err   error  // read or decompression failure, if any
	data  []byte // the block's (decompressed) contents
	index uint64 // position of this block within the file
}
53+
54+
func (r *FullReader) process(index uint64, fileOffset uint64, retChan chan *retValue) {
55+
ret := r.retPool.Get().(*retValue)
56+
ret.index = index
57+
realSize := r.sizes[index] &^ (1 << 24)
58+
ret.data = make([]byte, realSize)
59+
ret.err = binary.Read(toreader.NewReader(r.r, int64(r.initialOffset)+int64(fileOffset)), binary.LittleEndian, &ret.data)
60+
if r.sizes[index] == realSize {
61+
ret.data, ret.err = r.d.Decompress(ret.data)
62+
}
63+
retChan <- ret
64+
}
65+
66+
func (r *FullReader) WriteTo(w io.Writer) (int64, error) {
67+
var curIndex uint64
68+
var curOffset uint64
69+
var toProcess uint16
70+
var wrote int64
71+
cache := make(map[uint64]*retValue)
72+
var errCache []error
73+
retChan := make(chan *retValue, r.goroutineLimit)
74+
for i := uint64(0); i < uint64(len(r.sizes))/uint64(r.goroutineLimit); i++ {
75+
toProcess = uint16(len(r.sizes)) - (uint16(i) * r.goroutineLimit)
76+
if toProcess > r.goroutineLimit {
77+
toProcess = r.goroutineLimit
78+
}
79+
// Start all the goroutines
80+
for j := uint16(0); j < toProcess; j++ {
81+
go r.process((i*uint64(r.goroutineLimit))+uint64(j), curOffset, retChan)
82+
curOffset += uint64(r.sizes[(i*uint64(r.goroutineLimit))+uint64(j)]) &^ (1 << 24)
83+
}
84+
// Then consume the results on retChan
85+
for j := uint16(0); j < toProcess; j++ {
86+
res := <-retChan
87+
// If there's an error, we don't care about the results.
88+
if res.err != nil {
89+
errCache = append(errCache, res.err)
90+
if len(cache) > 0 {
91+
clear(cache)
92+
}
93+
continue
94+
}
95+
// If there has been an error previously, we don't care about the results.
96+
// We still want to wait for all the goroutines to prevent resources being wasted.
97+
if len(errCache) > 0 {
98+
continue
99+
}
100+
// If we don't need the data yet, we cache it and move on
101+
if res.index != curIndex {
102+
cache[res.index] = res
103+
continue
104+
}
105+
// If we do need the data, we write it
106+
wr, err := w.Write(res.data)
107+
wrote += int64(wr)
108+
if err != nil {
109+
errCache = append(errCache, err)
110+
if len(cache) > 0 {
111+
clear(cache)
112+
}
113+
continue
114+
}
115+
r.retPool.Put(res)
116+
curIndex++
117+
// Now we recursively try to clear the cache
118+
for len(cache) > 0 {
119+
res, ok := cache[curIndex]
120+
if !ok {
121+
break
122+
}
123+
wr, err := w.Write(res.data)
124+
wrote += int64(wr)
125+
if err != nil {
126+
errCache = append(errCache, err)
127+
if len(cache) > 0 {
128+
clear(cache)
129+
}
130+
break
131+
}
132+
delete(cache, curIndex)
133+
r.retPool.Put(res)
134+
curIndex++
135+
}
136+
}
137+
if len(errCache) > 0 {
138+
return wrote, errors.Join(errCache...)
139+
}
140+
}
141+
if r.frag != nil {
142+
rdr, err := r.frag(r.r, r.d)
143+
if err != nil {
144+
return wrote, err
145+
}
146+
wr, err := io.Copy(w, rdr)
147+
wrote += wr
148+
if err != nil {
149+
return wrote, err
150+
}
151+
}
152+
return wrote, nil
153+
}
File renamed without changes.

0 commit comments

Comments
 (0)