Subtask scheduling management

v4
laoyuyu 2 years ago
parent 344cbd2d99
commit 69f10d68d1
  1. 5
      Http/src/main/java/com/arialyy/aria/http/download/HttpDStopController.kt
  2. 5
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt
  3. 17
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt
  4. 2
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt
  5. 28
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt
  6. 2
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt
  7. 5
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java
  8. 73
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpSubBlockManager.kt
  9. 3
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskAdapter.java
  10. 2
      PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTask.java
  11. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt

@ -15,14 +15,9 @@
*/
package com.arialyy.aria.http.download
import android.net.Uri
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.command.CancelCmd
import com.arialyy.aria.core.command.StopCmd
import com.arialyy.aria.core.task.TaskCachePool
import com.arialyy.aria.util.FileUtils
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import timber.log.Timber
/**

@ -26,4 +26,9 @@ internal class HttpDGOptionAdapter : IHttpTaskOptionAdapter {
val subUrlList = mutableSetOf<String>()
val subNameList = mutableListOf<String>()
/**
* Number of subtasks executed simultaneously
*/
var subTaskNum = 2
}

@ -51,6 +51,23 @@ class HttpDGStartController(target: Any, val savePath: Uri) : HttpBaseStartContr
return super.setHttpOption(httpOption) as HttpDGStartController
}
/**
* Number of subtasks executed simultaneously
* @param num max 16
*/
fun setSubTaskNum(num: Int): HttpDGStartController {
if (num < 1) {
Timber.e("Quantity less than 1")
return this
}
if (num > 16) {
Timber.e("Quantity greater than 16")
return this
}
optionAdapter.subTaskNum = num
return this
}
/**
* add sub task download uri
*/

