并发任务执行的几种方法 (同时上传多张图片请求)

    技术2026-04-04  11

    业务需求

    有个业务需求是这样的。需要上传1~5张图片,使用一个专门的上传图片的网址。使用这个网址上传一张图片后会返回一个该图片的url。多张图片上传后可以获取多个url,再把这些图片url加上一些其他参数,发起一个业务请求。

    这应该算一个很常见的产品需求。一开始我想着怎么没有可以上传多个图片的后台接口提供。后面想一次性上传多个,弱网环境下,或者总上传文件体积太大的情况下,一个文件上传失败其他文件也都失败了。所以一次性只能上传一个也还算合理。

    照片实体类
    public class UploadImageItem { @SerializedName("url_path") public String urlPath;//网络图片路径 @SerializedName("timestamp") public long timestamp; @SerializedName("location") public String location; public transient String filePath;//本地图片路径 public UploadImageItem() { } public UploadImageItem(String filePath, long timestamp, String location) { this.timestamp = timestamp; this.location = location; this.filePath = filePath; } @Override public String toString() { return "UploadImageItem{" + "urlPath='" + urlPath + '\'' + ", timestamp=" + timestamp + ", location='" + location + '\'' + ", filePath='" + filePath + '\'' + '}'; } }

    方法1.使用线程池、Runnable、final变量当计数器

    public static final int CONCURRENCY_TASK_NUMBER = 5;//并发任务数量 private ExecutorService mExecutorService = Executors.newCachedThreadPool(); /** * 使用线程池、Runnable、final变量当计数器 */ private void button1(List<UploadImageItem> uploadImageItemList) { final int[] finishCount = {0}; for (UploadImageItem uploadImageItem : uploadImageItemList) { mExecutorService.submit(new IoRunnableV1(uploadImageItem, new Callback() { @Override public void onCallback() { finishCount[0]++; if (finishCount[0] == uploadImageItemList.size()) { //任务全部完成了 print(uploadImageItemList); } } })); } } /** * 全部完成,打印结果 */ public static void print(List<UploadImageItem> uploadImageItemList) { Log.e(TAG, "ConcurrentTaskActivity.java - print() ----- 任务全部完成了:" + Thread.currentThread()); for (UploadImageItem uploadImageItem : uploadImageItemList) { Log.e(TAG, "ConcurrentTaskActivity.java - print() ----- 打印实体:" + uploadImageItem); } } private static class IoRunnableV1 implements Runnable { public UploadImageItem mUploadImageItem; public Callback mCallback; public IoRunnableV1(@NonNull UploadImageItem uploadImageItem, @NonNull Callback callback) { mUploadImageItem = uploadImageItem; mCallback = callback; } @Override public void run() { SystemClock.sleep(new Random().nextInt(1000) + 1000); mUploadImageItem.urlPath = "url path from " + mUploadImageItem.filePath; Log.e(TAG, "IoRunnableV1.java - run() ----- finish! item.filePath:" + mUploadImageItem.filePath); mCallback.onCallback(); } } interface Callback { void onCallback(); }

    方法2.使用线程池、Runnable、CountDownLatch当计数器

    private void button2(List<UploadImageItem> uploadImageItemList) { mExecutorService.execute(new Runnable() { @Override public void run() { CountDownLatch countDownLatch = new CountDownLatch(CONCURRENCY_TASK_NUMBER); for (UploadImageItem uploadImageItem : uploadImageItemList) { mExecutorService.submit(new IoRunnableV2(uploadImageItem, countDownLatch)); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //任务全部完成了 print(uploadImageItemList); } }); } private static class IoRunnableV2 implements Runnable { public UploadImageItem mUploadImageItem; public CountDownLatch mCountDownLatch; public IoRunnableV2(@NonNull UploadImageItem uploadImageItem, @NonNull CountDownLatch countDownLatch) { mUploadImageItem = uploadImageItem; mCountDownLatch = countDownLatch; } @Override public void run() { SystemClock.sleep(new Random().nextInt(1000) + 1000); mUploadImageItem.urlPath = "url path from " + mUploadImageItem.filePath; Log.e(TAG, "IoRunnableV2.java - run() ----- finish! item.filePath:" + mUploadImageItem.filePath); mCountDownLatch.countDown(); } }

    方法3.使用线程池、Callable、使用Future同步获取结果

    private void button3(List<UploadImageItem> uploadImageItemList) { mExecutorService.execute(new Runnable() { @Override public void run() { List<Future<UploadImageItem>> futureList = new ArrayList<>(); for (UploadImageItem uploadImageItem : uploadImageItemList) { Future<UploadImageItem> future = mExecutorService.submit(new IoCallable(uploadImageItem)); futureList.add(future); } try { for (Future<UploadImageItem> future : futureList) { future.get(); } } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } print(uploadImageItemList); } }); } private static class IoCallable implements Callable<UploadImageItem> { public UploadImageItem mUploadImageItem; public IoCallable(UploadImageItem uploadImageItem) { this.mUploadImageItem = uploadImageItem; } @Override public UploadImageItem call() { SystemClock.sleep(new Random().nextInt(1000) + 1000);//模拟网络请求 mUploadImageItem.urlPath = "url path from " + mUploadImageItem.filePath; Log.e(TAG, "IoCallable.java - run() ----- finish! item.filePath::" + mUploadImageItem.filePath); return mUploadImageItem; } }

    方法4.使用Rxjava 的 zip 操作符

    如果网络请求是使用了 Retrofit + Rxjava 那一套,那使用 Rxjava 的 zip 操作符就显得尤其方便了。

    zip的基础写法如下。有 N+1 个参数。 N是并发任务数量。这里并发任务都是相同的话还可以抽取成一个 uploadImageFile 方法以便复用。 1是N个并发执行完后,需要做一些处理操作的函数。

    下面5个并发任务 observable0、observable1、observable2、observable3、observable4 有1个处理函数 new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List>() {…}

    private void button4(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; observable0 = uploadImageFile(uploadImageItemList.get(0)); observable1 = uploadImageFile(uploadImageItemList.get(1)); observable2 = uploadImageFile(uploadImageItemList.get(2)); observable3 = uploadImageFile(uploadImageItemList.get(3)); observable4 = uploadImageFile(uploadImageItemList.get(4)); Observable.zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4, UploadImageItem uploadImageItem5) throws Exception { return uploadImageItemList; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<UploadImageItem>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<UploadImageItem> uploadImageItems) { for (UploadImageItem uploadImageItem : uploadImageItems) { Log.e(TAG, "MultiTaskActivity.java - run() ----- 打印实体:" + uploadImageItem); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } public Observable<UploadImageItem> uploadImageFile(UploadImageItem item) { return Observable.just(item)//这里应该写成网络请求 .subscribeOn(Schedulers.io()) .map(response -> { SystemClock.sleep(new Random().nextInt(1000) + 1000); item.urlPath = "url path from " + item.filePath; Log.e(TAG, "MultiTaskActivity.java - uploadImageFile() finish! item.filePath:" + item.filePath); return item; }); }

    以上是基础写法,但是我们图片数量是1~5张的。查看 zip 的重载方法是这样的。有从2个任务到9个任务的重载方法。 对于可变的任务数量,刚开始我是这么写的。实在有点不忍直视。 后面改了一版。主要思路是使用假任务补足任务数量到5个,然后实际执行时候对那些假任务跳过不执行就可以。

    public final UploadImageItem EMPTY_ITEM = new UploadImageItem(); /** * 任务可变的,正确使用zip操作符的方法 */ private void button6(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; List<UploadImageItem> originUploadImageList = new ArrayList<>(uploadImageItemList); //添加null,补全到最多图片数量 int validCount = originUploadImageList.size();//原始数量值 for (int i = 0; i < CONCURRENCY_TASK_NUMBER - validCount; i++) { //伪装操作1:添加值为null的item。 originUploadImageList.add(null); } observable0 = uploadImageFileV2(originUploadImageList.get(0)); observable1 = uploadImageFileV2(originUploadImageList.get(1)); observable2 = uploadImageFileV2(originUploadImageList.get(2)); observable3 = uploadImageFileV2(originUploadImageList.get(3)); observable4 = uploadImageFileV2(originUploadImageList.get(4)); Observable<List<UploadImageItem>> observableUploadResult; //使用zip操作符并发上传图片。 Observable .zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem0, UploadImageItem uploadImageItem1, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4) { List<UploadImageItem> resultUploadImageItemList = new ArrayList<>(); resultUploadImageItemList.add(uploadImageItem0); resultUploadImageItemList.add(uploadImageItem1); resultUploadImageItemList.add(uploadImageItem2); resultUploadImageItemList.add(uploadImageItem3); resultUploadImageItemList.add(uploadImageItem4); //伪装操作3:识别出EMPTY_ITEM,过滤掉。 while (resultUploadImageItemList.contains(EMPTY_ITEM)) { resultUploadImageItemList.remove(EMPTY_ITEM); } return resultUploadImageItemList; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<UploadImageItem>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<UploadImageItem> uploadImageItems) { for (UploadImageItem uploadImageItem : uploadImageItems) { Log.e(TAG, "MultiTaskActivity.java - run() ----- 打印实体:" + uploadImageItem); } } @Override public void onError(Throwable e) { // 如果任意一张图片上传出错,会走到这个方法里 } @Override public void onComplete() { } }); } public Observable<UploadImageItem> uploadImageFileV2(UploadImageItem item) { //伪装操作2:如果是null的item,不需要执行网络请求,而是发送一个EMPTY_ITEM。 //因为 Observable.just(null) 会报错,所以使用 EMPTY_ITEM 这个代表假任务 if (item == null) { return Observable.just(EMPTY_ITEM); } //图片已经上传过,已拥有网络路径 if (item.urlPath != null) { return Observable.just(item); } return Observable.just(item)//这里应该写成网络请求 .subscribeOn(Schedulers.io()) .map(response -> { SystemClock.sleep(new Random().nextInt(1000) + 1000); item.urlPath = "url path from " + item.filePath; Log.e(TAG, "MultiTaskActivity.java - uploadImageFile() finish! item.filePath:" + item.filePath); return item; }); }

    使用这种方法并发上传5张图片之后,完全可以使用 .flatMap 接着发起一个业务请求。 这么写就真的非常的流式编程了。代码如下

    /** * 任务可变的,正确使用zip操作符的方法,接上业务数据请求 */ private void button7(List<UploadImageItem> uploadImageItemList) { Observable<UploadImageItem> observable0; Observable<UploadImageItem> observable1; Observable<UploadImageItem> observable2; Observable<UploadImageItem> observable3; Observable<UploadImageItem> observable4; List<UploadImageItem> originUploadImageList = new ArrayList<>(uploadImageItemList); //添加null,补全到最多图片数量 int validCount = originUploadImageList.size();//原始数量值 for (int i = 0; i < CONCURRENCY_TASK_NUMBER - validCount; i++) { //伪装操作1:添加值为null的item。 originUploadImageList.add(null); } observable0 = uploadImageFileV2(originUploadImageList.get(0)); observable1 = uploadImageFileV2(originUploadImageList.get(1)); observable2 = uploadImageFileV2(originUploadImageList.get(2)); observable3 = uploadImageFileV2(originUploadImageList.get(3)); observable4 = uploadImageFileV2(originUploadImageList.get(4)); Observable<List<UploadImageItem>> observableUploadResult; //使用zip操作符并发上传图片。 Observable .zip(observable0, observable1, observable2, observable3, observable4, new Function5<UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, UploadImageItem, List<UploadImageItem>>() { @Override public List<UploadImageItem> apply(UploadImageItem uploadImageItem0, UploadImageItem uploadImageItem1, UploadImageItem uploadImageItem2, UploadImageItem uploadImageItem3, UploadImageItem uploadImageItem4) { List<UploadImageItem> resultUploadImageItemList = new ArrayList<>(); resultUploadImageItemList.add(uploadImageItem0); resultUploadImageItemList.add(uploadImageItem1); resultUploadImageItemList.add(uploadImageItem2); resultUploadImageItemList.add(uploadImageItem3); resultUploadImageItemList.add(uploadImageItem4); //伪装操作3:识别出EMPTY_ITEM,过滤掉。 while (resultUploadImageItemList.contains(EMPTY_ITEM)) { resultUploadImageItemList.remove(EMPTY_ITEM); } return resultUploadImageItemList; } }) .flatMap(new Function<List<UploadImageItem>, Observable<HttpResult>>() { @Override public Observable<HttpResult> apply(List<UploadImageItem> uploadImageItems) throws Exception { RequestBody requestBody = new RequestBody(); requestBody.uploadImageItemList = uploadImageItemList; //上传完5张图片后,使用flatMap接着发一个业务请求。 return new ApiService().submitInfo(requestBody); } }) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<HttpResult>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(HttpResult httpResult) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } //假装这个一个Retrofit interface public class ApiService { @POST("users/new") Observable<HttpResult> submitInfo(@Body RequestBody requestBody) {return null;} } public static class RequestBody { List<UploadImageItem> uploadImageItemList; String otherParam1; int otherParam2; } public static class HttpResult { int ret; String msg; String data; }
    Processed: 0.018, SQL: 10