thread task

v4
laoyuyu 2 years ago
parent 985b5c48d1
commit adcc580f6c
  1. 6
      Aria/src/main/java/com/arialyy/aria/core/common/receiver/LifLifecycleReceiver.kt
  2. 5
      PublicComponent/src/main/java/com/arialyy/aria/core/DuaContext.kt
  3. 7
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskQueue.java
  4. 8
      PublicComponent/src/main/java/com/arialyy/aria/core/listener/ISchedulers.java
  5. 29
      PublicComponent/src/main/java/com/arialyy/aria/core/service/ServiceManager.kt
  6. 3
      PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt
  7. 4
      Queue/src/main/java/com/arialyy/aria/queue/AbsTaskQueue.java
  8. 80
      Schedulers/src/main/java/com/arialyy/aria/schedulers/TaskSchedulers.java

@ -20,7 +20,6 @@ import androidx.lifecycle.LifecycleOwner
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IComponentLoader
import com.arialyy.aria.core.inf.IDuaReceiver
import com.arialyy.aria.schedulers.TaskSchedulers
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Proxy
@ -52,12 +51,13 @@ class LifLifecycleReceiver(val lifecycle: LifecycleOwner) : IDuaReceiver {
lifecycle.lifecycle.addObserver(object : DefaultLifecycleObserver {
override fun onCreate(owner: LifecycleOwner) {
super.onCreate(owner)
com.arialyy.aria.schedulers.TaskSchedulers.getInstance().register(lifecycle, loader.getTaskEnum())
DuaContext.getServiceManager().getSchedulerImp()
.register(lifecycle, loader.getTaskEnum())
}
override fun onDestroy(owner: LifecycleOwner) {
super.onDestroy(owner)
com.arialyy.aria.schedulers.TaskSchedulers.getInstance().unRegister(lifecycle)
DuaContext.getServiceManager().getSchedulerImp().unRegister(lifecycle)
DuaContext.getLifeManager().removeLoader(lifecycle)
DuaContext.getLifeManager().removeCustomListener(lifecycle)
}

@ -30,10 +30,11 @@ import kotlinx.coroutines.MainScope
object DuaContext {
const val DB_SERVICE = "DB_SERVICE"
const val D_QUEUE = "D_QUEUE"
const val DG_QUEUE = "DG_QUEUE"
const val U_QUEUE = "U_QUEUE"
const val SCHEDULER = "SCHEDULER"
private val serviceArray = arrayOf(DB_SERVICE, D_QUEUE, U_QUEUE, SCHEDULER)
private val serviceArray = arrayOf(DB_SERVICE, D_QUEUE, DG_QUEUE, U_QUEUE, SCHEDULER)
val duaScope = MainScope()
lateinit var context: Context
@ -44,6 +45,8 @@ object DuaContext {
fun getLifeManager() = LifecycleManager
fun getCommonConfig() = AriaConfig.getInstance().cConfig
fun getDConfig() = AriaConfig.getInstance().dConfig
fun getUConfig() = AriaConfig.getInstance().uConfig

@ -81,6 +81,13 @@ public interface ITaskQueue<TASK extends ITask> extends IService {
*/
void setQueueSize(int size);
/**
* queue is full
*
* @return true full
*/
boolean isFull();
/**
* get task by id
*/

@ -17,6 +17,7 @@
package com.arialyy.aria.core.listener;
import android.os.Handler;
import com.arialyy.annotations.TaskEnum;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.download.DownloadGroupEntity;
import com.arialyy.aria.core.service.IService;
@ -183,4 +184,11 @@ public interface ISchedulers extends Handler.Callback, IService {
* M3U8切片下载失败
*/
int M3U8_PEER_FAIL = 0xb3;
void register(Object obj, TaskEnum taskEnum);
/**
* unRegister object
*/
void unRegister(Object obj);
}

@ -20,7 +20,9 @@ import android.os.Looper
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.ITaskQueue
import com.arialyy.aria.core.listener.ISchedulers
import com.arialyy.aria.core.task.DownloadGroupTask
import com.arialyy.aria.core.task.DownloadTask
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.UploadTask
import com.arialyy.aria.exception.AriaException
import timber.log.Timber
@ -71,21 +73,38 @@ object ServiceManager {
/**
* get downloadQueue service, if already [registerService] custom queue, return custom queue
*/
fun getDownloadQueue(): ITaskQueue<DownloadTask> {
fun getDownloadQueue(): ITaskQueue<ITask> {
return (serviceCache[DuaContext.D_QUEUE]
?: throw AriaException("queue not found: ${DuaContext.D_QUEUE}")) as ITaskQueue<DownloadTask>
?: throw AriaException("queue not found: ${DuaContext.D_QUEUE}")) as ITaskQueue<ITask>
}
/**
* get downloadGroupQueue service, if already [registerService] custom queue, return custom queue
*/
fun getDownloadGroupQueue(): ITaskQueue<ITask> {
return (serviceCache[DuaContext.DG_QUEUE]
?: throw AriaException("queue not found: ${DuaContext.DG_QUEUE}")) as ITaskQueue<ITask>
}
/**
* get uploadQueue service, if already [registerService] custom queue, return custom queue
*/
fun getUploadQueue(): ITaskQueue<UploadTask> {
fun getUploadQueue(): ITaskQueue<ITask> {
return (serviceCache[DuaContext.U_QUEUE]
?: throw AriaException("queue not found: ${DuaContext.U_QUEUE}")) as ITaskQueue<UploadTask>
?: throw AriaException("queue not found: ${DuaContext.U_QUEUE}")) as ITaskQueue<ITask>
}
/**
* get uploadQueue service, if already [registerService] custom queue, return custom queue
* get [ISchedulers] service
*/
fun getSchedulerImp(): ISchedulers {
return serviceCache[DuaContext.SCHEDULER] as ISchedulers?
?: throw AriaException("queue not found: ${DuaContext.SCHEDULER}")
}
/**
* key [DuaContext.SCHEDULER]
* get scheduler service, if already [registerService] custom [ISchedulers], return custom [ISchedulers] Handler
*/
fun getSchedulerHandler(): Handler {
if (schedulerHandler != null) {

@ -17,6 +17,7 @@ package com.arialyy.aria.core.task
import com.arialyy.aria.core.config.Configuration
import com.arialyy.aria.core.inf.IEntity
import com.arialyy.aria.orm.entity.BlockRecord
import com.arialyy.aria.util.CommonUtil
/**
@ -72,7 +73,7 @@ class TaskState {
*/
var timeLeft: Int = Int.Companion.MAX_VALUE
val blockSize = BlockState.BLOCK_SIZE
val blockSize = BlockRecord.BLOCK_SIZE
fun getPercent() = ((curProgress * 100) / fileSize).toInt()

@ -37,6 +37,10 @@ public abstract class AbsTaskQueue<TASK extends ITask> implements ITaskQueue<TAS
maxSize = getMaxTaskSize();
}
@Override public boolean isFull() {
return getExePool().size() < maxSize;
}
protected IPool<TASK> getCachePool() {
return DEF_CACHE_POOL;
}

@ -15,22 +15,21 @@
*/
package com.arialyy.aria.schedulers;
import android.content.Context;
import android.content.Intent;
import android.os.Bundle;
import android.os.Message;
import androidx.annotation.NonNull;
import com.arialyy.annotations.TaskEnum;
import com.arialyy.aria.core.AriaConfig;
import com.arialyy.aria.core.DuaContext;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.AbsNormalEntity;
import com.arialyy.aria.core.group.GroupSendParams;
import com.arialyy.aria.core.inf.IEntity;
import com.arialyy.aria.core.inf.ITaskQueue;
import com.arialyy.aria.core.inf.TaskSchedulerType;
import com.arialyy.aria.core.listener.ISchedulers;
import com.arialyy.aria.core.manager.TaskWrapperManager;
import com.arialyy.aria.core.queue.DGroupTaskQueue;
import com.arialyy.aria.core.queue.DTaskQueue;
import com.arialyy.aria.core.queue.ITaskQueue;
import com.arialyy.aria.core.queue.UTaskQueue;
import com.arialyy.aria.core.task.AbsTask;
import com.arialyy.aria.core.task.DownloadGroupTask;
import com.arialyy.aria.core.task.DownloadTask;
@ -43,6 +42,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import timber.log.Timber;
/**
* Created by lyy on 2017/6/4. 事件调度器用于处理任务状态的调度
@ -77,15 +77,15 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
*
* @param taskType 任务类型
*/
ITaskQueue getQueue(int taskType) {
ITaskQueue<ITask> getQueue(int taskType) {
if (taskType == ITask.DOWNLOAD) {
return DTaskQueue.getInstance();
return DuaContext.INSTANCE.getServiceManager().getDownloadQueue();
}
if (taskType == ITask.DOWNLOAD_GROUP) {
return DGroupTaskQueue.getInstance();
return DuaContext.INSTANCE.getServiceManager().getDownloadGroupQueue();
}
if (taskType == ITask.UPLOAD) {
return UTaskQueue.getInstance();
return DuaContext.INSTANCE.getServiceManager().getUploadQueue();
}
throw new NullPointerException("任务类型错误,type = " + taskType);
}
@ -96,6 +96,7 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
* @param obj 观察者类
* @param taskEnum 任务类型 {@link TaskEnum}
*/
@Override
public void register(Object obj, TaskEnum taskEnum) {
String targetName = obj.getClass().getName();
Map<TaskEnum, Object> listeners = mObservers.get(getKey(obj));
@ -136,6 +137,7 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
*
* @param obj 观察者类
*/
@Override
public void unRegister(Object obj) {
if (!mObservers.containsKey(getKey(obj))) {
return;
@ -303,32 +305,32 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
* 处理普通任务和任务组的事件
*/
private void handleNormalEvent(TASK task, int what) {
ITaskQueue queue = getQueue(task.getTaskType());
ITaskQueue<ITask> queue = getQueue(task.getTaskType());
switch (what) {
case STOP:
if (task.getTaskState() == IEntity.STATE_WAIT) {
break;
}
queue.removeTaskFormQueue(task.getKey());
if (queue.getCurrentExePoolNum() < queue.getMaxTaskNum()) {
ALog.d(TAG, String.format("停止任务【%s】成功,尝试开始下一任务", task.getTaskName()));
queue.stopTask(task);
if (!queue.isFull()) {
Timber.d("stop task success, if there is a task, start the next task, taskId: %s",
task.getTaskId());
startNextTask(queue, task.getSchedulerType());
} else {
ALog.d(TAG, String.format("停止任务【%s】成功", task.getTaskName()));
Timber.d("stop task success, taskId: %s", task.getTaskId());
}
break;
case CANCEL:
queue.removeTaskFormQueue(task.getKey());
if (queue.getCurrentExePoolNum() < queue.getMaxTaskNum()) {
ALog.d(TAG, String.format("删除任务【%s】成功,尝试开始下一任务", task.getTaskName()));
queue.removeTask(task);
if (!queue.isFull()) {
Timber.d("remove task success, if there is a task, start the next task, taskId: %s",
task.getTaskId());
startNextTask(queue, task.getSchedulerType());
} else {
ALog.d(TAG, String.format("删除任务【%s】成功", task.getTaskName()));
Timber.d("remove task success, taskId: %s", task.getTaskId());
}
break;
case COMPLETE:
queue.removeTaskFormQueue(task.getKey());
ALog.d(TAG, String.format("任务【%s】处理完成", task.getTaskName()));
queue.removeTask(task);
Timber.d("task complete, taskId: %s", task.getTaskId());
startNextTask(queue, task.getSchedulerType());
break;
case FAIL:
@ -339,14 +341,6 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
if (what == FAIL || what == CHECK_FAIL) {
return;
}
if (what == CANCEL || what == COMPLETE) {
TaskWrapperManager.getInstance().removeTaskWrapper(task.getTaskWrapper());
} else {
if (what != RUNNING) {
TaskWrapperManager.getInstance().putTaskWrapper(task.getTaskWrapper());
}
}
normalTaskCallback(what, task);
}
@ -425,7 +419,8 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
}
}
private void normalTaskCallback(int state, TASK task, NormalTaskListenerInterface<TASK> listener) {
private void normalTaskCallback(int state, TASK task,
NormalTaskListenerInterface<TASK> listener) {
if (listener != null) {
if (task == null && state != ISchedulers.CHECK_FAIL) {
ALog.e(TAG, "TASK 为null,回调失败");
@ -518,22 +513,19 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
*
* @param task 下载任务
*/
private void handleFailTask(final ITaskQueue queue, final TASK task) {
if (!task.isNeedRetry() || task.isStop() || task.isCancel()) {
queue.removeTaskFormQueue(task.getKey());
private void handleFailTask(final ITaskQueue<ITask> queue, final TASK task) {
if (!task.getTaskState().getNeedRetry() || task.isStop() || task.isCancel()) {
queue.removeTask(task);
startNextTask(queue, task.getSchedulerType());
normalTaskCallback(FAIL, task);
return;
}
int num = task.getTaskWrapper().getConfig().getReTryNum();
boolean isNotNetRetry = mAriaConfig.getAConfig().isNotNetRetry();
if ((!NetUtils.isConnected(mAriaConfig.getAPP()) && !isNotNetRetry)
|| task.getTaskWrapper().getEntity().getFailNum() > num) {
queue.removeTaskFormQueue(task.getKey());
if ((!NetUtils.isConnected(mAriaConfig.getAPP()) && !isNotNetRetry)) {
queue.removeTask(task);
startNextTask(queue, task.getSchedulerType());
TaskWrapperManager.getInstance().removeTaskWrapper(task.getTaskWrapper());
normalTaskCallback(FAIL, task);
return;
}
@ -544,7 +536,7 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
/**
* 启动下一个任务条件任务停止取消下载任务完成
*/
void startNextTask(final ITaskQueue queue, int schedulerType) {
void startNextTask(final ITaskQueue<ITask> queue, int schedulerType) {
if (schedulerType == TaskSchedulerType.TYPE_STOP_NOT_NEXT) {
return;
}
@ -555,8 +547,12 @@ public class TaskSchedulers<TASK extends ITask> implements ISchedulers {
}
return;
}
if (newTask.getTaskState() == IEntity.STATE_WAIT) {
if (newTask.getTaskState().getState() == IEntity.STATE_WAIT) {
queue.startTask(newTask);
}
}
@Override public void init(@NonNull Context context) {
}
}

Loading…
Cancel
Save