@ -72,7 +72,7 @@ internal class HttpDGSubTaskInterceptor : ITaskInterceptor {
tp.eventListener = HttpSubListener()
val subTask = SingleDownloadTask(tp)
val subAdapter = HttpDTaskAdapter(true)
subAdapter.setBlockManager(HttpSubBlockManager(chain.blockManager.handler))
subAdapter.setBlockManager(HttpSubBlockManager(subTask, chain.blockManager.handler))
subTask.adapter = subAdapter
if (it.isComplete && checkTaskIsComplete(it)) {
(chain.getTask() as HttpDGroupTask).addIncompleteTaskList(subTask)

@ -19,6 +19,14 @@ import android.os.Handler
import android.os.Looper
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.http.download.HttpDTaskAdapter
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* @Author laoyuyu
@ -28,6 +36,14 @@ import com.arialyy.aria.core.inf.ITaskManager
internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBlockManager {
private lateinit var looper: Looper
private lateinit var handler: Handler
private val subTaskNum = task.dgOptionAdapter.subTaskNum
private val threadPool = ThreadPoolExecutor(
subTaskNum, subTaskNum,
0L, MILLISECONDS,
LinkedBlockingQueue(),
)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val scope = MainScope()
private val callback = Handler.Callback { msg ->
when (msg.what) {
@ -57,10 +73,20 @@ internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBloc
override fun start() {
task.incompleteTaskList.forEach {
scope.launch(dispatcher) {
val adapter = HttpDTaskAdapter(true)
adapter.init(it)
adapter.start()
}
}
}
private fun quitLooper() {
looper.quit()
handler.removeCallbacksAndMessages(null)
scope.cancel()
}
override fun stop() {
}

@ -35,7 +35,7 @@ import timber.log.Timber
**/
internal class HttpDGroupAdapter : AbsTaskAdapter() {
private val taskManager by lazy {
val manager = HttpDGTaskManager(getTask())
val manager = HttpDGTaskManager(getTask() as HttpDGroupTask)
ThreadTaskManager2.putTaskManager(getTask().taskId, manager)
manager
}

@ -19,6 +19,7 @@ import com.arialyy.aria.core.common.TaskOption;
import com.arialyy.aria.core.inf.ITaskOption;
import com.arialyy.aria.core.task.AbsTask;
import com.arialyy.aria.core.task.SingleDownloadTask;
import com.arialyy.aria.http.HttpTaskOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -37,6 +38,10 @@ public class HttpDGroupTask extends AbsTask {
super(taskOption);
}
public HttpDGOptionAdapter getDGOptionAdapter() {
return getTaskOption(HttpTaskOption.class).getOptionAdapter(HttpDGOptionAdapter.class);
}
void setIncompleteTaskList(List<SingleDownloadTask> list) {
incompleteTaskList.clear();
incompleteTaskList.addAll(list);

@ -16,37 +16,84 @@
package com.arialyy.dua.group
import android.os.Handler
import android.os.Looper
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.task.IThreadTask
import com.arialyy.aria.orm.entity.BlockRecord
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.download.HttpDOptionAdapter
import timber.log.Timber
/**
* @Author laoyuyu
* @Description
* @Date 10:04 2023/3/12
**/
class HttpSubBlockManager(val handler: Handler) : IBlockManager {
private val unfinishedBlock = mutableListOf<BlockRecord>()
private var blockNum: Int = 1
internal class HttpSubBlockManager(private val task: ITask, private val groupHandler: Handler) :
IBlockManager {
private lateinit var looper: Looper
private lateinit var handler: Handler
override fun putUnfinishedBlock(record: BlockRecord) {
unfinishedBlock.add(record)
}
private var isStop = false
private var isCancel = false
override fun getUnfinishedBlockList(): List<BlockRecord> {
return unfinishedBlock
/**
* Pass the message to the group task after the subtask is stopped
*/
private val callback = Handler.Callback { msg ->
when (msg.what) {
ITaskManager.STATE_STOP -> {
isStop = true
}
ITaskManager.STATE_CANCEL -> {
isCancel = true
}
ITaskManager.STATE_FAIL -> {
}
ITaskManager.STATE_COMPLETE -> {
}
ITaskManager.STATE_RUNNING -> {
}
ITaskManager.STATE_UPDATE_PROGRESS -> {
}
}
false
}
override fun getHandler() = handler
override fun start(threadTaskList: List<IThreadTask>) {
threadTaskList.forEach { tt ->
/**
* 1.Shared thread pool for subtasks [HttpDGTaskManager.dispatcher]
* 2.Subtasks support only single block downloads
*/
override fun start() {
if (Looper.myLooper() == Looper.getMainLooper()) {
throw IllegalThreadStateException("io operations cannot be in the main thread")
}
looper = Looper.myLooper()!!
handler = Handler(looper, callback)
// Synchronized sequential execution of all block
task.getTaskOption(HttpTaskOption::class.java)
.getOptionAdapter(HttpDOptionAdapter::class.java).threadList.forEach { tt ->
if (isStop) {
Timber.d("task stopped")
return
}
if (isCancel) {
Timber.d("task canceled")
return
}
tt.run()
}
}
private fun quitLooper() {
looper.quit()
handler.removeCallbacksAndMessages(null)
}
override fun setBlockNum(blockNum: Int) {
this.blockNum = blockNum
Timber.i("Subtasks do not support chunked downloads")
}
}

@ -16,7 +16,6 @@
package com.arialyy.aria.core.inf;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.task.ITask;
import org.jetbrains.annotations.NotNull;
@ -26,7 +25,7 @@ import org.jetbrains.annotations.NotNull;
*/
public interface ITaskAdapter {
void init(@NotNull ITask task, @NotNull IEventListener listener);
void init(@NotNull ITask task);
/**
* 任务是否正在执行

@ -47,7 +47,7 @@ public abstract class AbsTask implements ITask {
public void setAdapter(ITaskAdapter adapter) {
mAdapter = adapter;
mAdapter.init(this, mTaskOption.eventListener);
mAdapter.init(this);
}
@Override public ITaskAdapter getAdapter() {

@ -17,8 +17,6 @@ package com.arialyy.aria.core.task
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskAdapter
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.core.task.ITaskInterceptor.IChain
/**
@ -28,14 +26,12 @@ import com.arialyy.aria.core.task.ITaskInterceptor.IChain
**/
abstract class AbsTaskAdapter : ITaskAdapter {
private lateinit var mTask: ITask
private lateinit var mEventListener: IEventListener
private val mUserInterceptor = mutableListOf<ITaskInterceptor>()
private val mCoreInterceptor = mutableListOf<ITaskInterceptor>()
override fun init(task: ITask, listener: IEventListener) {
override fun init(task: ITask) {
mTask = task
mEventListener = listener
}
fun getTask() = mTask

Loading…
Cancel
Save