Subtask status refinement

v4
laoyuyu 2 years ago
parent 6b8c051b30
commit c06b1fd5fb
  1. 9
      Http/src/main/java/com/arialyy/aria/http/download/HttpDEventListener.kt
  2. 221
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGInfoTask.java
  3. 16
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt
  4. 4
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java
  5. 44
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt
  6. 59
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubDLoaderAdapter.java
  7. 5
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt

@ -15,21 +15,13 @@
*/
package com.arialyy.aria.http.download
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IEntity
import com.arialyy.aria.core.listener.AbsEventListener
import com.arialyy.aria.core.listener.ISchedulers
import com.arialyy.aria.core.task.SingleDownloadTask
import com.arialyy.aria.core.task.TaskCachePool
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.orm.entity.DEntity
import com.arialyy.aria.util.BlockUtil
import com.arialyy.aria.util.FileUri
import com.arialyy.aria.util.FileUtils
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import timber.log.Timber
import java.io.File
class HttpDEventListener(task: SingleDownloadTask) : AbsEventListener(task) {
@ -49,7 +41,6 @@ class HttpDEventListener(task: SingleDownloadTask) : AbsEventListener(task) {
saveData(IEntity.STATE_COMPLETE, task.taskState.fileSize)
}
override fun handleCancel() {
BlockUtil.removeTaskBlock(task)
}

@ -1,221 +0,0 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.dua.group;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.loader.IInfoTask;
import com.arialyy.aria.core.loader.ILoaderVisitor;
import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.exception.AriaHTTPException;
import com.arialyy.aria.http.HttpTaskOption;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 组合任务文件信息用于获取长度未知时组合任务的长度
*/
public final class HttpDGInfoTask implements IInfoTask {
private String TAG = CommonUtil.getClassName(this);
private Callback callback;
private DGTaskWrapper wrapper;
private final Object LOCK = new Object();
private ExecutorService mPool = null;
private boolean getLenComplete = false;
private AtomicInteger count = new AtomicInteger();
private AtomicInteger failCount = new AtomicInteger();
private boolean isStop = false, isCancel = false;
public interface DGInfoCallback extends Callback {
/**
* 子任务失败
*/
void onSubFail(DownloadEntity subEntity, AriaHTTPException e, boolean needRetry);
/**
* 组合任务停止
*/
void onStop(long len);
}
/**
* 子任务回调
*/
private Callback subCallback = new Callback() {
@Override public void onSucceed(String url, CompleteInfo info) {
count.getAndIncrement();
checkGetSizeComplete(count.get(), failCount.get());
ALog.d(TAG, "获取子任务信息完成");
}
@Override public void onFail(AbsEntity entity, AriaException e, boolean needRetry) {
ALog.e(TAG, String.format("获取文件信息失败,url:%s", ((DownloadEntity) entity).getUrl()));
count.getAndIncrement();
failCount.getAndIncrement();
((DGInfoCallback) callback).onSubFail((DownloadEntity) entity, new AriaHTTPException(
String.format("子任务获取文件长度失败,url:%s", ((DownloadEntity) entity).getUrl())), needRetry);
checkGetSizeComplete(count.get(), failCount.get());
}
};
HttpDGInfoTask(DGTaskWrapper wrapper) {
this.wrapper = wrapper;
}
/**
* 停止
*/
@Override
public void stop() {
isStop = true;
if (mPool != null) {
mPool.shutdown();
}
}
@Override public void cancel() {
isCancel = true;
if (mPool != null) {
mPool.shutdown();
}
}
@Override public void run() {
// 如果是isUnknownSize()标志,并且获取大小没有完成,则直接回调onStop
if (mPool != null && !getLenComplete) {
ALog.d(TAG, "获取长度未完成的情况下,停止组合任务");
mPool.shutdown();
((DGInfoCallback)callback).onStop(0);
return;
}
// 处理组合任务大小未知的情况
if (wrapper.isUnknownSize()) {
mPool = Executors.newCachedThreadPool();
getGroupSize();
try {
synchronized (LOCK) {
LOCK.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!mPool.isShutdown()) {
mPool.shutdown();
}
} else {
for (DTaskWrapper wrapper : wrapper.getSubTaskWrapper()) {
cloneHeader(wrapper);
}
callback.onSucceed(wrapper.getKey(), new CompleteInfo());
}
}
/*
* 获取组合任务大小使用该方式获取到的组合任务大小子任务不需要再重新获取文件大小
*/
private void getGroupSize() {
new Thread(new Runnable() {
@Override public void run() {
for (DTaskWrapper dTaskWrapper : wrapper.getSubTaskWrapper()) {
DownloadEntity subEntity = dTaskWrapper.getEntity();
if (subEntity.getFileSize() > 0) {
count.getAndIncrement();
if (subEntity.getCurrentProgress() < subEntity.getFileSize()) {
// 如果没有完成需要拷贝一份数据
cloneHeader(dTaskWrapper);
}
checkGetSizeComplete(count.get(), failCount.get());
continue;
}
cloneHeader(dTaskWrapper);
HttpDFileInfoTask infoTask = new HttpDFileInfoTask(dTaskWrapper);
infoTask.setCallback(subCallback);
mPool.execute(infoTask);
}
}
}).start();
}
/**
* 检查组合任务大小是否获取完成获取完成后取消阻塞并设置组合任务大小
*/
private void checkGetSizeComplete(int count, int failCount) {
if (isStop || isCancel) {
ALog.w(TAG, "任务已停止或已取消,isStop = " + isStop + ", isCancel = " + isCancel);
notifyLock();
return;
}
if (failCount == wrapper.getSubTaskWrapper().size()) {
callback.onFail(wrapper.getEntity(), new AriaHTTPException("获取子任务长度失败"), false);
notifyLock();
return;
}
if (count == wrapper.getSubTaskWrapper().size()) {
long size = 0;
for (DTaskWrapper wrapper : wrapper.getSubTaskWrapper()) {
size += wrapper.getEntity().getFileSize();
}
wrapper.getEntity().setConvertFileSize(CommonUtil.formatFileSize(size));
wrapper.getEntity().setFileSize(size);
wrapper.getEntity().update();
getLenComplete = true;
ALog.d(TAG, String.format("获取组合任务长度完成,组合任务总长度:%s,失败的子任务数:%s", size, failCount));
callback.onSucceed(wrapper.getKey(), new CompleteInfo());
notifyLock();
}
}
private void notifyLock() {
synchronized (LOCK) {
LOCK.notifyAll();
}
}
/**
* 子任务使用父包裹器的属性
*/
private void cloneHeader(DTaskWrapper taskWrapper) {
HttpTaskOption groupOption = (HttpTaskOption) wrapper.getTaskOption();
HttpTaskOption subOption = new HttpTaskOption();
// 设置属性
subOption.setFileLenAdapter(groupOption.getFileLenAdapter());
subOption.setFileNameAdapter(groupOption.getFileNameAdapter());
subOption.setUseServerFileName(groupOption.isUseServerFileName());
subOption.setFileNameAdapter(groupOption.getFileNameAdapter());
subOption.setRequestEnum(groupOption.getRequestEnum());
subOption.setHeaders(groupOption.getHeaders());
subOption.setProxy(groupOption.getProxy());
subOption.setParams(groupOption.getParams());
taskWrapper.setTaskOption(subOption);
}
@Override public void setCallback(Callback callback) {
this.callback = callback;
}
@Override public void accept(ILoaderVisitor visitor) {
visitor.addComponent(this);
}
}

@ -19,6 +19,8 @@ import android.os.Handler
import android.os.Looper
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.download.HttpDTaskAdapter
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
@ -44,20 +46,20 @@ internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBloc
)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = MainScope()
private val eventListener: IEventListener =
task.getTaskOption(HttpTaskOption::class.java).eventListener
private val callback = Handler.Callback { msg ->
when (msg.what) {
ITaskManager.STATE_STOP -> {
ITaskManager.SUB_STATE_STOP -> {
}
ITaskManager.STATE_CANCEL -> {
ITaskManager.SUB_STATE_CANCEL -> {
}
ITaskManager.STATE_FAIL -> {
ITaskManager.SUB_STATE_FAIL -> {
}
ITaskManager.STATE_COMPLETE -> {
ITaskManager.SUB_STATE_COMPLETE -> {
}
ITaskManager.STATE_RUNNING -> {
}
ITaskManager.STATE_UPDATE_PROGRESS -> {
ITaskManager.SUB_STATE_RUNNING -> {
}
}
false

@ -30,9 +30,9 @@ import java.util.Objects;
*/
public class HttpDGroupTask extends AbsTask {
private List<SingleDownloadTask> incompleteTaskList = new ArrayList<>();
private final List<SingleDownloadTask> incompleteTaskList = new ArrayList<>();
private List<SingleDownloadTask> subTaskList = new ArrayList<>();
private final List<SingleDownloadTask> subTaskList = new ArrayList<>();
public HttpDGroupTask(ITaskOption taskOption) {
super(taskOption);

@ -24,7 +24,6 @@ import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.TaskCachePool.removeTask
import com.arialyy.aria.core.task.TaskCachePool.updateState
import com.arialyy.aria.core.task.TaskState
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.download.HttpDOptionAdapter
import com.arialyy.aria.util.BlockUtil
@ -40,8 +39,7 @@ internal class HttpSubBlockManager(private val task: ITask, private val groupHan
private lateinit var looper: Looper
private lateinit var handler: Handler
private var isStop = false
private var isCancel = false
private var isBreak = false
private var progress: Long = 0L
/**
@ -50,17 +48,22 @@ internal class HttpSubBlockManager(private val task: ITask, private val groupHan
private val callback = Handler.Callback { msg ->
when (msg.what) {
ITaskManager.STATE_STOP -> {
isStop = true
isBreak = true
saveData(IEntity.STATE_STOP)
sendStateToGroup(ITaskManager.SUB_STATE_STOP)
quitLooper()
}
ITaskManager.STATE_CANCEL -> {
isCancel = true
isBreak = true
removeTask(task)
BlockUtil.removeTaskBlock(task)
sendStateToGroup(ITaskManager.SUB_STATE_CANCEL)
quitLooper()
}
ITaskManager.STATE_FAIL -> {
isBreak = true
sendStateToGroup(ITaskManager.SUB_STATE_FAIL)
quitLooper()
}
ITaskManager.STATE_COMPLETE -> {
saveData(IEntity.STATE_COMPLETE)
@ -68,27 +71,36 @@ internal class HttpSubBlockManager(private val task: ITask, private val groupHan
if (!b) {
Timber.e("merge block fail")
onFail(false, AriaException("merge block fail"))
return
sendStateToGroup(ITaskManager.SUB_STATE_FAIL)
} else {
task.taskState.speed = 0
saveData(IEntity.STATE_COMPLETE)
}
task.taskState.speed = 0
saveData(IEntity.STATE_COMPLETE)
}
ITaskManager.STATE_RUNNING -> {
updateProgress(msg.obj as Long)
}
ITaskManager.STATE_UPDATE_PROGRESS -> {
val b = msg.data
if (b != null) {
val len = b.getLong(ITaskManager.DATA_ADD_LEN, 0)
progress += len
task.taskState.speed = len
task.taskState.curProgress = progress
updateProgress(b.getLong(ITaskManager.DATA_ADD_LEN, 0))
}
}
}
false
}
private fun sendStateToGroup(state: Int) {
groupHandler.obtainMessage(state, task).sendToTarget()
}
private fun updateProgress(len: Long) {
progress += len
task.taskState.speed = len
task.taskState.curProgress = progress
groupHandler.obtainMessage(ITaskManager.SUB_STATE_RUNNING, len).sendToTarget()
}
private fun saveData(state: Int) {
val ts: TaskState = task.taskState
ts.state = state
@ -115,14 +127,10 @@ internal class HttpSubBlockManager(private val task: ITask, private val groupHan
// Synchronized sequential execution of all block
task.getTaskOption(HttpTaskOption::class.java)
.getOptionAdapter(HttpDOptionAdapter::class.java).threadList.forEach { tt ->
if (isStop) {
if (isBreak) {
Timber.d("task stopped")
return
}
if (isCancel) {
Timber.d("task canceled")
return
}
tt.run()
}
}

@ -1,59 +0,0 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.dua.group;
import android.os.Handler;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.group.AbsSubDLoadAdapter;
import com.arialyy.aria.core.group.SubRecordHandler;
import com.arialyy.aria.core.loader.GroupSubThreadStateManager;
import com.arialyy.aria.core.loader.LoaderStructure;
import com.arialyy.aria.core.loader.NormalTTBuilder;
import com.arialyy.aria.core.loader.SubLoader;
/**
* @author lyy
* Date: 2019-09-28
*/
final class HttpSubDLoaderAdapter extends AbsSubDLoadAdapter {
/**
* @param schedulers 调度器
* @param needGetInfo {@code true} 需要获取文件信息{@code false} 不需要获取文件信息
*/
HttpSubDLoaderAdapter( Handler schedulers, boolean needGetInfo, String parentKey) {
super(schedulers, needGetInfo, parentKey);
}
@Override protected SubLoader getLoader() {
if (mDLoader == null) {
mDLoader = new SubLoader(getWrapper(), getSchedulers());
mDLoader.setNeedGetInfo(isNeedGetInfo());
mDLoader.setParentKey(getParentKey());
}
return mDLoader;
}
@Override protected LoaderStructure buildLoaderStructure() {
LoaderStructure structure = new LoaderStructure();
structure.addComponent(new SubRecordHandler(getWrapper()))
.addComponent(new GroupSubThreadStateManager(getSchedulers(),getKey()))
.addComponent(new NormalTTBuilder(getWrapper(), new HttpDTTBuilderAdapter()))
.addComponent(new HttpDFileInfoTask(getWrapper()));
structure.accept(getLoader());
return structure;
}
}

@ -31,6 +31,11 @@ interface ITaskManager {
const val STATE_UPDATE_PROGRESS = 0x06
const val STATE_PRE = 0x07
const val STATE_START = 0x08
const val SUB_STATE_STOP = 0xb1
const val SUB_STATE_FAIL = 0xb2
const val SUB_STATE_CANCEL = 0xb3
const val SUB_STATE_COMPLETE = 0xb4
const val SUB_STATE_RUNNING = 0xb5
const val DATA_RETRY = "DATA_RETRY"
const val DATA_ERROR_INFO = "DATA_ERROR_INFO"
const val DATA_THREAD_NAME = "DATA_THREAD_NAME"

Loading…
Cancel
Save