有个业务需求是这样的。需要上传1~5张图片,使用一个专门的上传图片的网址。使用这个网址上传一张图片后会返回一个该图片的url。多张图片上传后可以获取多个url,再把这些图片url加上一些其他参数,发起一个业务请求。
这应该算一个很常见的产品需求。一开始我想着怎么没有可以上传多个图片的后台接口提供。后面想一次性上传多个,弱网环境下,或者总上传文件体积太大的情况下,一个文件上传失败其他文件也都失败了。所以一次性只能上传一个也还算合理。
如果网络请求是使用了 Retrofit + Rxjava 那一套,那使用 Rxjava 的 zip 操作符就显得尤其方便了。
zip的基础写法如下。有 N+1 个参数。 N是并发任务数量。这里并发任务都是相同的话还可以抽取成一个 uploadImageFile 方法以便复用。 1是N个并发执行完后,需要做一些处理操作的函数。
下面5个并发任务 observable0、observable1、observable2、observable3、observable4 有1个处理函数 new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List>() {…}
private void button4(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; observable0 = uploadImageFile(uploadImageItemList.get(0)); observable1 = uploadImageFile(uploadImageItemList.get(1)); observable2 = uploadImageFile(uploadImageItemList.get(2)); observable3 = uploadImageFile(uploadImageItemList.get(3)); observable4 = uploadImageFile(uploadImageItemList.get(4)); Observable.zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4, UploadImageItem uploadImageItem5) throws Exception { return uploadImageItemList; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<UploadImageItem>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<UploadImageItem> uploadImageItems) { for (UploadImageItem uploadImageItem : uploadImageItems) { Log.e(TAG, "MultiTaskActivity.java - run() ----- 打印实体:" + uploadImageItem); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } public Observable<UploadImageItem> uploadImageFile(UploadImageItem item) { return Observable.just(item)//这里应该写成网络请求 .subscribeOn(Schedulers.io()) .map(response -> { SystemClock.sleep(new Random().nextInt(1000) + 1000); item.urlPath = "url path from " + item.filePath; Log.e(TAG, "MultiTaskActivity.java - uploadImageFile() finish! item.filePath:" + item.filePath); return item; }); }以上是基础写法,但是我们图片数量是1~5张的。查看 zip 的重载方法是这样的。有从2个任务到9个任务的重载方法。 对于可变的任务数量,刚开始我是这么写的。实在有点不忍直视。 后面改了一版。主要思路是使用假任务补足任务数量到5个,然后实际执行时候对那些假任务跳过不执行就可以。
public final UploadImageItem EMPTY_ITEM = new UploadImageItem(); /** * 任务可变的,正确使用zip操作符的方法 */ private void button6(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; List<UploadImageItem> originUploadImageList = new ArrayList<>(uploadImageItemList); //添加null,补全到最多图片数量 int validCount = originUploadImageList.size();//原始数量值 for (int i = 0; i < CONCURRENCY_TASK_NUMBER - validCount; i++) { //伪装操作1:添加值为null的item。 originUploadImageList.add(null); } observable0 = uploadImageFileV2(originUploadImageList.get(0)); observable1 = uploadImageFileV2(originUploadImageList.get(1)); observable2 = uploadImageFileV2(originUploadImageList.get(2)); observable3 = uploadImageFileV2(originUploadImageList.get(3)); observable4 = uploadImageFileV2(originUploadImageList.get(4)); Observable<List<UploadImageItem>> observableUploadResult; //使用zip操作符并发上传图片。 Observable .zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem0, UploadImageItem uploadImageItem1, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4) { List<UploadImageItem> resultUploadImageItemList = new ArrayList<>(); resultUploadImageItemList.add(uploadImageItem0); resultUploadImageItemList.add(uploadImageItem1); resultUploadImageItemList.add(uploadImageItem2); resultUploadImageItemList.add(uploadImageItem3); resultUploadImageItemList.add(uploadImageItem4); //伪装操作3:识别出EMPTY_ITEM,过滤掉。 while (resultUploadImageItemList.contains(EMPTY_ITEM)) { resultUploadImageItemList.remove(EMPTY_ITEM); } return resultUploadImageItemList; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<UploadImageItem>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<UploadImageItem> uploadImageItems) { for (UploadImageItem uploadImageItem : uploadImageItems) { Log.e(TAG, "MultiTaskActivity.java - run() ----- 打印实体:" + uploadImageItem); } } @Override public void onError(Throwable e) { // 如果任意一张图片上传出错,会走到这个方法里 } @Override public void onComplete() { } }); } public Observable<UploadImageItem> uploadImageFileV2(UploadImageItem item) { //伪装操作2:如果是null的item,不需要执行网络请求,而是发送一个EMPTY_ITEM。 //因为 Observable.just(null) 会报错,所以使用 EMPTY_ITEM 这个代表假任务 if (item == null) { return Observable.just(EMPTY_ITEM); } //图片已经上传过,已拥有网络路径 if (item.urlPath != null) { return Observable.just(item); } return Observable.just(item)//这里应该写成网络请求 .subscribeOn(Schedulers.io()) .map(response -> { SystemClock.sleep(new Random().nextInt(1000) + 1000); item.urlPath = "url path from " + item.filePath; Log.e(TAG, "MultiTaskActivity.java - uploadImageFile() finish! item.filePath:" + item.filePath); return item; }); }使用这种方法并发上传5张图片之后,完全可以使用 .flatMap 接着发起一个业务请求。 这么写就真的非常的流式编程了。代码如下
/** * 任务可变的,正确使用zip操作符的方法,接上业务数据请求 */ private void button7(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; List<UploadImageItem> originUploadImageList = new ArrayList<>(uploadImageItemList); //添加null,补全到最多图片数量 int validCount = originUploadImageList.size();//原始数量值 for (int i = 0; i < CONCURRENCY_TASK_NUMBER - validCount; i++) { //伪装操作1:添加值为null的item。 originUploadImageList.add(null); } observable0 = uploadImageFileV2(originUploadImageList.get(0)); observable1 = uploadImageFileV2(originUploadImageList.get(1)); observable2 = uploadImageFileV2(originUploadImageList.get(2)); observable3 = uploadImageFileV2(originUploadImageList.get(3)); observable4 = uploadImageFileV2(originUploadImageList.get(4)); Observable<List<UploadImageItem>> observableUploadResult; //使用zip操作符并发上传图片。 Observable .zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem0, UploadImageItem uploadImageItem1, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4) { List<UploadImageItem> resultUploadImageItemList = new ArrayList<>(); resultUploadImageItemList.add(uploadImageItem0); resultUploadImageItemList.add(uploadImageItem1); resultUploadImageItemList.add(uploadImageItem2); resultUploadImageItemList.add(uploadImageItem3); resultUploadImageItemList.add(uploadImageItem4); //伪装操作3:识别出EMPTY_ITEM,过滤掉。 while (resultUploadImageItemList.contains(EMPTY_ITEM)) { resultUploadImageItemList.remove(EMPTY_ITEM); } return resultUploadImageItemList; } }) .flatMap(new Function<List<UploadImageItem>, Observable<HttpResult>>() { @Override public Observable<HttpResult> apply(List<UploadImageItem> uploadImageItems) throws Exception { RequestBody requestBody = new RequestBody(); requestBody.uploadImageItemList = uploadImageItemList; //上传完5张图片后,使用flatMap接着发一个业务请求。 return new ApiService().submitInfo(requestBody); } }) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<HttpResult>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(HttpResult httpResult) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } //假装这个一个Retrofit interface public class ApiService { @POST("users/new") Observable<HttpResult> submitInfo(@Body RequestBody requestBody) {return null;} } public static class RequestBody { List<UploadImageItem> uploadImageItemList; String otherParam1; int otherParam2; } public static class HttpResult { int ret; String msg; String data; }