From 69f10d68d1a018676acce6bdec83c5303dfdd543 Mon Sep 17 00:00:00 2001 From: laoyuyu <511455842@qq.com> Date: Sun, 19 Mar 2023 21:51:45 +0800 Subject: [PATCH] Subtask scheduling management --- .../aria/http/download/HttpDStopController.kt | 5 -- .../arialyy/dua/group/HttpDGOptionAdapter.kt | 5 ++ .../dua/group/HttpDGStartController.kt | 17 +++++ .../dua/group/HttpDGSubTaskInterceptor.kt | 2 +- .../arialyy/dua/group/HttpDGTaskManager.kt | 28 ++++++- .../arialyy/dua/group/HttpDGroupAdapter.kt | 2 +- .../com/arialyy/dua/group/HttpDGroupTask.java | 5 ++ .../arialyy/dua/group/HttpSubBlockManager.kt | 75 +++++++++++++++---- .../arialyy/aria/core/inf/ITaskAdapter.java | 3 +- .../com/arialyy/aria/core/task/AbsTask.java | 2 +- .../arialyy/aria/core/task/AbsTaskAdapter.kt | 6 +- 11 files changed, 120 insertions(+), 30 deletions(-) diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDStopController.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDStopController.kt index 66a1f06a..22b10565 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDStopController.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDStopController.kt @@ -15,14 +15,9 @@ */ package com.arialyy.aria.http.download -import android.net.Uri -import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.command.CancelCmd import com.arialyy.aria.core.command.StopCmd import com.arialyy.aria.core.task.TaskCachePool -import com.arialyy.aria.util.FileUtils -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.withContext import timber.log.Timber /** diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt index 4dc6f11c..d4bf25a9 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt @@ -26,4 +26,9 @@ internal class HttpDGOptionAdapter : IHttpTaskOptionAdapter { val subUrlList = mutableSetOf() val subNameList = mutableListOf() + /** + * Number of subtasks executed simultaneously + */ + var subTaskNum = 2 + } \ No newline at end of file diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt index efb88c20..bc6bc61b 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt @@ -51,6 +51,23 @@ class HttpDGStartController(target: Any, val savePath: Uri) : HttpBaseStartContr return super.setHttpOption(httpOption) as HttpDGStartController } + /** + * Number of subtasks executed simultaneously + * @param num max 16 + */ + fun setSubTaskNum(num: Int): HttpDGStartController { + if (num < 1) { + Timber.e("Quantity less than 1") + return this + } + if (num > 16) { + Timber.e("Quantity greater than 16") + return this + } + optionAdapter.subTaskNum = num + return this + } + /** * add sub task download uri */ diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt index 64f68907..02975486 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt @@ -72,7 +72,7 @@ internal class HttpDGSubTaskInterceptor : ITaskInterceptor { tp.eventListener = HttpSubListener() val subTask = SingleDownloadTask(tp) val subAdapter = HttpDTaskAdapter(true) - subAdapter.setBlockManager(HttpSubBlockManager(chain.blockManager.handler)) + subAdapter.setBlockManager(HttpSubBlockManager(subTask, chain.blockManager.handler)) subTask.adapter = subAdapter if (it.isComplete && checkTaskIsComplete(it)) { (chain.getTask() as HttpDGroupTask).addIncompleteTaskList(subTask) diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt index 52cdf59f..4fa13709 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt @@ -19,6 +19,14 @@ import android.os.Handler import android.os.Looper import com.arialyy.aria.core.inf.IBlockManager import com.arialyy.aria.core.inf.ITaskManager +import com.arialyy.aria.http.download.HttpDTaskAdapter +import kotlinx.coroutines.MainScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit.MILLISECONDS /** * @Author laoyuyu @@ -28,6 +36,14 @@ import com.arialyy.aria.core.inf.ITaskManager internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBlockManager { private lateinit var looper: Looper private lateinit var handler: Handler + private val subTaskNum = task.dgOptionAdapter.subTaskNum + private val threadPool = ThreadPoolExecutor( + subTaskNum, subTaskNum, + 0L, MILLISECONDS, + LinkedBlockingQueue(), + ) + private val dispatcher = threadPool.asCoroutineDispatcher() + private val scope = MainScope() private val callback = Handler.Callback { msg -> when (msg.what) { @@ -57,10 +73,20 @@ internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBloc override fun start() { task.incompleteTaskList.forEach { - + scope.launch(dispatcher) { + val adapter = HttpDTaskAdapter(true) + adapter.init(it) + adapter.start() + } } } + private fun quitLooper() { + looper.quit() + handler.removeCallbacksAndMessages(null) + scope.cancel() + } + override fun stop() { } diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt index e1f8fbe9..f4d428dc 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt @@ -35,7 +35,7 @@ import timber.log.Timber **/ internal class HttpDGroupAdapter : AbsTaskAdapter() { private val taskManager by lazy { - val manager = HttpDGTaskManager(getTask()) + val manager = HttpDGTaskManager(getTask() as HttpDGroupTask) ThreadTaskManager2.putTaskManager(getTask().taskId, manager) manager } diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java index cd38caa9..e6147e33 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java @@ -19,6 +19,7 @@ import com.arialyy.aria.core.common.TaskOption; import com.arialyy.aria.core.inf.ITaskOption; import com.arialyy.aria.core.task.AbsTask; import com.arialyy.aria.core.task.SingleDownloadTask; +import com.arialyy.aria.http.HttpTaskOption; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -37,6 +38,10 @@ public class HttpDGroupTask extends AbsTask { super(taskOption); } + public HttpDGOptionAdapter getDGOptionAdapter() { + return getTaskOption(HttpTaskOption.class).getOptionAdapter(HttpDGOptionAdapter.class); + } + void setIncompleteTaskList(List list) { incompleteTaskList.clear(); incompleteTaskList.addAll(list); diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt index bfe6158d..c8b30dc8 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt @@ -16,37 +16,84 @@ package com.arialyy.dua.group import android.os.Handler +import android.os.Looper import com.arialyy.aria.core.inf.IBlockManager -import com.arialyy.aria.core.task.IThreadTask -import com.arialyy.aria.orm.entity.BlockRecord +import com.arialyy.aria.core.inf.ITaskManager +import com.arialyy.aria.core.task.ITask +import com.arialyy.aria.http.HttpTaskOption +import com.arialyy.aria.http.download.HttpDOptionAdapter +import timber.log.Timber /** * @Author laoyuyu * @Description * @Date 10:04 2023/3/12 **/ -class HttpSubBlockManager(val handler: Handler) : IBlockManager { - private val unfinishedBlock = mutableListOf() - private var blockNum: Int = 1 +internal class HttpSubBlockManager(private val task: ITask, private val groupHandler: Handler) : + IBlockManager { + private lateinit var looper: Looper + private lateinit var handler: Handler - override fun putUnfinishedBlock(record: BlockRecord) { - unfinishedBlock.add(record) - } + private var isStop = false + private var isCancel = false - override fun getUnfinishedBlockList(): List { - return unfinishedBlock + /** + * Pass the message to the group task after the subtask is stopped + */ + private val callback = Handler.Callback { msg -> + when (msg.what) { + ITaskManager.STATE_STOP -> { + isStop = true + } + ITaskManager.STATE_CANCEL -> { + isCancel = true + } + ITaskManager.STATE_FAIL -> { + } + ITaskManager.STATE_COMPLETE -> { + } + ITaskManager.STATE_RUNNING -> { + } + ITaskManager.STATE_UPDATE_PROGRESS -> { + } + } + false } override fun getHandler() = handler - override fun start(threadTaskList: List) { - threadTaskList.forEach { tt -> - tt.run() + /** + * 1.Shared thread pool for subtasks [HttpDGTaskManager.dispatcher] + * 2.Subtasks support only single block downloads + */ + override fun start() { + if (Looper.myLooper() == Looper.getMainLooper()) { + throw IllegalThreadStateException("io operations cannot be in the main thread") } + looper = Looper.myLooper()!! + handler = Handler(looper, callback) + // Synchronized sequential execution of all block + task.getTaskOption(HttpTaskOption::class.java) + .getOptionAdapter(HttpDOptionAdapter::class.java).threadList.forEach { tt -> + if (isStop) { + Timber.d("task stopped") + return + } + if (isCancel) { + Timber.d("task canceled") + return + } + tt.run() + } + } + + private fun quitLooper() { + looper.quit() + handler.removeCallbacksAndMessages(null) } override fun setBlockNum(blockNum: Int) { - this.blockNum = blockNum + Timber.i("Subtasks do not support chunked downloads") } } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskAdapter.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskAdapter.java index 4aac52f5..623ecb08 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskAdapter.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskAdapter.java @@ -16,7 +16,6 @@ package com.arialyy.aria.core.inf; -import com.arialyy.aria.core.listener.IEventListener; import com.arialyy.aria.core.task.ITask; import org.jetbrains.annotations.NotNull; @@ -26,7 +25,7 @@ import org.jetbrains.annotations.NotNull; */ public interface ITaskAdapter { - void init(@NotNull ITask task, @NotNull IEventListener listener); + void init(@NotNull ITask task); /** * 任务是否正在执行 diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTask.java index 643461f2..5b2c9962 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTask.java @@ -47,7 +47,7 @@ public abstract class AbsTask implements ITask { public void setAdapter(ITaskAdapter adapter) { mAdapter = adapter; - mAdapter.init(this, mTaskOption.eventListener); + mAdapter.init(this); } @Override public ITaskAdapter getAdapter() { diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt index 02de9f27..1f657bc8 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt @@ -17,8 +17,6 @@ package com.arialyy.aria.core.task import com.arialyy.aria.core.inf.IBlockManager import com.arialyy.aria.core.inf.ITaskAdapter -import com.arialyy.aria.core.inf.ITaskManager -import com.arialyy.aria.core.listener.IEventListener import com.arialyy.aria.core.task.ITaskInterceptor.IChain /** @@ -28,14 +26,12 @@ import com.arialyy.aria.core.task.ITaskInterceptor.IChain **/ abstract class AbsTaskAdapter : ITaskAdapter { private lateinit var mTask: ITask - private lateinit var mEventListener: IEventListener private val mUserInterceptor = mutableListOf() private val mCoreInterceptor = mutableListOf() - override fun init(task: ITask, listener: IEventListener) { + override fun init(task: ITask) { mTask = task - mEventListener = listener } fun getTask() = mTask