子任务多线程

pull/657/head
mxm 5 years ago
parent 4951dda5b0
commit baf4bac091
  1. 6
      HttpComponent/src/main/java/com/arialyy/aria/http/download/HttpSubDLoaderUtil.java
  2. 59
      PublicComponent/src/main/java/com/arialyy/aria/core/group/AbsSubDLoadUtil.java
  3. 58
      PublicComponent/src/main/java/com/arialyy/aria/core/group/SubRecordHandler.java
  4. 308
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/GroupSubThreadStateManager.java
  5. 67
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/SubLoader.java

@ -19,6 +19,7 @@ import android.os.Handler;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.group.AbsSubDLoadUtil;
import com.arialyy.aria.core.group.SubRecordHandler;
import com.arialyy.aria.core.loader.GroupSubThreadStateManager;
import com.arialyy.aria.core.loader.LoaderStructure;
import com.arialyy.aria.core.loader.NormalTTBuilder;
import com.arialyy.aria.core.loader.SubLoader;
@ -49,8 +50,9 @@ final class HttpSubDLoaderUtil extends AbsSubDLoadUtil {
@Override protected LoaderStructure buildLoaderStructure() {
LoaderStructure structure = new LoaderStructure();
structure.addComponent(new SubRecordHandler(getWrapper()))
.addComponent(new NormalTTBuilder(getWrapper(), new HttpDTTBuilderAdapter()))
.addComponent(new HttpDFileInfoTask(getWrapper()));
.addComponent(new GroupSubThreadStateManager(getSchedulers(),getKey()))
.addComponent(new NormalTTBuilder(getWrapper(), new HttpDTTBuilderAdapter()))
.addComponent(new HttpDFileInfoTask(getWrapper()));
structure.accept(getLoader());
return structure;
}

@ -15,14 +15,20 @@
*/
package com.arialyy.aria.core.group;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IUtil;
import com.arialyy.aria.core.listener.IDLoadListener;
import com.arialyy.aria.core.listener.ISchedulers;
import com.arialyy.aria.core.loader.LoaderStructure;
import com.arialyy.aria.core.loader.SubLoader;
import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
@ -38,6 +44,7 @@ public abstract class AbsSubDLoadUtil implements IUtil, Runnable {
private boolean needGetInfo;
private boolean isStop = false, isCancel = false;
private String parentKey;
private IDLoadListener mListener;
/**
* @param schedulers 调度器
@ -46,10 +53,62 @@ public abstract class AbsSubDLoadUtil implements IUtil, Runnable {
protected AbsSubDLoadUtil(DTaskWrapper taskWrapper, Handler schedulers, boolean needGetInfo, String parentKey) {
mWrapper = taskWrapper;
mSchedulers = schedulers;
mListener=createListener();
this.parentKey = parentKey;
this.needGetInfo = needGetInfo;
mDLoader = getLoader();
}
private IDLoadListener createListener() {
return new IDLoadListener() {
@Override
public void onPostPre(long fileSize) {
}
@Override
public void supportBreakpoint(boolean support) {
}
@Override
public void onPre() {
}
@Override
public void onStart(long startLocation) {
}
@Override
public void onResume(long resumeLocation) {
}
@Override
public void onProgress(long currentLocation) {
Message msg =getSchedulers().obtainMessage(IThreadStateManager.STATE_RUNNING, currentLocation);
Bundle b = new Bundle();
msg.setData(b);
b.putString(IThreadStateManager.DATA_THREAD_NAME, getKey());
msg.sendToTarget();
}
@Override
public void onStop(long stopLocation) {
}
@Override
public void onComplete() {
}
@Override
public void onCancel() {
}
@Override
public void onFail(boolean needRetry, AriaException e) {
}
};
}
/**
* 创建加载器

@ -19,8 +19,13 @@ import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.ThreadRecord;
import com.arialyy.aria.core.common.RecordHandler;
import com.arialyy.aria.core.common.RecordHelper;
import com.arialyy.aria.core.config.Configuration;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.loader.IRecordHandler;
import com.arialyy.aria.core.wrapper.AbsTaskWrapper;
import com.arialyy.aria.core.wrapper.ITaskWrapper;
import com.arialyy.aria.util.RecordUtil;
import java.util.ArrayList;
/**
@ -33,7 +38,17 @@ public class SubRecordHandler extends RecordHandler {
@Override public void handlerTaskRecord(TaskRecord record) {
RecordHelper helper = new RecordHelper(getWrapper(), record);
helper.handleSingleThreadRecord();
if (getWrapper().isSupportBP() && record.threadNum > 1) {
if (record.isBlock) {
helper.handleBlockRecord();
} else {
helper.handleMultiRecord();
}
} else if (!getWrapper().isSupportBP()) {
helper.handleNoSupportBPRecord();
} else {
helper.handleSingleThreadRecord();
}
}
@Override
@ -44,9 +59,14 @@ public class SubRecordHandler extends RecordHandler {
tr.threadId = threadId;
tr.startLocation = startL;
tr.isComplete = false;
tr.threadType = getEntity().getTaskType();
tr.endLocation = getFileSize();
tr.blockLen = getFileSize();
tr.threadType = record.taskType;
//最后一个线程的结束位置即为文件的总长度
if (threadId == (record.threadNum - 1)) {
endL = getFileSize();
}
tr.endLocation = endL;
tr.blockLen = RecordUtil.getBlockLen(getFileSize(), threadId, record.threadNum);
return tr;
}
@ -54,20 +74,36 @@ public class SubRecordHandler extends RecordHandler {
TaskRecord record = new TaskRecord();
record.fileName = getEntity().getFileName();
record.filePath = getEntity().getFilePath();
record.fileLength = getFileSize();
record.threadRecords = new ArrayList<>();
record.threadNum = threadNum;
record.isBlock = false;
record.taskType = getWrapper().getRequestType();
record.isGroupRecord = true;
if (getEntity() instanceof DownloadEntity) {
record.dGroupHash = ((DownloadEntity) getEntity()).getGroupHash();
int requestType = getWrapper().getRequestType();
if (requestType == ITaskWrapper.D_HTTP || requestType == ITaskWrapper.DG_HTTP) {
record.isBlock = Configuration.getInstance().downloadCfg.isUseBlock();
} else {
record.isBlock = false;
}
record.taskType = requestType;
record.isGroupRecord = getEntity().isGroupChild();
if (record.isGroupRecord) {
if (getEntity() instanceof DownloadEntity) {
record.dGroupHash = ((DownloadEntity) getEntity()).getGroupHash();
}
}
return record;
}
@Override public int initTaskThreadNum() {
return 1;
int requestTpe = getWrapper().getRequestType();
if (requestTpe == ITaskWrapper.U_HTTP
|| (requestTpe == ITaskWrapper.D_HTTP && (!getWrapper().isSupportBP())
)) {
return 1;
}
int threadNum = Configuration.getInstance().downloadCfg.getThreadNum();
return getFileSize() <= IRecordHandler.SUB_LEN
? 1
: threadNum;
}
}

@ -0,0 +1,308 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.loader;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import com.arialyy.aria.util.FileUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程任务管理器用于处理多线程下载时任务的状态回调
*/
public class GroupSubThreadStateManager implements IThreadStateManager {
private final String TAG = CommonUtil.getClassName(this);
/**
* 任务状态回调
*/
private Handler mHandler;//SimpleSchedulers
private int mThreadNum; // 启动的线程总数
private AtomicInteger mCancelNum = new AtomicInteger(0); // 已经取消的线程的数
private AtomicInteger mStopNum = new AtomicInteger(0); // 已经停止的线程数
private AtomicInteger mFailNum = new AtomicInteger(0); // 失败的线程数
private AtomicInteger mCompleteNum = new AtomicInteger(0); // 完成的线程数
private long mProgress; //当前总进度
private TaskRecord mTaskRecord; // 任务记录
private Looper mLooper;
private String mKey;
/**
* @param handler 任务事件
*/
public GroupSubThreadStateManager(Handler handler, String key) {
mHandler = handler;
mKey = key;
}
@Override public void setLooper(TaskRecord taskRecord, Looper looper) {
mTaskRecord = taskRecord;
mThreadNum = mTaskRecord.threadNum;
mLooper = looper;
}
private void checkLooper() {
if (mTaskRecord == null) {
throw new NullPointerException("任务记录为空");
}
if (mLooper == null) {
throw new NullPointerException("Looper为空");
}
}
private Handler.Callback callback = new Handler.Callback() {
@Override public boolean handleMessage(Message msg) {
checkLooper();
switch (msg.what) {
case STATE_STOP:
mStopNum.getAndIncrement();
if (isStop()) {
quitLooper();
}
sendMessageFromMsg(msg);
break;
case STATE_CANCEL:
mCancelNum.getAndIncrement();
if (isCancel()) {
quitLooper();
}
sendMessageFromMsg(msg);
break;
case STATE_FAIL:
mFailNum.getAndIncrement();
if (isFail()) {
sendMessageFromMsg(msg);
/* Bundle b = msg.getData();
mListener.onFail(b.getBoolean(DATA_RETRY, false),
(AriaException) b.getSerializable(DATA_ERROR_INFO));*/
quitLooper();
}
sendMessageFromMsg(msg);
break;
case STATE_COMPLETE:
mCompleteNum.getAndIncrement();
if (isComplete()) {
ALog.d(TAG, "isComplete, completeNum = " + mCompleteNum);
//if (mTaskRecord.taskType == ITaskWrapper.D_SFTP) {
// mergerSFtp();
// mListener.onComplete();
//} else
if (mTaskRecord.isBlock) {
/*if (mergeFile()) {
mListener.onComplete();
} else {
mListener.onFail(false, null);
}*/
if (!mergeFile()) {
Bundle b=msg.getData();
b.putBoolean(IThreadStateManager.DATA_RETRY, false);
msg.setData(b);
msg.what = STATE_FAIL;
sendMessageFromMsg(msg);
}
sendMessageFromMsg(msg);
} else {
sendMessageFromMsg(msg);
//mListener.onComplete();
}
quitLooper();
}
break;
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
mProgress += len;
}
msg.obj=mProgress;
sendMessageFromMsg(msg);
break;
case STATE_UPDATE_PROGRESS:
if (msg.obj == null) {
mProgress = updateBlockProgress();
} else if (msg.obj instanceof Long) {
mProgress = (long) msg.obj;
}
msg.obj=mProgress;
sendMessageFromMsg(msg);
break;
}
return false;
}
public void sendMessageFromMsg(Message msg){
Message mMsg=mHandler.obtainMessage();
Bundle b=mMsg.getData();
b.putString(IThreadStateManager.DATA_THREAD_NAME,mKey);
msg.setData(b);
mMsg.copyFrom(msg);
mHandler.sendMessage(mMsg);
}
};
@Override public void updateCurrentProgress(long currentProgress) {
mProgress = currentProgress;
}
/**
* 退出looper循环
*/
private void quitLooper() {
mLooper.quit();
}
/**
* 获取当前任务下载进度
*
* @return 当前任务下载进度
*/
@Override
public long getCurrentProgress() {
return mProgress;
}
@Override public Handler.Callback getHandlerCallback() {
return callback;
}
/**
* 所有子线程是否都已经停止
*/
public boolean isStop() {
//ALog.d(TAG,
// String.format("isStop; stopNum: %s, cancelNum: %s, failNum: %s, completeNum: %s", mStopNum,
// mCancelNum, mFailNum, mCompleteNum));
return mStopNum.get() == mThreadNum || mStopNum.get() + mCompleteNum.get() == mThreadNum;
}
/**
* 所有子线程是否都已经失败
*/
@Override
public boolean isFail() {
//ALog.d(TAG,
// String.format("isFail; stopNum: %s, cancelNum: %s, failNum: %s, completeNum: %s", mStopNum,
// mCancelNum, mFailNum, mCompleteNum));
return mCompleteNum.get() != mThreadNum
&& (mFailNum.get() == mThreadNum || mFailNum.get() + mCompleteNum.get() == mThreadNum);
}
/**
* 所有子线程是否都已经完成
*/
@Override
public boolean isComplete() {
//ALog.d(TAG,
// String.format("isComplete; stopNum: %s, cancelNum: %s, failNum: %s, completeNum: %s",
// mStopNum,
// mCancelNum, mFailNum, mCompleteNum));
return mCompleteNum.get() == mThreadNum;
}
/**
* 所有子线程是否都已经取消
*/
public boolean isCancel() {
//ALog.d(TAG, String.format("isCancel; stopNum: %s, cancelNum: %s, failNum: %s, completeNum: %s",
// mStopNum,
// mCancelNum, mFailNum, mCompleteNum));
return mCancelNum.get() == mThreadNum;
}
/**
* 更新分块任务s的真实进度
*/
private long updateBlockProgress() {
long size = 0;
for (int i = 0, len = mTaskRecord.threadRecords.size(); i < len; i++) {
File temp = new File(String.format(IRecordHandler.SUB_PATH, mTaskRecord.filePath, i));
if (temp.exists()) {
size += temp.length();
}
}
return size;
}
/**
* 合并sftp的分块
*/
private boolean mergerSFtp() {
if (mTaskRecord.threadNum == 1) {
File partFile = new File(String.format(IRecordHandler.SUB_PATH, mTaskRecord.filePath, 0));
return partFile.renameTo(new File(mTaskRecord.filePath));
}
List<String> partPath = new ArrayList<>();
for (int i = 0, len = mTaskRecord.threadNum; i < len; i++) {
partPath.add(String.format(IRecordHandler.SUB_PATH, mTaskRecord.filePath, i));
}
FileUtil.mergeSFtpFile(mTaskRecord.filePath, partPath, mTaskRecord.fileLength);
for (String pp : partPath) {
FileUtil.deleteFile(pp);
}
return true;
}
/**
* 合并文件
*
* @return {@code true} 合并成功{@code false}合并失败
*/
private boolean mergeFile() {
if (mTaskRecord.threadNum == 1) {
File partFile = new File(String.format(IRecordHandler.SUB_PATH, mTaskRecord.filePath, 0));
return partFile.renameTo(new File(mTaskRecord.filePath));
}
List<String> partPath = new ArrayList<>();
for (int i = 0, len = mTaskRecord.threadNum; i < len; i++) {
partPath.add(String.format(IRecordHandler.SUB_PATH, mTaskRecord.filePath, i));
}
boolean isSuccess = FileUtil.mergeFile(mTaskRecord.filePath, partPath);
if (isSuccess) {
for (String pp : partPath) {
FileUtil.deleteFile(pp);
}
File targetFile = new File(mTaskRecord.filePath);
if (targetFile.exists() && targetFile.length() > mTaskRecord.fileLength) {
ALog.e(TAG, String.format("任务【%s】分块文件合并失败,下载长度超出文件真实长度,downloadLen: %s,fileSize: %s",
targetFile.getName(), targetFile.length(), mTaskRecord.fileLength));
return false;
}
return true;
} else {
ALog.e(TAG, "合并失败");
return false;
}
}
@Override public void accept(ILoaderVisitor visitor) {
visitor.addComponent(this);
}
}

@ -17,10 +17,12 @@ package com.arialyy.aria.core.loader;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.text.TextUtils;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.AbsNormalEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.manager.ThreadTaskManager;
@ -31,6 +33,7 @@ import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
/**
@ -46,9 +49,10 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
private IInfoTask infoTask;
private IThreadTaskBuilder ttBuild;
private IRecordHandler recordHandler;
private IThreadTask threadTask;
private List<IThreadTask> mTask = new ArrayList<>();
private String parentKey;
private TaskRecord record;
protected IThreadStateManager mStateManager;
public SubLoader(AbsTaskWrapper wrapper, Handler schedulers) {
this.wrapper = wrapper;
@ -96,18 +100,24 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
if (isBreak()){
return;
}
Looper looper=Looper.myLooper();
if(looper==null) {
Looper.prepare();
looper=Looper.myLooper();
}
record = recordHandler.getRecord(wrapper.getEntity().getFileSize());
if (record.threadRecords != null
&& !TextUtils.isEmpty(record.filePath)
&& new File(record.filePath).exists()
&& !record.threadRecords.isEmpty()
&& record.threadRecords.get(0).isComplete) {
&& !TextUtils.isEmpty(record.filePath)
&& new File(record.filePath).exists()
&& !record.threadRecords.isEmpty()
&& record.threadRecords.get(0).isComplete) {
ALog.d(TAG, "子任务已完成,key:" + wrapper.getKey());
sendNormalState(IThreadStateManager.STATE_COMPLETE);
return;
}
List<IThreadTask> task = ttBuild.buildThreadTask(record, schedulers);
List<IThreadTask> task = ttBuild.buildThreadTask(record, new Handler(looper, mStateManager.getHandlerCallback()));
mStateManager.setLooper(record, looper);
if (task == null || task.isEmpty()) {
ALog.e(TAG, "创建子任务的线程任务失败,key:" + wrapper.getKey());
sendFailState(false);
@ -118,14 +128,22 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
sendFailState(false);
return;
}
sendNormalState(IThreadStateManager.STATE_PRE);
threadTask = task.get(0);
mTask .addAll( task);
try {
ThreadTaskManager.getInstance().startThread(parentKey, threadTask);
for (IThreadTask iThreadTask : mTask) {
ThreadTaskManager.getInstance().startThread(parentKey, iThreadTask);
}
sendNormalState(IThreadStateManager.STATE_START);
mStateManager.updateCurrentProgress(getWrapper().getEntity().getCurrentProgress());
} catch (Exception e) {
e.printStackTrace();
}
Looper.loop();
}
public TaskRecord getRecord(){
@ -142,8 +160,10 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
public void retryTask() {
try {
if (threadTask != null) {
threadTask.call();
if (!mTask.isEmpty() ) {
for (IThreadTask iThreadTask : mTask) {
iThreadTask.call();
}
} else {
ALog.e(TAG, "子任务的线程任务为空");
}
@ -158,11 +178,19 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
return;
}
isStop = true;
threadTask.stop();
for (IThreadTask iThreadTask : mTask) {
iThreadTask.stop();
}
}
@Override public boolean isRunning() {
return threadTask != null && !threadTask.isBreak();
if(mTask.isEmpty())
return false;
for (IThreadTask iThreadTask : mTask) {
if(!iThreadTask.isBreak()) return true;
}
return false;
}
@Override public void cancel() {
@ -171,7 +199,9 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
return;
}
isCancel = true;
threadTask.cancel();
for (IThreadTask iThreadTask : mTask) {
iThreadTask.cancel();
}
}
@Override public boolean isBreak() {
@ -192,12 +222,8 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
return CommonUtil.getThreadName(wrapper.getKey(), 0);
}
/**
* @deprecated 子任务不需要实现这个
*/
@Deprecated
@Override public long getCurrentProgress() {
return 0;
return isRunning() ? mStateManager.getCurrentProgress() : getWrapper().getEntity().getCurrentProgress();
}
@Override public void addComponent(IRecordHandler recordHandler) {
@ -217,11 +243,8 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
});
}
/**
* @deprecated 子任务不需要实现这个
*/
@Override public void addComponent(IThreadStateManager threadState) {
// 子任务不需要实现这个
mStateManager = threadState;
}
@Override public void addComponent(IThreadTaskBuilder builder) {

Loading…
Cancel
Save