From 344cbd2d99a23db7060eca974ec683d4c11eeb2a Mon Sep 17 00:00:00 2001 From: laoyuyu <511455842@qq.com> Date: Mon, 13 Mar 2023 22:25:40 +0800 Subject: [PATCH] http group Task Manager --- .../com/arialyy/aria/http/HttpTaskOption.kt | 1 + .../java/com/arialyy/aria/http/SubState.kt | 25 +++ .../download/HttpBlockThreadInterceptor.kt | 12 +- .../http/download/HttpDBlockInterceptor.kt | 10 +- .../aria/http/download/HttpDBlockManager.kt | 57 ++++++ .../aria/http/download/HttpDOptionAdapter.kt | 17 ++ .../aria/http/download/SingleTaskDelegate.kt | 5 +- .../aria/http/download/SubTaskDelegate.kt | 43 ++++- .../aria/http/download/TimerInterceptor.kt | 2 +- .../aria/http/upload/HttpUTaskAdapter.kt | 162 +++++++++--------- .../arialyy/dua/group/HttpDGEventListener.kt | 2 +- .../arialyy/dua/group/HttpDGOptionAdapter.kt | 1 + .../dua/group/HttpDGStartController.kt | 6 +- .../dua/group/HttpDGSubTaskInterceptor.kt | 38 ++-- .../arialyy/dua/group/HttpDGTaskManager.kt | 20 ++- .../arialyy/dua/group/HttpDGroupAdapter.kt | 28 +-- ...HttpGroupTask.java => HttpDGroupTask.java} | 37 +++- .../com/arialyy/aria/core/inf/BaseEntity.kt | 2 + .../arialyy/aria/core/inf/IBlockManager.java | 9 +- .../com/arialyy/aria/core/inf/ITaskManager.kt | 2 - .../arialyy/aria/core/task/AbsTaskAdapter.kt | 1 + .../arialyy/aria/core/task/DBlockManager.kt | 44 +---- .../aria/core/task/ThreadTaskManager2.kt | 4 +- .../com/arialyy/aria/orm/entity/DEntity.kt | 9 + .../com/arialyy/aria/orm/entity/DGEntity.kt | 10 ++ 25 files changed, 360 insertions(+), 187 deletions(-) create mode 100644 Http/src/main/java/com/arialyy/aria/http/SubState.kt create mode 100644 Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockManager.kt rename HttpGroup/src/main/java/com/arialyy/dua/group/{HttpGroupTask.java => HttpDGroupTask.java} (56%) diff --git a/Http/src/main/java/com/arialyy/aria/http/HttpTaskOption.kt b/Http/src/main/java/com/arialyy/aria/http/HttpTaskOption.kt index 76fb630f..5540d1ec 100644 --- a/Http/src/main/java/com/arialyy/aria/http/HttpTaskOption.kt +++ b/Http/src/main/java/com/arialyy/aria/http/HttpTaskOption.kt @@ -17,6 +17,7 @@ package com.arialyy.aria.http import com.arialyy.aria.core.common.TaskOption import com.arialyy.aria.core.task.ITaskInterceptor +import com.arialyy.aria.orm.entity.BlockRecord /** * @Author laoyuyu diff --git a/Http/src/main/java/com/arialyy/aria/http/SubState.kt b/Http/src/main/java/com/arialyy/aria/http/SubState.kt new file mode 100644 index 00000000..a40c6ec6 --- /dev/null +++ b/Http/src/main/java/com/arialyy/aria/http/SubState.kt @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.http + +/** + * @Author laoyuyu + * @Description + * @Date 21:14 PM 2023/3/13 + **/ +data class SubState( + val subId: Int +) diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt index 8438e6a3..73ecf034 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt @@ -33,10 +33,13 @@ import com.arialyy.aria.orm.entity.BlockRecord **/ class HttpBlockThreadInterceptor : ITaskInterceptor { private lateinit var blockManager: IBlockManager + private lateinit var taskOption: HttpDOptionAdapter override suspend fun interceptor(chain: TaskChain): TaskResp { - blockManager = chain.blockManager as IBlockManager - val unfinishedBlockList = blockManager.unfinishedBlockList + blockManager = chain.blockManager + taskOption = chain.getTask().getTaskOption(HttpTaskOption::class.java) + .getOptionAdapter(HttpDOptionAdapter::class.java) + val unfinishedBlockList = taskOption.getUnfinishedBlockList() if (unfinishedBlockList.isEmpty()) { return TaskResp(TaskResp.CODE_INTERRUPT) } @@ -47,8 +50,8 @@ class HttpBlockThreadInterceptor : ITaskInterceptor { private fun createThreadTask(blockRecordList: List, chain: TaskChain) { val threadTaskList = mutableListOf() + val option = chain.getTask().getTaskOption(HttpTaskOption::class.java) blockRecordList.forEach { - val option = chain.getTask().getTaskOption(HttpTaskOption::class.java) val threadConfig = ThreadConfig(it, option, DuaContext.getDConfig().maxSpeed) threadTaskList.add( ThreadTask2( @@ -62,6 +65,7 @@ class HttpBlockThreadInterceptor : ITaskInterceptor { ) ) } - blockManager.start(threadTaskList) + option.getOptionAdapter(HttpDOptionAdapter::class.java).threadList = threadTaskList + blockManager.start() } } \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt index cccc38c3..eb16a5a7 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt @@ -41,14 +41,14 @@ import java.io.File */ internal class HttpDBlockInterceptor : ITaskInterceptor { private lateinit var task: ITask - private lateinit var option: HttpTaskOption + private lateinit var taskOption: HttpTaskOption private lateinit var blockManager: IBlockManager private lateinit var taskRecord: TaskRecord override suspend fun interceptor(chain: TaskChain): TaskResp { task = chain.getTask() - blockManager = chain.blockManager as IBlockManager - option = task.getTaskOption(HttpTaskOption::class.java) + blockManager = chain.blockManager + taskOption = task.getTaskOption(HttpTaskOption::class.java) if (task.taskState.fileSize < 0) { Timber.e("file size < 0") return TaskResp(TaskResp.CODE_INTERRUPT) @@ -123,7 +123,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor { * if block already exist, upload progress */ private suspend fun checkBlock(): Int { - val handler = blockManager.getHandler() + val handler = blockManager.handler val needUpdateBlockRecord = mutableSetOf() for (br in taskRecord.blockList) { val blockF = File(br.blockPath) @@ -136,7 +136,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor { if (br.curProgress != blockF.length()) { br.curProgress = blockF.length() needUpdateBlockRecord.add(br) - blockManager.putUnfinishedBlock(br) + taskOption.getOptionAdapter(HttpDOptionAdapter::class.java).putUnfinishedBlock(br) } // update task progress handler.obtainMessage(ITaskManager.STATE_UPDATE_PROGRESS, br.curProgress) diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockManager.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockManager.kt new file mode 100644 index 00000000..67cd6fc4 --- /dev/null +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockManager.kt @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.http.download + +import com.arialyy.aria.core.task.DBlockManager +import com.arialyy.aria.core.task.ITask +import com.arialyy.aria.http.HttpTaskOption +import kotlinx.coroutines.launch + +/** + * @Author laoyuyu + * @Description + * @Date 21:45 2023/3/13 + **/ +class HttpDBlockManager(task: ITask) : DBlockManager(task) { + private val optionAdapter = + task.getTaskOption(HttpTaskOption::class.java).getOptionAdapter(HttpDOptionAdapter::class.java) + + override fun start() { + optionAdapter.threadList.forEach { tt -> + scope.launch(dispatcher) { + tt.run() + } + } + } + + override fun stop() { + optionAdapter.threadList.forEach { + it.stop() + } + } + + override fun cancel() { + optionAdapter.threadList.forEach { + it.cancel() + } + } + + override fun quitLooper() { + super.quitLooper() + optionAdapter.threadList.clear() + } + +} \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDOptionAdapter.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDOptionAdapter.kt index c5707529..e2df9861 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDOptionAdapter.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDOptionAdapter.kt @@ -16,7 +16,9 @@ package com.arialyy.aria.http.download import com.arialyy.aria.core.processor.IHttpFileLenAdapter +import com.arialyy.aria.core.task.IThreadTask import com.arialyy.aria.http.IHttpTaskOptionAdapter +import com.arialyy.aria.orm.entity.BlockRecord /** * @Author laoyuyu @@ -41,4 +43,19 @@ class HttpDOptionAdapter : IHttpTaskOptionAdapter { var fileName: String? = null + private val unfinishedBlock = mutableListOf() + var threadList = mutableListOf() + set(value) { + field.clear() + field.addAll(value) + } + + fun putUnfinishedBlock(record: BlockRecord) { + unfinishedBlock.add(record) + } + + fun getUnfinishedBlockList(): List { + return unfinishedBlock + } + } \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/SingleTaskDelegate.kt b/Http/src/main/java/com/arialyy/aria/http/download/SingleTaskDelegate.kt index 2cc2104c..d67c42a6 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/SingleTaskDelegate.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/SingleTaskDelegate.kt @@ -18,7 +18,6 @@ package com.arialyy.aria.http.download import android.os.Looper import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.inf.IBlockManager -import com.arialyy.aria.core.task.DBlockManager import com.arialyy.aria.core.task.TaskResp import com.arialyy.aria.core.task.ThreadTaskManager2 import com.arialyy.aria.exception.AriaException @@ -33,10 +32,10 @@ import timber.log.Timber * @Date 11:32 2023/3/12 **/ internal class SingleTaskDelegate(val adapter: HttpDTaskAdapter) : ITaskAdapterDelegate { - private val blockManager = DBlockManager(adapter.getTask()) + private val blockManager = HttpDBlockManager(adapter.getTask()) init { - ThreadTaskManager2.putThreadManager(adapter.getTask().taskId, blockManager) + ThreadTaskManager2.putTaskManager(adapter.getTask().taskId, blockManager) } override fun isRunning(): Boolean { diff --git a/Http/src/main/java/com/arialyy/aria/http/download/SubTaskDelegate.kt b/Http/src/main/java/com/arialyy/aria/http/download/SubTaskDelegate.kt index 452cceef..89309af5 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/SubTaskDelegate.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/SubTaskDelegate.kt @@ -15,7 +15,17 @@ */ package com.arialyy.aria.http.download +import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.inf.IBlockManager +import com.arialyy.aria.core.inf.ITaskManager +import com.arialyy.aria.core.task.TaskResp +import com.arialyy.aria.core.task.ThreadTaskManager2 +import com.arialyy.aria.exception.AriaException +import com.arialyy.aria.http.HttpTaskOption +import com.arialyy.aria.http.SubState +import com.arialyy.aria.orm.entity.DEntity +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch /** * @Author laoyuyu @@ -26,19 +36,44 @@ internal class SubTaskDelegate(val adapter: HttpDTaskAdapter) : ITaskAdapterDele private lateinit var blockManager: IBlockManager override fun isRunning(): Boolean { - TODO("Not yet implemented") + return ThreadTaskManager2.taskIsRunning(adapter.getTask().taskId) } override fun cancel() { - TODO("Not yet implemented") + sendMsg(ITaskManager.STATE_CANCEL) } override fun stop() { - TODO("Not yet implemented") + sendMsg(ITaskManager.STATE_STOP) } override fun start() { - TODO("Not yet implemented") + DuaContext.duaScope.launch(Dispatchers.IO) { + adapter.addCoreInterceptor(HttpDCheckInterceptor()) + adapter.addCoreInterceptor(TimerInterceptor()) + adapter.addCoreInterceptor(HttpDHeaderInterceptor()) + adapter.addCoreInterceptor(HttpDBlockInterceptor()) + adapter.addCoreInterceptor(HttpBlockThreadInterceptor()) + val resp = adapter.interceptor() + if (resp == null || resp.code != TaskResp.CODE_SUCCESS) { + adapter.getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail( + false, + AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}") + ) + sendMsg(ITaskManager.STATE_STOP) + return@launch + } + } + } + + /** + * @param state [ITaskManager] + */ + private fun sendMsg(state: Int) { + blockManager.handler.obtainMessage( + state, + SubState(adapter.getTask().taskState.getEntity(DEntity::class.java).did) + ).sendToTarget() } override fun setBlockManager(blockManager: IBlockManager) { diff --git a/Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt index c7099d92..d92c08c0 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt @@ -51,7 +51,7 @@ open class TimerInterceptor : ITaskInterceptor { closeTimer() try { mTimer = ScheduledThreadPoolExecutor(1) - val threadManager = ThreadTaskManager2.getThreadManager(chain.getTask().taskId) + val threadManager = ThreadTaskManager2.getTaskManager(chain.getTask().taskId) if (threadManager == null) { Timber.e("thread manager is null, start timer fail") return false diff --git a/Http/src/main/java/com/arialyy/aria/http/upload/HttpUTaskAdapter.kt b/Http/src/main/java/com/arialyy/aria/http/upload/HttpUTaskAdapter.kt index ca1f0ce8..85dc1094 100644 --- a/Http/src/main/java/com/arialyy/aria/http/upload/HttpUTaskAdapter.kt +++ b/Http/src/main/java/com/arialyy/aria/http/upload/HttpUTaskAdapter.kt @@ -1,81 +1,81 @@ -/* - * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.arialyy.aria.http.upload - -import android.os.Looper -import com.arialyy.aria.core.DuaContext -import com.arialyy.aria.core.inf.ITaskManager -import com.arialyy.aria.core.task.AbsTaskAdapter -import com.arialyy.aria.core.task.DBlockManager -import com.arialyy.aria.core.task.TaskResp -import com.arialyy.aria.exception.AriaException -import com.arialyy.aria.http.HttpTaskOption -import com.arialyy.aria.http.download.HttpBlockThreadInterceptor -import com.arialyy.aria.http.download.TimerInterceptor -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch - -/** - * @Author laoyuyu - * @Description - * @Date 9:21 PM 2023/2/21 - **/ -class HttpUTaskAdapter : AbsTaskAdapter() { - private var blockManager: DBlockManager? = null - - override fun getBlockManager(): ITaskManager { - if (blockManager == null) { - blockManager = DBlockManager(getTask()) - } - return blockManager!! - } - - override fun isRunning(): Boolean { - return blockManager?.isRunning ?: false - } - - override fun cancel() { - TODO("Not yet implemented") - } - - override fun stop() { - TODO("Not yet implemented") - } - - override fun start() { - getTask().getTaskOption(HttpTaskOption::class.java).taskInterceptor.let { - if (it.isNotEmpty()) { - addInterceptors(it) - } - } - DuaContext.duaScope.launch(Dispatchers.IO) { - Looper.prepare() - addCoreInterceptor(TimerInterceptor()) - addCoreInterceptor(HttpUBlockInterceptor()) - addCoreInterceptor(HttpBlockThreadInterceptor()) - val resp = interceptor() - if (resp == null || resp.code != TaskResp.CODE_SUCCESS) { - getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail( - false, - AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}") - ) - blockManager?.stop() - return@launch - } - Looper.loop() - } - } -} \ No newline at end of file +///* +// * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package com.arialyy.aria.http.upload +// +//import android.os.Looper +//import com.arialyy.aria.core.DuaContext +//import com.arialyy.aria.core.inf.ITaskManager +//import com.arialyy.aria.core.task.AbsTaskAdapter +//import com.arialyy.aria.core.task.DBlockManager +//import com.arialyy.aria.core.task.TaskResp +//import com.arialyy.aria.exception.AriaException +//import com.arialyy.aria.http.HttpTaskOption +//import com.arialyy.aria.http.download.HttpBlockThreadInterceptor +//import com.arialyy.aria.http.download.TimerInterceptor +//import kotlinx.coroutines.Dispatchers +//import kotlinx.coroutines.launch +// +///** +// * @Author laoyuyu +// * @Description +// * @Date 9:21 PM 2023/2/21 +// **/ +//class HttpUTaskAdapter : AbsTaskAdapter() { +// private var blockManager: DBlockManager? = null +// +// override fun getBlockManager(): ITaskManager { +// if (blockManager == null) { +// blockManager = DBlockManager(getTask()) +// } +// return blockManager!! +// } +// +// override fun isRunning(): Boolean { +// return blockManager?.isRunning ?: false +// } +// +// override fun cancel() { +// TODO("Not yet implemented") +// } +// +// override fun stop() { +// TODO("Not yet implemented") +// } +// +// override fun start() { +// getTask().getTaskOption(HttpTaskOption::class.java).taskInterceptor.let { +// if (it.isNotEmpty()) { +// addInterceptors(it) +// } +// } +// DuaContext.duaScope.launch(Dispatchers.IO) { +// Looper.prepare() +// addCoreInterceptor(TimerInterceptor()) +// addCoreInterceptor(HttpUBlockInterceptor()) +// addCoreInterceptor(HttpBlockThreadInterceptor()) +// val resp = interceptor() +// if (resp == null || resp.code != TaskResp.CODE_SUCCESS) { +// getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail( +// false, +// AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}") +// ) +// blockManager?.stop() +// return@launch +// } +// Looper.loop() +// } +// } +//} \ No newline at end of file diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGEventListener.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGEventListener.kt index 0ee66879..2b21ffff 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGEventListener.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGEventListener.kt @@ -22,7 +22,7 @@ import com.arialyy.aria.core.listener.AbsEventListener * @Description * @Date 9:15 PM 2023/3/6 **/ -internal class HttpDGEventListener(task: HttpGroupTask) : AbsEventListener(task) { +internal class HttpDGEventListener(task: HttpDGroupTask) : AbsEventListener(task) { override fun onComplete() { TODO("Not yet implemented") } diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt index 5301de16..4dc6f11c 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt @@ -25,4 +25,5 @@ import com.arialyy.aria.http.IHttpTaskOptionAdapter internal class HttpDGOptionAdapter : IHttpTaskOptionAdapter { val subUrlList = mutableSetOf() val subNameList = mutableListOf() + } \ No newline at end of file diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt index ad6d50f6..efb88c20 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt @@ -67,15 +67,15 @@ class HttpDGStartController(target: Any, val savePath: Uri) : HttpBaseStartContr return this } - private fun getTask(createNewTask: Boolean = true): HttpGroupTask { + private fun getTask(createNewTask: Boolean = true): HttpDGroupTask { if (HttpUtil.checkHttpDParams(httpTaskOption)) { throw IllegalArgumentException("invalid params") } val temp = TaskCachePool.getTaskByKey(savePath.toString()) if (temp != null) { - return temp as HttpGroupTask + return temp as HttpDGroupTask } - val task = HttpGroupTask(httpTaskOption) + val task = HttpDGroupTask(httpTaskOption) task.adapter = HttpDGroupAdapter() TaskCachePool.putTask(task) return task diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt index 1f700995..64f68907 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt @@ -16,14 +16,19 @@ package com.arialyy.dua.group import com.arialyy.aria.core.common.TaskOption +import com.arialyy.aria.core.inf.IBlockManager +import com.arialyy.aria.core.inf.ITaskManager import com.arialyy.aria.core.task.ITask import com.arialyy.aria.core.task.ITaskInterceptor import com.arialyy.aria.core.task.SingleDownloadTask import com.arialyy.aria.core.task.TaskChain import com.arialyy.aria.core.task.TaskResp import com.arialyy.aria.http.HttpTaskOption +import com.arialyy.aria.http.SubState import com.arialyy.aria.http.download.HttpDTaskAdapter +import com.arialyy.aria.orm.entity.DEntity import com.arialyy.aria.orm.entity.DGEntity +import timber.log.Timber /** * Subtasks do not support chunking @@ -33,27 +38,32 @@ import com.arialyy.aria.orm.entity.DGEntity **/ internal class HttpDGSubTaskInterceptor : ITaskInterceptor { private lateinit var task: ITask - private lateinit var optionAdapter: HttpDGOptionAdapter private lateinit var taskOption: HttpTaskOption + private lateinit var blockManager: IBlockManager override suspend fun interceptor(chain: TaskChain): TaskResp { task = chain.getTask() + blockManager = chain.blockManager taskOption = task.getTaskOption(HttpTaskOption::class.java) - optionAdapter = taskOption.getOptionAdapter(HttpDGOptionAdapter::class.java) - val subList = createSubTask(chain) - - + createSubTask(chain) + blockManager.start() return TaskResp(TaskResp.CODE_SUCCESS) } - private fun startSubTask(subList: List) { - subList.forEach { - + /** + * 1. file exist + * 2. correct file length + * 3. send complete msg + */ + private fun checkTaskIsComplete(entity: DEntity): Boolean { + if (entity.fileIsComplete()) { + blockManager.handler.obtainMessage(ITaskManager.STATE_COMPLETE, SubState(entity.did)) + return true } + return false } - private fun createSubTask(chain: TaskChain): List { - val subTaskList = mutableListOf() + private fun createSubTask(chain: TaskChain) { task.taskState.getEntity(DGEntity::class.java).subList.forEach { val tp = TaskOption() tp.sourUrl = it.sourceUrl @@ -63,11 +73,13 @@ internal class HttpDGSubTaskInterceptor : ITaskInterceptor { val subTask = SingleDownloadTask(tp) val subAdapter = HttpDTaskAdapter(true) subAdapter.setBlockManager(HttpSubBlockManager(chain.blockManager.handler)) - subTask.adapter = subAdapter - subTaskList.add(subTask) + if (it.isComplete && checkTaskIsComplete(it)) { + (chain.getTask() as HttpDGroupTask).addIncompleteTaskList(subTask) + Timber.d("task already complete, sourUrl: ${it.sourceUrl}") + } + (chain.getTask() as HttpDGroupTask).addSubTask(subTask) } - return subTaskList } } \ No newline at end of file diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt index c05f6655..52cdf59f 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt @@ -17,15 +17,15 @@ package com.arialyy.dua.group import android.os.Handler import android.os.Looper +import com.arialyy.aria.core.inf.IBlockManager import com.arialyy.aria.core.inf.ITaskManager -import com.arialyy.aria.http.HttpTaskOption /** * @Author laoyuyu * @Description * @Date 7:43 PM 2023/3/7 **/ -internal class HttpDGTaskManager : ITaskManager { +internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBlockManager { private lateinit var looper: Looper private lateinit var handler: Handler @@ -47,10 +47,6 @@ internal class HttpDGTaskManager : ITaskManager { false } - fun start(taskOption: HttpTaskOption) { - taskOption.getOptionAdapter(HttpDGOptionAdapter::class.java).subUrlList - } - override fun setLooper() { if (Looper.myLooper() == Looper.getMainLooper()) { throw IllegalThreadStateException("io operations cannot be in the main thread") @@ -59,6 +55,12 @@ internal class HttpDGTaskManager : ITaskManager { handler = Handler(looper, callback) } + override fun start() { + task.incompleteTaskList.forEach { + + } + } + override fun stop() { } @@ -86,10 +88,12 @@ internal class HttpDGTaskManager : ITaskManager { TODO("Not yet implemented") } - fun getHandler(): Handler { - return handler + override fun setBlockNum(blockNum: Int) { + TODO("Not yet implemented") } + override fun getHandler(): Handler = handler + override fun hasFailedTask(): Boolean { TODO("Not yet implemented") } diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt index 815d1c12..e1f8fbe9 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt @@ -18,9 +18,9 @@ package com.arialyy.dua.group import android.os.Looper import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.inf.IBlockManager -import com.arialyy.aria.core.inf.ITaskManager import com.arialyy.aria.core.task.AbsTaskAdapter import com.arialyy.aria.core.task.TaskResp +import com.arialyy.aria.core.task.ThreadTaskManager2 import com.arialyy.aria.exception.AriaException import com.arialyy.aria.http.HttpTaskOption import com.arialyy.aria.http.download.TimerInterceptor @@ -35,12 +35,14 @@ import timber.log.Timber **/ internal class HttpDGroupAdapter : AbsTaskAdapter() { private val taskManager by lazy { - HttpDGTaskManager() + val manager = HttpDGTaskManager(getTask()) + ThreadTaskManager2.putTaskManager(getTask().taskId, manager) + manager } init { getTask().getTaskOption(HttpTaskOption::class.java).eventListener = - HttpDGEventListener(getTask() as HttpGroupTask) + HttpDGEventListener(getTask() as HttpDGroupTask) } override fun getBlockManager(): IBlockManager { @@ -48,20 +50,26 @@ internal class HttpDGroupAdapter : AbsTaskAdapter() { } override fun isRunning(): Boolean { - return taskManager.isRunning() + return ThreadTaskManager2.getTaskManager(getTask().taskId)?.isRunning() == true } override fun cancel() { - if (getBlockManager().isCanceled()) { - Timber.w("task already canceled, taskId: ${getTask().taskId}") - return + ThreadTaskManager2.getTaskManager(getTask().taskId)?.let { + if (it.isCanceled()) { + Timber.w("task already canceled, taskId: ${getTask().taskId}") + return + } + it.cancel() } } override fun stop() { - if (getBlockManager().isStopped()) { - Timber.w("task already stopped, taskId: ${getTask().taskId}") - return + ThreadTaskManager2.getTaskManager(getTask().taskId)?.let { + if (it.isStopped()) { + Timber.w("task already stopped, taskId: ${getTask().taskId}") + return + } + it.stop() } } diff --git a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpGroupTask.java b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java similarity index 56% rename from HttpGroup/src/main/java/com/arialyy/dua/group/HttpGroupTask.java rename to HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java index c77af7e3..cd38caa9 100644 --- a/HttpGroup/src/main/java/com/arialyy/dua/group/HttpGroupTask.java +++ b/HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java @@ -18,18 +18,51 @@ package com.arialyy.dua.group; import com.arialyy.aria.core.common.TaskOption; import com.arialyy.aria.core.inf.ITaskOption; import com.arialyy.aria.core.task.AbsTask; +import com.arialyy.aria.core.task.SingleDownloadTask; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; /** * Created by AriaL on 2017/6/27. * 任务组任务 */ -public class HttpGroupTask extends AbsTask { +public class HttpDGroupTask extends AbsTask { - public HttpGroupTask(ITaskOption taskOption) { + private List incompleteTaskList = new ArrayList<>(); + + private List subTaskList = new ArrayList<>(); + + public HttpDGroupTask(ITaskOption taskOption) { super(taskOption); } + void setIncompleteTaskList(List list) { + incompleteTaskList.clear(); + incompleteTaskList.addAll(list); + } + + void addIncompleteTaskList(SingleDownloadTask task) { + incompleteTaskList.add(task); + } + + List getIncompleteTaskList() { + return incompleteTaskList; + } + + void setSubTaskList(List list) { + this.subTaskList.clear(); + this.subTaskList.addAll(list); + } + + void addSubTask(SingleDownloadTask task) { + this.subTaskList.add(task); + } + + public List getSubTaskList() { + return subTaskList; + } + @Override public int getTaskType() { return HTTP_GROUP; } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/BaseEntity.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/BaseEntity.kt index bd327c66..cf527e43 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/BaseEntity.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/BaseEntity.kt @@ -41,4 +41,6 @@ abstract class BaseEntity : IEntity { var ext: String? = null val fileSize: Long = 0 + + var isComplete: Boolean = false } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java index b03c3092..60f0a3dd 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java @@ -16,23 +16,16 @@ package com.arialyy.aria.core.inf; import android.os.Handler; -import com.arialyy.aria.core.task.IThreadTask; -import com.arialyy.aria.orm.entity.BlockRecord; -import java.util.List; /** * 线程任务状态 */ public interface IBlockManager { - void start(List threadTaskList); + void start(); void setBlockNum(int blockNum); - void putUnfinishedBlock(BlockRecord record); - - List getUnfinishedBlockList(); - /** * 创建handler 回调 */ diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt index e86a5e92..b8aa6c95 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt @@ -16,8 +16,6 @@ package com.arialyy.aria.core.inf -import android.os.Handler - /** * @Author laoyuyu * @Description diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt index f8700d34..02de9f27 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt @@ -17,6 +17,7 @@ package com.arialyy.aria.core.task import com.arialyy.aria.core.inf.IBlockManager import com.arialyy.aria.core.inf.ITaskAdapter +import com.arialyy.aria.core.inf.ITaskManager import com.arialyy.aria.core.listener.IEventListener import com.arialyy.aria.core.task.ITaskInterceptor.IChain diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt index 3a994df1..62c55662 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt @@ -23,33 +23,29 @@ import com.arialyy.aria.core.inf.ITaskManager import com.arialyy.aria.core.inf.ITaskOption import com.arialyy.aria.core.listener.IEventListener import com.arialyy.aria.exception.AriaException -import com.arialyy.aria.orm.entity.BlockRecord import kotlinx.coroutines.MainScope import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.cancel import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch import timber.log.Timber import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicInteger -class DBlockManager(val task: ITask) : IBlockManager, ITaskManager { - private val unfinishedBlock = mutableListOf() +abstract class DBlockManager(val task: ITask) : IBlockManager, ITaskManager { private val canceledNum = AtomicInteger(0) // 已经取消的线程的数 private val stoppedNum = AtomicInteger(0) // 已经停止的线程数 private val failedNum = AtomicInteger(0) // 失败的线程数 private val completedNum = AtomicInteger(0) // 完成的线程数 private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum - private val scope = MainScope() + protected val scope = MainScope() private val threadPool = ThreadPoolExecutor( threadNum, threadNum, 0L, MILLISECONDS, LinkedBlockingQueue(), ) - private val dispatcher = threadPool.asCoroutineDispatcher() - private val threadTaskList = mutableListOf() + protected val dispatcher = threadPool.asCoroutineDispatcher() private var progress: Long = 0 //当前总进度 private lateinit var looper: Looper @@ -110,20 +106,10 @@ class DBlockManager(val task: ITask) : IBlockManager, ITaskManager { /** * 退出looper循环 */ - private fun quitLooper() { + protected open fun quitLooper() { looper.quit() handler.removeCallbacksAndMessages(null) scope.cancel() - threadTaskList.clear() - unfinishedBlock.clear() - } - - override fun putUnfinishedBlock(record: BlockRecord) { - unfinishedBlock.add(record) - } - - override fun getUnfinishedBlockList(): List { - return unfinishedBlock } override fun setLooper() { @@ -134,28 +120,6 @@ class DBlockManager(val task: ITask) : IBlockManager, ITaskManager { handler = Handler(looper, callback) } - override fun start(threadTaskList: List) { - this.threadTaskList.clear() - this.threadTaskList.addAll(threadTaskList) - threadTaskList.forEach { tt -> - scope.launch(dispatcher) { - tt.run() - } - } - } - - override fun stop() { - threadTaskList.forEach { - it.stop() - } - } - - override fun cancel() { - threadTaskList.forEach { - it.cancel() - } - } - override fun setBlockNum(blockNum: Int) { this.blockNum = blockNum } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTaskManager2.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTaskManager2.kt index c48bfdc7..3f55c958 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTaskManager2.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTaskManager2.kt @@ -22,9 +22,9 @@ import java.util.concurrent.ConcurrentHashMap object ThreadTaskManager2 { private val taskManagerMap: ConcurrentHashMap = ConcurrentHashMap() - fun getThreadManager(taskId: Int) = taskManagerMap[taskId] + fun getTaskManager(taskId: Int) = taskManagerMap[taskId] - fun putThreadManager(taskId: Int, taskManager: ITaskManager) { + fun putTaskManager(taskId: Int, taskManager: ITaskManager) { taskManagerMap[taskId] = taskManager } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt index fc75cd48..b0002300 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt @@ -56,6 +56,15 @@ data class DEntity( ) : BaseEntity() { private var dirFile: File? = null + /** + * 1. file exist + * 2. correct file length + */ + fun fileIsComplete(): Boolean { + val f = getFilePath() + return f.exists() && f.length() == fileSize + } + fun getFilePath(): File { if (dirFile == null) { dirFile = File(FileUri.getPathByUri(savePath)!!) diff --git a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt index fe1826b6..ce71b882 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt @@ -21,7 +21,10 @@ import androidx.room.Ignore import androidx.room.Index import androidx.room.PrimaryKey import androidx.room.TypeConverters +import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.inf.BaseEntity +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import kotlinx.parcelize.Parcelize /** @@ -55,4 +58,11 @@ data class DGEntity( ) : BaseEntity() { @Ignore var subList: MutableList = mutableListOf() + override fun update() { + updateTime = System.currentTimeMillis() + DuaContext.duaScope.launch(Dispatchers.IO) { + DuaContext.getServiceManager().getDbService().getDuaDb().getDGEntityDao() + .update(this@DGEntity) + } + } } \ No newline at end of file