Commit 1f5bc6d

large-file-upload: lotsa refactor and cleanup
keep track of the low-water-mark file offset so it's easier to resume
slightly change the resume command line argument format
split things into more functions & structs
1 parent 1a15bc8 commit 1f5bc6d
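
The headline change is the resume interface: instead of two positional arguments, the session ID and the completed offset are now packed into a single `--resume <session ID>,<offset>` value, parsed by the `FromStr` impl added in the diff below. Here is a minimal, self-contained sketch of that round trip; the `Resume` type and parsing logic mirror the diff, while the session ID value is made up for illustration:

struct Resume {
    start_offset: u64,
    session_id: String,
}

impl std::str::FromStr for Resume {
    type Err = &'static str;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // rsplitn(2, ',') splits on the LAST comma, so the offset is always
        // the final field even if the session ID itself contained a comma.
        let mut parts = s.rsplitn(2, ',');
        let offset_str = parts.next().ok_or("missing session ID and file offset")?;
        let session_id = parts.next().ok_or("missing file offset")?.to_owned();
        let start_offset = offset_str.parse().map_err(|_| "invalid file offset")?;
        Ok(Self { start_offset, session_id })
    }
}

fn main() {
    // Hypothetical session ID; a real one comes from the Dropbox API.
    let r: Resume = "AAAAexampleSessionId,8388608".parse().unwrap();
    assert_eq!(r.session_id, "AAAAexampleSessionId");
    assert_eq!(r.start_offset, 8_388_608);
    // A value with no comma fails to parse rather than being misread.
    assert!("no-offset-here".parse::<Resume>().is_err());
}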

examples/large-file-upload.rs

Lines changed: 196 additions & 74 deletions
@@ -8,12 +8,12 @@ use dropbox_sdk::files;
 use dropbox_sdk::default_client::{NoauthDefaultClient, UserAuthDefaultClient};
 use dropbox_sdk::oauth2::{oauth2_token_from_authorization_code, Oauth2AuthorizeUrlBuilder,
     Oauth2Type};
-
+use std::collections::HashMap;
 use std::fs::File;
 use std::path::{Path, PathBuf};
 use std::io::{self, Write, Seek, SeekFrom};
 use std::process::exit;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
 use std::thread::sleep;
 use std::time::{Duration, Instant, SystemTime};
@@ -85,29 +85,38 @@ struct Resume {
     session_id: String,
 }
 
+impl std::str::FromStr for Resume {
+    type Err = &'static str;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let mut parts = s.rsplitn(2, ',');
+        let offset_str = parts.next().ok_or("missing session ID and file offset")?;
+        let session_id = parts.next().ok_or("missing file offset")?.to_owned();
+        let start_offset = offset_str.parse().map_err(|_| "invalid file offset")?;
+        Ok(Self { start_offset, session_id })
+    }
+}
+
 fn parse_args() -> Operation {
     let mut a = std::env::args().skip(1);
     match (a.next(), a.next()) {
         (Some(ref arg), _) if arg == "--help" || arg == "-h" => {
             Operation::Usage
         }
         (Some(src), Some(dest)) => {
-            let resume = match (a.next(), a.next()) {
-                (Some(start_offset_str), Some(session_id)) => {
-                    match start_offset_str.parse::<u64>() {
-                        Ok(start_offset) => Some(Resume { start_offset, session_id }),
+            let resume = match (a.next().as_deref(), a.next()) {
+                (Some("--resume"), Some(resume_str)) => {
+                    match resume_str.parse() {
+                        Ok(resume) => Some(resume),
                         Err(e) => {
-                            eprintln!("Invalid start offset: {}", e);
-                            eprintln!("Usage: <source> <dest> <start offset> <session ID>");
-                            None
+                            eprintln!("Invalid --resume argument: {}", e);
+                            return Operation::Usage;
                         }
                     }
                 }
-                (Some(_), None) => {
-                    eprintln!("Usage: <source> <dest> <start offset> <session ID>");
-                    None
+                (None, _) => None,
+                _ => {
+                    return Operation::Usage;
                 }
-                _ => None,
             };
             Operation::Upload(Args {
                 source_path: PathBuf::from(src),
@@ -196,83 +205,189 @@ fn get_destination_path(client: &UserAuthDefaultClient, given_path: &str, source
     }
 }
 
+/// Keep track of some shared state accessed / updated by various parts of the uploading process.
+struct UploadSession {
+    session_id: String,
+    start_offset: u64,
+    file_size: u64,
+    bytes_transferred: AtomicU64,
+    completion: Mutex<CompletionTracker>,
+}
+
+impl UploadSession {
+    /// Make a new upload session.
+    pub fn new(client: &UserAuthDefaultClient, file_size: u64) -> Result<Self, String> {
+        let session_id = match files::upload_session_start(
+            client,
+            &files::UploadSessionStartArg::default()
+                .with_session_type(files::UploadSessionType::Concurrent),
+            &[],
+        ) {
+            Ok(Ok(result)) => result.session_id,
+            error => return Err(format!("Starting upload session failed: {:?}", error)),
+        };
+
+        Ok(Self {
+            session_id,
+            start_offset: 0,
+            file_size,
+            bytes_transferred: AtomicU64::new(0),
+            completion: Mutex::new(CompletionTracker::default()),
+        })
+    }
+
+    /// Resume a pre-existing (i.e. interrupted) upload session.
+    pub fn resume(resume: Resume, file_size: u64) -> Self {
+        Self {
+            session_id: resume.session_id,
+            start_offset: resume.start_offset,
+            file_size,
+            bytes_transferred: AtomicU64::new(0),
+            completion: Mutex::new(CompletionTracker::resume_from(resume.start_offset)),
+        }
+    }
+
+    /// Generate the argument to append a block at the given offset.
+    pub fn append_arg(&self, block_offset: u64) -> files::UploadSessionAppendArg {
+        files::UploadSessionAppendArg::new(
+            files::UploadSessionCursor::new(
+                self.session_id.clone(),
+                self.start_offset + block_offset))
+    }
+
+    /// Generate the argument to commit the upload at the given path with the given modification
+    /// time.
+    pub fn commit_arg(&self, dest_path: String, source_mtime: SystemTime)
+        -> files::UploadSessionFinishArg
+    {
+        files::UploadSessionFinishArg::new(
+            files::UploadSessionCursor::new(
+                self.session_id.clone(),
+                self.file_size),
+            files::CommitInfo::new(dest_path)
+                .with_client_modified(iso8601(source_mtime)))
+    }
+
+    /// Mark a block as uploaded.
+    pub fn mark_block_uploaded(&self, block_offset: u64, block_len: u64) {
+        let mut completion = self.completion.lock().unwrap();
+        completion.complete_block(block_offset, block_len);
+    }
+
+    /// Return the offset up to which the file is completely uploaded. It can be resumed from this
+    /// position if something goes wrong.
+    pub fn complete_up_to(&self) -> u64 {
+        let completion = self.completion.lock().unwrap();
+        completion.complete_up_to
+    }
+}
+
+/// Because blocks can be uploaded out of order, if an error is encountered when uploading a given
+/// block, that is not necessarily the correct place to resume uploading from next time: there may
+/// be gaps before that block.
+///
+/// This struct is for keeping track of what offset the file has been completely uploaded to.
+///
+/// When a block is finished uploading, call `complete_block` with the offset and length.
+#[derive(Default)]
+struct CompletionTracker {
+    complete_up_to: u64,
+    uploaded_blocks: HashMap<u64, u64>,
+}
+
+impl CompletionTracker {
+    /// Make a new CompletionTracker that assumes everything up to the given offset is complete. Use
+    /// this if resuming a previously interrupted session.
+    pub fn resume_from(complete_up_to: u64) -> Self {
+        Self {
+            complete_up_to,
+            uploaded_blocks: HashMap::new(),
+        }
+    }
+
+    /// Mark a block as completely uploaded.
+    pub fn complete_block(&mut self, block_offset: u64, block_len: u64) {
+        if block_offset == self.complete_up_to {
+            // Advance the cursor.
+            self.complete_up_to += block_len;
+
+            // Also look if we can advance it further still.
+            loop {
+                let key = self.complete_up_to;
+                if let Some(len) = self.uploaded_blocks.remove(&key) {
+                    self.complete_up_to += len;
+                } else {
+                    break;
+                }
+            }
+        } else {
+            // This block isn't at the low-water mark; there's a gap behind it. Save it for later.
+            self.uploaded_blocks.insert(block_offset, block_len);
+        }
+    }
+}
+
+fn get_file_mtime_and_size(f: &File) -> Result<(SystemTime, u64), String> {
+    let meta = f.metadata().map_err(|e| format!("Error getting source file metadata: {}", e))?;
+    let mtime = meta.modified().map_err(|e| format!("Error getting source file mtime: {}", e))?;
+    Ok((mtime, meta.len()))
+}
+
 fn upload_file(
     client: Arc<UserAuthDefaultClient>,
     mut source_file: File,
     dest_path: String,
     resume: Option<Resume>,
 ) -> Result<(), String> {
 
-    let (source_mtime, source_len) = source_file.metadata()
-        .and_then(|meta| meta.modified().map(|mtime| (mtime, meta.len())))
-        .map_err(|e| {
-            format!("Error getting source file metadata: {}", e)
-        })?;
+    let (source_mtime, source_len) = get_file_mtime_and_size(&source_file)?;
 
-    let cursor = if let Some(resume) = resume {
-        eprintln!("Resuming upload: {:?}", resume);
+    let session = Arc::new(if let Some(resume) = resume {
         source_file.seek(SeekFrom::Start(resume.start_offset))
             .map_err(|e| format!("Seek error: {}", e))?;
-        files::UploadSessionCursor::new(resume.session_id, resume.start_offset)
+        UploadSession::resume(resume, source_len)
     } else {
-        let sesid = match files::upload_session_start(
-            client.as_ref(),
-            &files::UploadSessionStartArg::default()
-                .with_session_type(files::UploadSessionType::Concurrent),
-            &[])
-        {
-            Ok(Ok(result)) => result.session_id,
-            error => {
-                return Err(format!("Starting upload session failed: {:?}", error));
-            }
-        };
-
-        files::UploadSessionCursor::new(sesid, 0)
-    };
-
-    eprintln!("upload session ID is {}", cursor.session_id);
+        UploadSession::new(client.as_ref(), source_len)?
+    });
 
-    let overall_start = Instant::now();
-    let bytes_sofar = Arc::new(AtomicU64::new(0));
+    eprintln!("upload session ID is {}", session.session_id);
 
-    {
+    let start_time = Instant::now();
+    let upload_result = {
         let client = client.clone();
-        let session_id = Arc::new(cursor.session_id.clone());
-        let start_offset = cursor.offset;
-        if let Err(e) = parallel_reader::read_stream_and_process_chunks_in_parallel(
+        let session = session.clone();
+        parallel_reader::read_stream_and_process_chunks_in_parallel(
             &mut source_file,
             BLOCK_SIZE,
             PARALLELISM,
             Arc::new(move |block_offset, data: &[u8]| -> Result<(), String> {
-                let cursor = files::UploadSessionCursor::new(
-                    (*session_id).clone(),
-                    start_offset + block_offset);
-                let mut append_arg = files::UploadSessionAppendArg::new(cursor);
+                let mut append_arg = session.append_arg(block_offset);
                 if data.len() != BLOCK_SIZE {
                     // This must be the last block. Only the last one is allowed to be not 4 MiB
                     // exactly, so let's close the session.
                     append_arg.close = true;
                 }
-                upload_chunk_with_retry(
+                let result = upload_chunk_with_retry(
                     client.as_ref(),
                     &append_arg,
                     data,
-                    overall_start,
-                    bytes_sofar.as_ref(),
-                    source_len - start_offset,
-                    PARALLELISM as u64,
-                )
+                    start_time,
+                    session.as_ref(),
+                );
+                if result.is_ok() {
+                    session.mark_block_uploaded(block_offset, data.len() as u64);
+                }
+                result
             }))
-        {
-            return Err(e.to_string());
-        }
+    };
+
+    if let Err(e) = upload_result {
+        return Err(format!("{}. To resume, use --resume {},{}",
+            e, session.session_id, session.complete_up_to()));
     }
 
     eprintln!("committing...");
-    let finish = files::UploadSessionFinishArg::new(
-        cursor,
-        files::CommitInfo::new(dest_path)
-            .with_client_modified(iso8601(source_mtime)));
+    let finish = session.commit_arg(dest_path, source_mtime);
 
     let mut retry = 0;
     while retry < 3 {
@@ -290,19 +405,18 @@ fn upload_file(
         }
     }
 
-    Err("Upload failed.".to_owned())
+    Err(format!("Upload failed. To retry, use --resume {},{}",
+        session.session_id, session.complete_up_to()))
 }
 
 fn upload_chunk_with_retry(
     client: &UserAuthDefaultClient,
     arg: &files::UploadSessionAppendArg,
     buf: &[u8],
-    overall_start: Instant,
-    bytes_sofar: &AtomicU64,
-    total_bytes: u64,
-    parallelism: u64,
+    start_time: Instant,
+    session: &UploadSession,
 ) -> Result<(), String> {
-    let chunk_start = Instant::now();
+    let block_start_time = Instant::now();
     let mut errors = 0;
     loop {
         match files::upload_session_append_v2(client, arg, buf) {
@@ -322,17 +436,25 @@ fn upload_chunk_with_retry(
     }
 
     let now = Instant::now();
-    let chunk_time = now.duration_since(chunk_start);
-    let overall_time = now.duration_since(overall_start);
+    let block_dur = now.duration_since(block_start_time);
+    let overall_dur = now.duration_since(start_time);
+
+    let block_bytes = buf.len() as u64;
+    let bytes_sofar = session.bytes_transferred.fetch_add(block_bytes, SeqCst) + block_bytes;
+
+    let percent = bytes_sofar as f64 / session.file_size as f64 * 100.;
+
+    // This assumes that we have `PARALLELISM` uploads going at the same time and at roughly the
+    // same upload speed:
+    let block_rate = block_bytes as f64 / block_dur.as_secs_f64() * PARALLELISM as f64;
 
-    let chunk_bytes = buf.len() as u64;
-    let bytes_sofar = bytes_sofar.fetch_add(chunk_bytes, SeqCst) + chunk_bytes;
+    let overall_rate = bytes_sofar as f64 / overall_dur.as_secs_f64();
 
     eprintln!("{:.01}%: {}Bytes uploaded, {}Bytes per second, {}Bytes per second average",
-        bytes_sofar as f64 / total_bytes as f64 * 100.,
+        percent,
         human_number(bytes_sofar),
-        human_number((chunk_bytes as f64 / chunk_time.as_secs_f64() * parallelism as f64) as u64),
-        human_number((bytes_sofar as f64 / overall_time.as_secs_f64()) as u64),
+        human_number(block_rate as u64),
+        human_number(overall_rate as u64),
     );
 
     Ok(())
@@ -343,7 +465,7 @@ fn main() {
 
     let args = match parse_args() {
         Operation::Usage => {
-            fatal!("usage: {} <source> <Dropbox destination> [<resume offset> <resume session ID>]",
+            fatal!("usage: {} <source> <Dropbox destination> [--resume <session ID>,<resume offset>]",
                 std::env::args().next().unwrap());
         }
         Operation::Upload(args) => args,
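
The resumability improvement rests on the low-water-mark bookkeeping in `CompletionTracker`: blocks can finish out of order, so the offset advertised for `--resume` only advances across contiguous completed data. A minimal standalone sketch of the same idea; the tracker logic mirrors the diff above, while the block size and offsets are chosen purely for illustration:

use std::collections::HashMap;

// Mirrors the CompletionTracker added in this commit: `complete_up_to` is the
// low-water mark (everything before it is uploaded) and `uploaded_blocks`
// holds finished blocks that still have a gap somewhere behind them.
#[derive(Default)]
struct CompletionTracker {
    complete_up_to: u64,
    uploaded_blocks: HashMap<u64, u64>, // offset -> length
}

impl CompletionTracker {
    fn complete_block(&mut self, block_offset: u64, block_len: u64) {
        if block_offset == self.complete_up_to {
            self.complete_up_to += block_len;
            // Fold in previously-finished blocks that are now contiguous.
            while let Some(len) = self.uploaded_blocks.remove(&self.complete_up_to) {
                self.complete_up_to += len;
            }
        } else {
            self.uploaded_blocks.insert(block_offset, block_len);
        }
    }
}

fn main() {
    const BLOCK: u64 = 4 * 1024 * 1024; // 4 MiB, as in the example program
    let mut t = CompletionTracker::default();

    t.complete_block(BLOCK, BLOCK);  // block 1 finishes first, out of order
    assert_eq!(t.complete_up_to, 0); // gap at block 0, so can't resume past 0

    t.complete_block(0, BLOCK);      // block 0 fills the gap...
    assert_eq!(t.complete_up_to, 2 * BLOCK); // ...and the mark jumps over block 1
}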
