Commit a971628
examples/large-file-upload: better session-closing logic
Do it in a way that better enables resuming after errors. Also remove the "different contents" check, which I think was incorrect.
1 parent f1ef3ac commit a971628

1 file changed: +40 -13 lines changed

examples/large-file-upload.rs

Lines changed: 40 additions & 13 deletions
@@ -24,6 +24,10 @@ const PARALLELISM: usize = 20;
 /// The size of a block. This is a Dropbox constant, not adjustable.
 const BLOCK_SIZE: usize = 4 * 1024 * 1024;
 
+/// We can upload an integer multiple of BLOCK_SIZE in a single request. This reduces the number of
+/// requests needed to do the upload and can help avoid running into rate limits.
+const BLOCKS_PER_REQUEST: usize = 2;
+
 macro_rules! fatal {
     ($($arg:tt)*) => {
         eprintln!($($arg)*);
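To see why batching blocks cuts the request count, here is a minimal standalone sketch of the arithmetic (the 1 GiB file size and the `requests_needed` helper are illustrative, not part of the example):

```rust
const BLOCK_SIZE: usize = 4 * 1024 * 1024;
const BLOCKS_PER_REQUEST: usize = 2;

/// Number of append requests needed when each request carries up to `chunk` bytes
/// (only the final request may be smaller).
fn requests_needed(len: usize, chunk: usize) -> usize {
    (len + chunk - 1) / chunk
}

fn main() {
    let len = 1024 * 1024 * 1024; // 1 GiB, purely for illustration
    // One block per request: 256 requests.
    println!("{}", requests_needed(len, BLOCK_SIZE));
    // Two blocks per request: 128 requests.
    println!("{}", requests_needed(len, BLOCK_SIZE * BLOCKS_PER_REQUEST));
}
```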
@@ -322,20 +326,31 @@ fn upload_file(
 
     eprintln!("upload session ID is {}", session.session_id);
 
+    // Initially set to the end of the file and an empty block; if the file is an exact multiple of
+    // BLOCK_SIZE, we'll need to upload an empty buffer when closing the session.
+    let last_block = Arc::new(Mutex::new((source_len, vec![])));
+
     let start_time = Instant::now();
     let upload_result = {
         let client = client.clone();
         let session = session.clone();
+        let last_block = last_block.clone();
+        let resume = resume.clone();
         parallel_reader::read_stream_and_process_chunks_in_parallel(
             &mut source_file,
-            BLOCK_SIZE,
+            BLOCK_SIZE * BLOCKS_PER_REQUEST,
             PARALLELISM,
             Arc::new(move |block_offset, data: &[u8]| -> Result<(), String> {
-                let mut append_arg = session.append_arg(block_offset);
-                if data.len() != BLOCK_SIZE {
+                let append_arg = session.append_arg(block_offset);
+                if data.len() != BLOCK_SIZE * BLOCKS_PER_REQUEST {
                     // This must be the last block. Only the last one is allowed to be not 4 MiB
-                    // exactly, so let's close the session.
-                    append_arg.close = true;
+                    // exactly. Save the block and offset so it can be uploaded after all the
+                    // parallel uploads are done. This is because once the session is closed, we
+                    // can't resume it.
+                    let mut last_block = last_block.lock().unwrap();
+                    last_block.0 = block_offset + session.start_offset;
+                    last_block.1 = data.to_vec();
+                    return Ok(());
                 }
                 let result = upload_block_with_retry(
                     client.as_ref(),
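The closure above no longer closes the session inline; it stashes the trailing short chunk in shared state so it can be uploaded (and the session closed) once the parallel workers have finished. A minimal, std-only sketch of that pattern, independent of the Dropbox SDK (offsets and chunk sizes are made up for illustration):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // (offset, data) of the trailing short chunk; starts as "end of input, empty".
    let last_block = Arc::new(Mutex::new((11u64, Vec::<u8>::new())));

    let mut handles = Vec::new();
    for (offset, chunk) in [(0u64, vec![0u8; 8]), (8u64, vec![1u8; 3])] {
        let last_block = Arc::clone(&last_block);
        handles.push(thread::spawn(move || {
            if chunk.len() != 8 {
                // Short chunk: remember it for later instead of "uploading" it here,
                // because closing the session now would make it impossible to resume.
                let mut lb = last_block.lock().unwrap();
                lb.0 = offset;
                lb.1 = chunk;
                return;
            }
            // Full-size chunk: this is where the parallel upload would happen.
        }));
    }
    for h in handles {
        h.join().unwrap();
    }

    // All workers have finished; the commit's unwrap_arcmutex helper would now take
    // ownership of the value. Here we just read it through the lock.
    let (offset, len) = {
        let lb = last_block.lock().unwrap();
        (lb.0, lb.1.len())
    };
    println!("short block to upload while closing: offset={offset}, len={len}");
}
```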
@@ -357,6 +372,19 @@ fn upload_file(
357372
e, session.session_id, session.complete_up_to()));
358373
}
359374

375+
let (last_block_offset, last_block_data) = unwrap_arcmutex(last_block);
376+
eprintln!("closing session at {} with {}-byte block",
377+
last_block_offset, last_block_data.len());
378+
let mut arg = session.append_arg(last_block_offset);
379+
arg.close = true;
380+
if let Err(e) = upload_block_with_retry(
381+
client.as_ref(), &arg, &last_block_data, start_time, session.as_ref(), resume.as_ref())
382+
{
383+
eprintln!("failed to close session: {}", e);
384+
// But don't error out; try committing anyway. It could be we're resuming a file where we
385+
// already closed it out but failed to commit.
386+
}
387+
360388
eprintln!("committing...");
361389
let finish = session.commit_arg(dest_path, source_mtime);
362390

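The close step is deliberately non-fatal: on a resumed upload the session may already have been closed by a previous run that died before the commit. A hedged sketch of that control flow, with `close_session` and `commit_session` as hypothetical stand-ins rather than real SDK calls:

```rust
// close_session and commit_session are hypothetical stand-ins for the real calls.
fn finish_upload(
    close_session: impl Fn() -> Result<(), String>,
    commit_session: impl Fn() -> Result<(), String>,
) -> Result<(), String> {
    if let Err(e) = close_session() {
        // Tolerate the failure: when resuming, the session may already be closed
        // from an earlier run that never got to commit.
        eprintln!("failed to close session: {}", e);
    }
    // Attempt the commit regardless.
    commit_session()
}

fn main() {
    // Simulate a resume where the close fails but the commit still succeeds.
    let result = finish_upload(
        || Err("session already closed".to_string()),
        || Ok(()),
    );
    assert!(result.is_ok());
}
```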
@@ -393,17 +421,9 @@ fn upload_block_with_retry(
 ) -> Result<(), String> {
     let block_start_time = Instant::now();
     let mut errors = 0;
-    const BLOCK_UPLOADED_MSG: &str = "Different data already uploaded at offset ";
     loop {
         match files::upload_session_append_v2(client, arg, buf) {
             Ok(Ok(())) => { break; }
-            Err(dropbox_sdk::Error::BadRequest(msg))
-                if resume.is_some() && msg.contains(BLOCK_UPLOADED_MSG) =>
-            {
-                let i = msg.find(BLOCK_UPLOADED_MSG).unwrap();
-                eprintln!("already uploaded block at {}", &msg[i + BLOCK_UPLOADED_MSG.len() ..]);
-                return Ok(());
-            }
             Err(dropbox_sdk::Error::RateLimited { reason, retry_after_seconds }) => {
                 eprintln!("rate-limited ({}), waiting {} seconds", reason, retry_after_seconds);
                 if retry_after_seconds > 0 {
@@ -484,6 +504,13 @@ fn iso8601(t: SystemTime) -> String {
         .format("%Y-%m-%dT%H:%M:%SZ").to_string()
 }
 
+fn unwrap_arcmutex<T: std::fmt::Debug>(x: Arc<Mutex<T>>) -> T {
+    Arc::try_unwrap(x)
+        .expect("failed to unwrap Arc")
+        .into_inner()
+        .expect("failed to unwrap Mutex")
+}
+
 fn main() {
     env_logger::init();
 
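For reference, a small standalone demonstration of the new helper: it only succeeds once every clone of the `Arc` has been dropped (guaranteed in the example because all upload workers have been joined), and it panics otherwise or if the mutex was poisoned. The `main` below is illustrative and not part of the commit:

```rust
use std::sync::{Arc, Mutex};

fn unwrap_arcmutex<T: std::fmt::Debug>(x: Arc<Mutex<T>>) -> T {
    Arc::try_unwrap(x)
        .expect("failed to unwrap Arc")
        .into_inner()
        .expect("failed to unwrap Mutex")
}

fn main() {
    let shared = Arc::new(Mutex::new((42u64, vec![1u8, 2, 3])));
    {
        // While a clone exists, unwrapping would panic; it is dropped at the end
        // of this scope, just as worker clones are dropped when the workers finish.
        let _worker_handle = Arc::clone(&shared);
    }
    let (offset, data) = unwrap_arcmutex(shared);
    assert_eq!(offset, 42);
    assert_eq!(data, vec![1, 2, 3]);
}
```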