Skip to content

Commit c2682ee

Browse files
authored
Fixdownloadbug (#363)
Fixdownloadbug
1 parent be46090 commit c2682ee

File tree

8 files changed

+149
-70
lines changed

8 files changed

+149
-70
lines changed

go/filemanager/pfsmodules/cp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func RunCp(cmd *CpCmd) error {
149149
}
150150

151151
if err != nil {
152-
ColorError("err:%s\n", err)
152+
ColorError("proc %s err:%s\n", arg, err)
153153
return err
154154
}
155155

go/filemanager/pfsmodules/download.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,47 +31,48 @@ func downloadFile(src string, srcFileSize int64, dst string, verbose bool, chunk
3131

3232
for {
3333
start := time.Now()
34-
m, errm := r.GetChunkMeta(offset, chunkSize)
35-
if errm != nil && errm != io.EOF {
36-
return errm
34+
sm, errs := r.GetChunkMeta(offset, chunkSize)
35+
if errs != nil && errs != io.EOF {
36+
return errs
3737
}
38-
log.V(2).Infoln("remote chunk info:", m)
38+
log.V(2).Infoln("remote chunk info:", sm)
3939

40-
c, errc := w.ReadChunk(offset, chunkSize)
41-
if errc != nil && errc != io.EOF {
42-
return errc
40+
wm, errw := w.GetChunkMeta(offset, chunkSize)
41+
if errw != nil && errw != io.EOF {
42+
return errw
4343
}
44-
offset += m.Len
45-
log.V(2).Infoln("local chunk info:" + c.String())
44+
log.V(2).Infoln("local chunk info:", wm)
4645

47-
if m.Checksum == c.Checksum {
46+
if sm.Checksum == wm.Checksum {
4847
if verbose {
4948
used := time.Since(start).Nanoseconds() / time.Millisecond.Nanoseconds()
50-
ColorInfoOverWrite("%s download %d%% %dKB/s", src, offset*100/srcFileSize, m.Len/used)
49+
ColorInfoOverWrite("%s download %d%% %dKB/s", src, offset*100/srcFileSize, sm.Len/used)
5150
}
52-
log.V(2).Infof("remote chunk is same as local chunk:%s\n\n", c.String())
53-
if errc == io.EOF || errm == io.EOF {
51+
offset += sm.Len
52+
log.V(2).Infoln("remote chunk is same as local chunk:", sm)
53+
if errs == io.EOF || errw == io.EOF {
5454
break
5555
}
5656
continue
5757
}
5858

59-
c, err := r.ReadChunk(offset, m.Len)
59+
c, err := r.ReadChunk(offset, sm.Len)
6060
if err != nil && err != io.EOF {
6161
return err
6262
}
6363

6464
if err := w.WriteChunk(c); err != nil {
6565
return err
6666
}
67+
offset += sm.Len
6768

6869
if verbose {
6970
used := time.Since(start).Nanoseconds() / time.Millisecond.Nanoseconds()
70-
ColorInfoOverWrite("%s download %d%% %dKB/s", src, offset*100/srcFileSize, m.Len/used)
71+
ColorInfoOverWrite("%s download %d%% %dKB/s", src, offset*100/srcFileSize, sm.Len/used)
7172
}
7273

7374
log.V(2).Infof("downlod chunk:%s ok\n\n", c.String())
74-
if errc == io.EOF || errm == io.EOF {
75+
if errs == io.EOF || errw == io.EOF {
7576
break
7677
}
7778
}
@@ -128,7 +129,7 @@ func download(src, dst string, verbose bool, chunkSize int64) error {
128129
ColorError("Download %s to %s error info:%s\n", realSrc, realDst, err)
129130
return err
130131
}
131-
ColorInfo("Downloaded %s\n", realSrc)
132+
ColorInfoOverWrite("Downloaded %s\n", realSrc)
132133
}
133134

134135
return nil

go/filemanager/pfsmodules/localfile.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,15 @@ func (f *FileHandle) CopyN(w io.Writer, offset, len int64) error {
125125
n, err := io.CopyN(w, f.F, len)
126126
log.V(2).Infof("CopyN expect %d real %d\n", len, n)
127127

128-
if err != nil && err != io.EOF {
128+
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
129129
return err
130130
}
131131
f.Offset += int64(n)
132132

133+
if err == io.ErrUnexpectedEOF {
134+
err = io.EOF
135+
}
136+
133137
return err
134138
}
135139

go/filemanager/pfsmodules/remotefile.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,26 @@ func getChunkData(m ChunkParam) (*Chunk, error) {
7373
break
7474
}
7575

76-
if part.FormName() == "chunk" {
77-
m1, err := ParseChunkParam(part.FileName())
78-
if err != nil {
79-
return nil, errors.New(err.Error())
80-
}
81-
82-
c = NewChunk(m1.Size)
83-
c.Len = m1.Size
84-
c.Offset = m1.Offset
85-
if _, err := part.Read(c.Data); err != nil && err != io.EOF {
86-
return nil, err
87-
}
76+
if part.FormName() != "chunk" {
77+
continue
78+
}
79+
80+
log.V(2).Infof("received post chunk param:%s\n", part.FileName())
81+
m1, err := ParseChunkParam(part.FileName())
82+
if err != nil {
83+
return nil, errors.New(err.Error())
84+
}
85+
86+
c = NewChunk(m1.Size)
87+
c.Len = m1.Size
88+
c.Offset = m1.Offset
89+
n, err := io.ReadFull(part, c.Data)
90+
if err != nil {
91+
return c, err
92+
}
93+
94+
if int64(n) != m1.Size {
95+
log.V(2).Infof("download chunk data error expected %d real %d", m1.Size, n)
8896
}
8997
}
9098

go/filemanager/pfsmodules/stat.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ func (p *StatCmd) ToURLParam() url.Values {
3030
parameters.Add("path", p.Path)
3131

3232
return parameters
33-
3433
}
3534

3635
// ToJSON encodes memebers to json string.

go/filemanager/pfsserver/handler.go

Lines changed: 34 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -329,64 +329,60 @@ func checkChunkAuthorization(path string, header http.Header, cmdName string) (s
329329
return user, resp
330330
}
331331

332-
// GetChunkHandler processes GET Chunk request.
333-
func GetChunkHandler(w http.ResponseWriter, r *http.Request) {
334-
log.V(1).Infof("begin proc GetChunkHandler")
335-
336-
p, err := pfsmod.ParseChunkParam(r.URL.RawQuery)
337-
if err != nil {
338-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, response{})
339-
return
340-
}
341-
342-
user, resp := checkChunkAuthorization(p.Path, r.Header, "GetChunk")
343-
if resp.Err != "" {
344-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
345-
return
346-
}
347-
332+
func getChunk(w http.ResponseWriter, r *http.Request, param *pfsmod.ChunkParam) error {
348333
writer := multipart.NewWriter(w)
349334
writer.SetBoundary(pfsmod.DefaultMultiPartBoundary)
350335

351-
fileName := p.ToURLParam().Encode()
336+
fileName := param.ToURLParam().Encode()
352337
part, err := writer.CreateFormFile("chunk", fileName)
353338
if err != nil {
354-
log.Error(err)
355-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, response{})
356-
return
339+
return err
357340
}
358-
359-
resp = response{}
341+
defer writer.Close()
360342

361343
fr := pfsmod.FileHandle{}
362-
if err := fr.Open(p.Path, os.O_RDONLY, 0); err != nil {
363-
resp.Err = err.Error()
364-
log.Error(err)
365-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
366-
return
344+
if err := fr.Open(param.Path, os.O_RDONLY, 0); err != nil {
345+
return err
367346
}
368347
defer fr.Close()
369348

370-
log.Infof("user:%s download %s\n", user, p.String())
371-
372-
if err = fr.CopyN(part, p.Offset, p.Size); err != nil {
349+
err = fr.CopyN(part, param.Offset, param.Size)
350+
if err != nil {
373351
if err != io.EOF {
374-
resp.Err = err.Error()
375-
log.Error(err)
376-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
377-
return
352+
return err
378353
}
379354

380-
resp.Err = pfsmod.StatusFileEOF
381-
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
355+
return errors.New(pfsmod.StatusFileEOF)
382356
}
357+
return nil
358+
}
383359

384-
err = writer.Close()
360+
// GetChunkHandler processes GET Chunk request.
361+
func GetChunkHandler(w http.ResponseWriter, r *http.Request) {
362+
log.V(1).Infof("begin proc GetChunkHandler")
363+
364+
p, err := pfsmod.ParseChunkParam(r.URL.RawQuery)
385365
if err != nil {
386-
log.Error(err)
366+
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, response{})
387367
return
388368
}
389369

370+
user, resp := checkChunkAuthorization(p.Path, r.Header, "GetChunk")
371+
if resp.Err != "" {
372+
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
373+
return
374+
}
375+
log.Infof("user:%s download %s\n", user, p.String())
376+
377+
err = getChunk(w, r, p)
378+
if err != nil {
379+
if err.Error() != pfsmod.StatusFileEOF {
380+
log.Errorln("cmd %s error info:%s", p.String(), err)
381+
}
382+
resp.Err = err.Error()
383+
}
384+
385+
writeJSONResponse(w, r.URL.RawQuery, http.StatusOK, resp)
390386
log.V(1).Info("end proc GetChunkHandler")
391387
return
392388
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package pfsserver
2+
3+
import (
4+
"crypto/md5"
5+
"encoding/hex"
6+
"fmt"
7+
"io"
8+
"mime/multipart"
9+
"net/http/httptest"
10+
"reflect"
11+
"testing"
12+
13+
pfsmod "github.com/PaddlePaddle/cloud/go/filemanager/pfsmodules"
14+
)
15+
16+
func TestGetChunk(t *testing.T) {
17+
param := pfsmod.ChunkParam{}
18+
param.Path = "./testdata/test_lt_chunk.dat"
19+
param.Offset = 0
20+
param.Size = int64(4096)
21+
22+
req := httptest.NewRequest("GET", "http://example.com/foo", nil)
23+
w := httptest.NewRecorder()
24+
err := getChunk(w, req, &param)
25+
if err == nil ||
26+
err.Error() != pfsmod.StatusFileEOF {
27+
t.Error(err)
28+
}
29+
30+
fmt.Println(w)
31+
32+
partReader := multipart.NewReader(w.Body, pfsmod.DefaultMultiPartBoundary)
33+
for {
34+
part, err := partReader.NextPart()
35+
if err == io.EOF {
36+
break
37+
}
38+
39+
if part.FormName() != "chunk" {
40+
continue
41+
}
42+
43+
m1, err := pfsmod.ParseChunkParam(part.FileName())
44+
if err != nil {
45+
t.Error(err)
46+
}
47+
48+
if reflect.DeepEqual(m1, param) {
49+
t.Error("m1 != param, m1:%v", m1)
50+
}
51+
52+
data := make([]byte, m1.Size)
53+
n, err := io.ReadFull(part, data)
54+
if err != io.ErrUnexpectedEOF {
55+
t.Error(err)
56+
}
57+
58+
sum := md5.Sum(data[:n])
59+
checksum := hex.EncodeToString(sum[:])
60+
if checksum != "77c58f04583c86f78c51df158e3f35e8" {
61+
t.Error("checksum error")
62+
}
63+
}
64+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
1
2+
2
3+
3
4+
4
5+
5
6+
6
7+
7

0 commit comments

Comments
 (0)