温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Retrofit+Rxjava下载文件进度的示例分析

发布时间:2021-07-28 09:17:51 来源:亿速云 阅读:327 作者:小新 栏目:编程语言

这篇文章主要为大家展示了“Retrofit+Rxjava下载文件进度的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Retrofit+Rxjava下载文件进度的示例分析”这篇文章吧。

准备工作

本文采用Dagger2,Retrofit,RxJava。

compile'com.squareup.retrofit2:retrofit:2.0.2' compile'com.squareup.retrofit2:converter-gson:2.0.2' compile'com.squareup.retrofit2:adapter-rxjava:2.0.2' //dagger2 compile'com.google.dagger:dagger:2.6' apt'com.google.dagger:dagger-compiler:2.6' //RxJava compile'io.reactivex:rxandroid:1.2.0' compile'io.reactivex:rxjava:1.1.5' compile'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造ResponseBody

okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody。

public class ProgressResponseBody extends ResponseBody {  private ResponseBody responseBody;  private BufferedSource bufferedSource;  public ProgressResponseBody(ResponseBody responseBody) {  this.responseBody = responseBody;  }  @Override  public MediaType contentType() {  return responseBody.contentType();  }  @Override  public long contentLength() {  return responseBody.contentLength();  }  @Override  public BufferedSource source() {  if (bufferedSource == null) {   bufferedSource = Okio.buffer(source(responseBody.source()));  }  return bufferedSource;  }  private Source source(Source source) {  return new ForwardingSource(source) {   long bytesReaded = 0;   @Override   public long read(Buffer sink, long byteCount) throws IOException {   long bytesRead = super.read(sink, byteCount);   bytesReaded += bytesRead == -1 ? 0 : bytesRead;   //实时发送当前已读取的字节和总字节   RxBus.getInstance().post(new FileLoadEvent(contentLength(), bytesReaded));   return bytesRead;   }  };  } }

呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象。

FileLoadEvent

FileLoadEvent很简单,包含了当前已加载进度和文件总大小。

public class FileLoadEvent {  long total;  long bytesLoaded;  public long getBytesLoaded() {  return bytesLoaded;  }  public long getTotal() {  return total;  }  public FileLoadEvent(long total, long bytesLoaded) {  this.total = total;  this.bytesLoaded = bytesLoaded;  } }

RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情。

public class RxBus {  private static volatile RxBus mInstance;  private SerializedSubject<Object, Object> mSubject;  private HashMap<String, CompositeSubscription> mSubscriptionMap;  /**  * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者  * Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,  * 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。  */  private RxBus() {  mSubject = new SerializedSubject<>(PublishSubject.create());  }  /**  * 单例 双重锁  * @return  */  public static RxBus getInstance() {  if (mInstance == null) {   synchronized (RxBus.class) {   if (mInstance == null) {    mInstance = new RxBus();   }   }  }  return mInstance;  }  /**  * 发送一个新的事件  * @param o  */  public void post(Object o) {  mSubject.onNext(o);  }  /**  * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者  * @param type  * @param <T>  * @return  */  public <T> Observable<T> tObservable(final Class<T> type) {  //ofType操作符只发射指定类型的数据,其内部就是filter+cast  return mSubject.ofType(type);  }  public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {  return tObservable(type)   .onBackpressureBuffer()   .subscribeOn(Schedulers.io())   .observeOn(AndroidSchedulers.mainThread())   .subscribe(next, error);  }  public void addSubscription(Object o, Subscription subscription) {  if (mSubscriptionMap == null) {   mSubscriptionMap = new HashMap<>();  }  String key = o.getClass().getName();  if (mSubscriptionMap.get(key) != null) {   mSubscriptionMap.get(key).add(subscription);  } else {   CompositeSubscription compositeSubscription = new CompositeSubscription();   compositeSubscription.add(subscription);   mSubscriptionMap.put(key, compositeSubscription);   // Log.e("air", "addSubscription:订阅成功 " );  }  }  public void unSubscribe(Object o) {  if (mSubscriptionMap == null) {   return;  }  String key = o.getClass().getName();  if (!mSubscriptionMap.containsKey(key)) {   return;  }  if (mSubscriptionMap.get(key) != null) {   mSubscriptionMap.get(key).unsubscribe();  }  mSubscriptionMap.remove(key);  //Log.e("air", "unSubscribe: 取消订阅" );  } }

FileCallBack

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了。

public abstract class FileCallBack<T> {  private String destFileDir;  private String destFileName;  public FileCallBack(String destFileDir, String destFileName) {  this.destFileDir = destFileDir;  this.destFileName = destFileName;  subscribeLoadProgress();  }  public abstract void onSuccess(T t);  public abstract void progress(long progress, long total);  public abstract void onStart();  public abstract void onCompleted();  public abstract void onError(Throwable e);  public void saveFile(ResponseBody body) {  InputStream is = null;  byte[] buf = new byte[2048];  int len;  FileOutputStream fos = null;  try {   is = body.byteStream();   File dir = new File(destFileDir);   if (!dir.exists()) {   dir.mkdirs();   }   File file = new File(dir, destFileName);   fos = new FileOutputStream(file);   while ((len = is.read(buf)) != -1) {   fos.write(buf, 0, len);   }   fos.flush();   unsubscribe();   //onCompleted();  } catch (FileNotFoundException e) {   e.printStackTrace();  } catch (IOException e) {   e.printStackTrace();  } finally {   try {   if (is != null) is.close();   if (fos != null) fos.close();   } catch (IOException e) {   Log.e("saveFile", e.getMessage());   }  }  }  /**  * 订阅加载的进度条  */  public void subscribeLoadProgress() {  Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent.class, new Action1<FileLoadEvent>() {   @Override   public void call(FileLoadEvent fileLoadEvent) {   progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());   }  }, new Action1<Throwable>() {   @Override   public void call(Throwable throwable) {   //TODO 对异常的处理   }  });  RxBus.getInstance().addSubscription(this, subscription);  }  /**  * 取消订阅,防止内存泄漏  */  public void unsubscribe() {  RxBus.getInstance().unSubscribe(this);  } }

开始下载

使用自己的ProgressResponseBody

通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度。

public class ProgressInterceptor implements Interceptor {  @Override  public Response intercept(Chain chain) throws IOException {  Response originalResponse = chain.proceed(chain.request());  return originalResponse.newBuilder()   .body(new ProgressResponseBody(originalResponse.body()))   .build();  } }

构建Retrofit

@Module public class ApiModule {  @Provides  @Singleton  public OkHttpClient provideClient() {  OkHttpClient client = new OkHttpClient.Builder()   .addInterceptor(new ProgressInterceptor())   .build();  return client;  }  @Provides  @Singleton  public Retrofit provideRetrofit(OkHttpClient client){  Retrofit retrofit = new Retrofit.Builder()   .client(client)   .baseUrl(Constant.HOST)   .addCallAdapterFactory(RxJavaCallAdapterFactory.create())   .addConverterFactory(GsonConverterFactory.create())   .build();  return retrofit;  }  @Provides  @Singleton  public ApiInfo provideApiInfo(Retrofit retrofit){  return retrofit.create(ApiInfo.class);  }  @Provides  @Singleton  public ApiManager provideApiManager(Application application, ApiInfo apiInfo){  return new ApiManager(application,apiInfo);  } }

请求接口

public interface ApiInfo {  @Streaming  @GET  Observable<ResponseBody> download(@Url String url); }

执行请求

public void load(String url, final FileCallBack<ResponseBody> callBack){  apiInfo.download(url)   .subscribeOn(Schedulers.io())//请求网络 在调度者的io线程   .observeOn(Schedulers.io()) //指定线程保存文件   .doOnNext(new Action1<ResponseBody>() {    @Override    public void call(ResponseBody body) {    callBack.saveFile(body);    }   })   .observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui   .subscribe(new FileSubscriber<ResponseBody>(application,callBack));  }

在presenter层中执行网络请求。

通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作。

public void load(String url){  String fileName = "app.apk";  String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();  Log.e(TAG, "load: "+fileStoreDir.toString() );  FileCallBack<ResponseBody> callBack = new FileCallBack<ResponseBody>(fileStoreDir,fileName) {   @Override   public void onSuccess(final ResponseBody responseBody) {   Toast.makeText(App.getInstance(),"下载文件成功",Toast.LENGTH_SHORT).show();   }   @Override   public void progress(long progress, long total) {   iHomeView.update(total,progress);   }   @Override   public void onStart() {   iHomeView.showLoading();   }   @Override   public void onCompleted() {   iHomeView.hideLoading();   }   @Override   public void onError(Throwable e) {   //TODO: 对异常的一些处理   e.printStackTrace();   }  };  apiManager.load(url, callBack);  }

以上是“Retrofit+Rxjava下载文件进度的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI