http group Task Manager

v4
laoyuyu 2 years ago
parent e0c9b0c373
commit 344cbd2d99
  1. 1
      Http/src/main/java/com/arialyy/aria/http/HttpTaskOption.kt
  2. 25
      Http/src/main/java/com/arialyy/aria/http/SubState.kt
  3. 12
      Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt
  4. 10
      Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt
  5. 57
      Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockManager.kt
  6. 17
      Http/src/main/java/com/arialyy/aria/http/download/HttpDOptionAdapter.kt
  7. 5
      Http/src/main/java/com/arialyy/aria/http/download/SingleTaskDelegate.kt
  8. 43
      Http/src/main/java/com/arialyy/aria/http/download/SubTaskDelegate.kt
  9. 2
      Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt
  10. 162
      Http/src/main/java/com/arialyy/aria/http/upload/HttpUTaskAdapter.kt
  11. 2
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGEventListener.kt
  12. 1
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGOptionAdapter.kt
  13. 6
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGStartController.kt
  14. 38
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGSubTaskInterceptor.kt
  15. 20
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGTaskManager.kt
  16. 28
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupAdapter.kt
  17. 37
      HttpGroup/src/main/java/com/arialyy/dua/group/HttpDGroupTask.java
  18. 2
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/BaseEntity.kt
  19. 9
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java
  20. 2
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskManager.kt
  21. 1
      PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskAdapter.kt
  22. 44
      PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt
  23. 4
      PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTaskManager2.kt
  24. 9
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt
  25. 10
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt

@ -17,6 +17,7 @@ package com.arialyy.aria.http
import com.arialyy.aria.core.common.TaskOption
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.orm.entity.BlockRecord
/**
* @Author laoyuyu

@ -0,0 +1,25 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.http
/**
* @Author laoyuyu
* @Description
* @Date 21:14 PM 2023/3/13
**/
data class SubState(
val subId: Int
)

@ -33,10 +33,13 @@ import com.arialyy.aria.orm.entity.BlockRecord
**/
class HttpBlockThreadInterceptor : ITaskInterceptor {
private lateinit var blockManager: IBlockManager
private lateinit var taskOption: HttpDOptionAdapter
override suspend fun interceptor(chain: TaskChain): TaskResp {
blockManager = chain.blockManager as IBlockManager
val unfinishedBlockList = blockManager.unfinishedBlockList
blockManager = chain.blockManager
taskOption = chain.getTask().getTaskOption(HttpTaskOption::class.java)
.getOptionAdapter(HttpDOptionAdapter::class.java)
val unfinishedBlockList = taskOption.getUnfinishedBlockList()
if (unfinishedBlockList.isEmpty()) {
return TaskResp(TaskResp.CODE_INTERRUPT)
}
@ -47,8 +50,8 @@ class HttpBlockThreadInterceptor : ITaskInterceptor {
private fun createThreadTask(blockRecordList: List<BlockRecord>, chain: TaskChain) {
val threadTaskList = mutableListOf<IThreadTask>()
val option = chain.getTask().getTaskOption(HttpTaskOption::class.java)
blockRecordList.forEach {
val option = chain.getTask().getTaskOption(HttpTaskOption::class.java)
val threadConfig = ThreadConfig(it, option, DuaContext.getDConfig().maxSpeed)
threadTaskList.add(
ThreadTask2(
@ -62,6 +65,7 @@ class HttpBlockThreadInterceptor : ITaskInterceptor {
)
)
}
blockManager.start(threadTaskList)
option.getOptionAdapter(HttpDOptionAdapter::class.java).threadList = threadTaskList
blockManager.start()
}
}

@ -41,14 +41,14 @@ import java.io.File
*/
internal class HttpDBlockInterceptor : ITaskInterceptor {
private lateinit var task: ITask
private lateinit var option: HttpTaskOption
private lateinit var taskOption: HttpTaskOption
private lateinit var blockManager: IBlockManager
private lateinit var taskRecord: TaskRecord
override suspend fun interceptor(chain: TaskChain): TaskResp {
task = chain.getTask()
blockManager = chain.blockManager as IBlockManager
option = task.getTaskOption(HttpTaskOption::class.java)
blockManager = chain.blockManager
taskOption = task.getTaskOption(HttpTaskOption::class.java)
if (task.taskState.fileSize < 0) {
Timber.e("file size < 0")
return TaskResp(TaskResp.CODE_INTERRUPT)
@ -123,7 +123,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor {
* if block already exist, upload progress
*/
private suspend fun checkBlock(): Int {
val handler = blockManager.getHandler()
val handler = blockManager.handler
val needUpdateBlockRecord = mutableSetOf<BlockRecord>()
for (br in taskRecord.blockList) {
val blockF = File(br.blockPath)
@ -136,7 +136,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor {
if (br.curProgress != blockF.length()) {
br.curProgress = blockF.length()
needUpdateBlockRecord.add(br)
blockManager.putUnfinishedBlock(br)
taskOption.getOptionAdapter(HttpDOptionAdapter::class.java).putUnfinishedBlock(br)
}
// update task progress
handler.obtainMessage(ITaskManager.STATE_UPDATE_PROGRESS, br.curProgress)

@ -0,0 +1,57 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.http.download
import com.arialyy.aria.core.task.DBlockManager
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.http.HttpTaskOption
import kotlinx.coroutines.launch
/**
* @Author laoyuyu
* @Description
* @Date 21:45 2023/3/13
**/
class HttpDBlockManager(task: ITask) : DBlockManager(task) {
private val optionAdapter =
task.getTaskOption(HttpTaskOption::class.java).getOptionAdapter(HttpDOptionAdapter::class.java)
override fun start() {
optionAdapter.threadList.forEach { tt ->
scope.launch(dispatcher) {
tt.run()
}
}
}
override fun stop() {
optionAdapter.threadList.forEach {
it.stop()
}
}
override fun cancel() {
optionAdapter.threadList.forEach {
it.cancel()
}
}
override fun quitLooper() {
super.quitLooper()
optionAdapter.threadList.clear()
}
}

@ -16,7 +16,9 @@
package com.arialyy.aria.http.download
import com.arialyy.aria.core.processor.IHttpFileLenAdapter
import com.arialyy.aria.core.task.IThreadTask
import com.arialyy.aria.http.IHttpTaskOptionAdapter
import com.arialyy.aria.orm.entity.BlockRecord
/**
* @Author laoyuyu
@ -41,4 +43,19 @@ class HttpDOptionAdapter : IHttpTaskOptionAdapter {
var fileName: String? = null
private val unfinishedBlock = mutableListOf<BlockRecord>()
var threadList = mutableListOf<IThreadTask>()
set(value) {
field.clear()
field.addAll(value)
}
fun putUnfinishedBlock(record: BlockRecord) {
unfinishedBlock.add(record)
}
fun getUnfinishedBlockList(): List<BlockRecord> {
return unfinishedBlock
}
}

@ -18,7 +18,6 @@ package com.arialyy.aria.http.download
import android.os.Looper
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.task.DBlockManager
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.core.task.ThreadTaskManager2
import com.arialyy.aria.exception.AriaException
@ -33,10 +32,10 @@ import timber.log.Timber
* @Date 11:32 2023/3/12
**/
internal class SingleTaskDelegate(val adapter: HttpDTaskAdapter) : ITaskAdapterDelegate {
private val blockManager = DBlockManager(adapter.getTask())
private val blockManager = HttpDBlockManager(adapter.getTask())
init {
ThreadTaskManager2.putThreadManager(adapter.getTask().taskId, blockManager)
ThreadTaskManager2.putTaskManager(adapter.getTask().taskId, blockManager)
}
override fun isRunning(): Boolean {

@ -15,7 +15,17 @@
*/
package com.arialyy.aria.http.download
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.core.task.ThreadTaskManager2
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.SubState
import com.arialyy.aria.orm.entity.DEntity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
/**
* @Author laoyuyu
@ -26,19 +36,44 @@ internal class SubTaskDelegate(val adapter: HttpDTaskAdapter) : ITaskAdapterDele
private lateinit var blockManager: IBlockManager
override fun isRunning(): Boolean {
TODO("Not yet implemented")
return ThreadTaskManager2.taskIsRunning(adapter.getTask().taskId)
}
override fun cancel() {
TODO("Not yet implemented")
sendMsg(ITaskManager.STATE_CANCEL)
}
override fun stop() {
TODO("Not yet implemented")
sendMsg(ITaskManager.STATE_STOP)
}
override fun start() {
TODO("Not yet implemented")
DuaContext.duaScope.launch(Dispatchers.IO) {
adapter.addCoreInterceptor(HttpDCheckInterceptor())
adapter.addCoreInterceptor(TimerInterceptor())
adapter.addCoreInterceptor(HttpDHeaderInterceptor())
adapter.addCoreInterceptor(HttpDBlockInterceptor())
adapter.addCoreInterceptor(HttpBlockThreadInterceptor())
val resp = adapter.interceptor()
if (resp == null || resp.code != TaskResp.CODE_SUCCESS) {
adapter.getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail(
false,
AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}")
)
sendMsg(ITaskManager.STATE_STOP)
return@launch
}
}
}
/**
* @param state [ITaskManager]
*/
private fun sendMsg(state: Int) {
blockManager.handler.obtainMessage(
state,
SubState(adapter.getTask().taskState.getEntity(DEntity::class.java).did)
).sendToTarget()
}
override fun setBlockManager(blockManager: IBlockManager) {

@ -51,7 +51,7 @@ open class TimerInterceptor : ITaskInterceptor {
closeTimer()
try {
mTimer = ScheduledThreadPoolExecutor(1)
val threadManager = ThreadTaskManager2.getThreadManager(chain.getTask().taskId)
val threadManager = ThreadTaskManager2.getTaskManager(chain.getTask().taskId)
if (threadManager == null) {
Timber.e("thread manager is null, start timer fail")
return false

@ -1,81 +1,81 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.http.upload
import android.os.Looper
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.task.AbsTaskAdapter
import com.arialyy.aria.core.task.DBlockManager
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.download.HttpBlockThreadInterceptor
import com.arialyy.aria.http.download.TimerInterceptor
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
/**
* @Author laoyuyu
* @Description
* @Date 9:21 PM 2023/2/21
**/
class HttpUTaskAdapter : AbsTaskAdapter() {
private var blockManager: DBlockManager? = null
override fun getBlockManager(): ITaskManager {
if (blockManager == null) {
blockManager = DBlockManager(getTask())
}
return blockManager!!
}
override fun isRunning(): Boolean {
return blockManager?.isRunning ?: false
}
override fun cancel() {
TODO("Not yet implemented")
}
override fun stop() {
TODO("Not yet implemented")
}
override fun start() {
getTask().getTaskOption(HttpTaskOption::class.java).taskInterceptor.let {
if (it.isNotEmpty()) {
addInterceptors(it)
}
}
DuaContext.duaScope.launch(Dispatchers.IO) {
Looper.prepare()
addCoreInterceptor(TimerInterceptor())
addCoreInterceptor(HttpUBlockInterceptor())
addCoreInterceptor(HttpBlockThreadInterceptor())
val resp = interceptor()
if (resp == null || resp.code != TaskResp.CODE_SUCCESS) {
getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail(
false,
AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}")
)
blockManager?.stop()
return@launch
}
Looper.loop()
}
}
}
///*
// * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.arialyy.aria.http.upload
//
//import android.os.Looper
//import com.arialyy.aria.core.DuaContext
//import com.arialyy.aria.core.inf.ITaskManager
//import com.arialyy.aria.core.task.AbsTaskAdapter
//import com.arialyy.aria.core.task.DBlockManager
//import com.arialyy.aria.core.task.TaskResp
//import com.arialyy.aria.exception.AriaException
//import com.arialyy.aria.http.HttpTaskOption
//import com.arialyy.aria.http.download.HttpBlockThreadInterceptor
//import com.arialyy.aria.http.download.TimerInterceptor
//import kotlinx.coroutines.Dispatchers
//import kotlinx.coroutines.launch
//
///**
// * @Author laoyuyu
// * @Description
// * @Date 9:21 PM 2023/2/21
// **/
//class HttpUTaskAdapter : AbsTaskAdapter() {
// private var blockManager: DBlockManager? = null
//
// override fun getBlockManager(): ITaskManager {
// if (blockManager == null) {
// blockManager = DBlockManager(getTask())
// }
// return blockManager!!
// }
//
// override fun isRunning(): Boolean {
// return blockManager?.isRunning ?: false
// }
//
// override fun cancel() {
// TODO("Not yet implemented")
// }
//
// override fun stop() {
// TODO("Not yet implemented")
// }
//
// override fun start() {
// getTask().getTaskOption(HttpTaskOption::class.java).taskInterceptor.let {
// if (it.isNotEmpty()) {
// addInterceptors(it)
// }
// }
// DuaContext.duaScope.launch(Dispatchers.IO) {
// Looper.prepare()
// addCoreInterceptor(TimerInterceptor())
// addCoreInterceptor(HttpUBlockInterceptor())
// addCoreInterceptor(HttpBlockThreadInterceptor())
// val resp = interceptor()
// if (resp == null || resp.code != TaskResp.CODE_SUCCESS) {
// getTask().getTaskOption(HttpTaskOption::class.java).eventListener.onFail(
// false,
// AriaException("start task fail, task interrupt, code: ${resp?.code ?: TaskResp.CODE_INTERRUPT}")
// )
// blockManager?.stop()
// return@launch
// }
// Looper.loop()
// }
// }
//}

@ -22,7 +22,7 @@ import com.arialyy.aria.core.listener.AbsEventListener
* @Description
* @Date 9:15 PM 2023/3/6
**/
internal class HttpDGEventListener(task: HttpGroupTask) : AbsEventListener(task) {
internal class HttpDGEventListener(task: HttpDGroupTask) : AbsEventListener(task) {
override fun onComplete() {
TODO("Not yet implemented")
}

@ -25,4 +25,5 @@ import com.arialyy.aria.http.IHttpTaskOptionAdapter
internal class HttpDGOptionAdapter : IHttpTaskOptionAdapter {
val subUrlList = mutableSetOf<String>()
val subNameList = mutableListOf<String>()
}

@ -67,15 +67,15 @@ class HttpDGStartController(target: Any, val savePath: Uri) : HttpBaseStartContr
return this
}
private fun getTask(createNewTask: Boolean = true): HttpGroupTask {
private fun getTask(createNewTask: Boolean = true): HttpDGroupTask {
if (HttpUtil.checkHttpDParams(httpTaskOption)) {
throw IllegalArgumentException("invalid params")
}
val temp = TaskCachePool.getTaskByKey(savePath.toString())
if (temp != null) {
return temp as HttpGroupTask
return temp as HttpDGroupTask
}
val task = HttpGroupTask(httpTaskOption)
val task = HttpDGroupTask(httpTaskOption)
task.adapter = HttpDGroupAdapter()
TaskCachePool.putTask(task)
return task

@ -16,14 +16,19 @@
package com.arialyy.dua.group
import com.arialyy.aria.core.common.TaskOption
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.SingleDownloadTask
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.SubState
import com.arialyy.aria.http.download.HttpDTaskAdapter
import com.arialyy.aria.orm.entity.DEntity
import com.arialyy.aria.orm.entity.DGEntity
import timber.log.Timber
/**
* Subtasks do not support chunking
@ -33,27 +38,32 @@ import com.arialyy.aria.orm.entity.DGEntity
**/
internal class HttpDGSubTaskInterceptor : ITaskInterceptor {
private lateinit var task: ITask
private lateinit var optionAdapter: HttpDGOptionAdapter
private lateinit var taskOption: HttpTaskOption
private lateinit var blockManager: IBlockManager
override suspend fun interceptor(chain: TaskChain): TaskResp {
task = chain.getTask()
blockManager = chain.blockManager
taskOption = task.getTaskOption(HttpTaskOption::class.java)
optionAdapter = taskOption.getOptionAdapter(HttpDGOptionAdapter::class.java)
val subList = createSubTask(chain)
createSubTask(chain)
blockManager.start()
return TaskResp(TaskResp.CODE_SUCCESS)
}
private fun startSubTask(subList: List<SingleDownloadTask>) {
subList.forEach {
/**
* 1. file exist
* 2. correct file length
* 3. send complete msg
*/
private fun checkTaskIsComplete(entity: DEntity): Boolean {
if (entity.fileIsComplete()) {
blockManager.handler.obtainMessage(ITaskManager.STATE_COMPLETE, SubState(entity.did))
return true
}
return false
}
private fun createSubTask(chain: TaskChain): List<SingleDownloadTask> {
val subTaskList = mutableListOf<SingleDownloadTask>()
private fun createSubTask(chain: TaskChain) {
task.taskState.getEntity(DGEntity::class.java).subList.forEach {
val tp = TaskOption()
tp.sourUrl = it.sourceUrl
@ -63,11 +73,13 @@ internal class HttpDGSubTaskInterceptor : ITaskInterceptor {
val subTask = SingleDownloadTask(tp)
val subAdapter = HttpDTaskAdapter(true)
subAdapter.setBlockManager(HttpSubBlockManager(chain.blockManager.handler))
subTask.adapter = subAdapter
subTaskList.add(subTask)
if (it.isComplete && checkTaskIsComplete(it)) {
(chain.getTask() as HttpDGroupTask).addIncompleteTaskList(subTask)
Timber.d("task already complete, sourUrl: ${it.sourceUrl}")
}
(chain.getTask() as HttpDGroupTask).addSubTask(subTask)
}
return subTaskList
}
}

@ -17,15 +17,15 @@ package com.arialyy.dua.group
import android.os.Handler
import android.os.Looper
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.http.HttpTaskOption
/**
* @Author laoyuyu
* @Description
* @Date 7:43 PM 2023/3/7
**/
internal class HttpDGTaskManager : ITaskManager {
internal class HttpDGTaskManager(val task: HttpDGroupTask) : ITaskManager, IBlockManager {
private lateinit var looper: Looper
private lateinit var handler: Handler
@ -47,10 +47,6 @@ internal class HttpDGTaskManager : ITaskManager {
false
}
fun start(taskOption: HttpTaskOption) {
taskOption.getOptionAdapter(HttpDGOptionAdapter::class.java).subUrlList
}
override fun setLooper() {
if (Looper.myLooper() == Looper.getMainLooper()) {
throw IllegalThreadStateException("io operations cannot be in the main thread")
@ -59,6 +55,12 @@ internal class HttpDGTaskManager : ITaskManager {
handler = Handler(looper, callback)
}
override fun start() {
task.incompleteTaskList.forEach {
}
}
override fun stop() {
}
@ -86,10 +88,12 @@ internal class HttpDGTaskManager : ITaskManager {
TODO("Not yet implemented")
}
fun getHandler(): Handler {
return handler
override fun setBlockNum(blockNum: Int) {
TODO("Not yet implemented")
}
override fun getHandler(): Handler = handler
override fun hasFailedTask(): Boolean {
TODO("Not yet implemented")
}

@ -18,9 +18,9 @@ package com.arialyy.dua.group
import android.os.Looper
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.task.AbsTaskAdapter
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.core.task.ThreadTaskManager2
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.http.HttpTaskOption
import com.arialyy.aria.http.download.TimerInterceptor
@ -35,12 +35,14 @@ import timber.log.Timber
**/
internal class HttpDGroupAdapter : AbsTaskAdapter() {
private val taskManager by lazy {
HttpDGTaskManager()
val manager = HttpDGTaskManager(getTask())
ThreadTaskManager2.putTaskManager(getTask().taskId, manager)
manager
}
init {
getTask().getTaskOption(HttpTaskOption::class.java).eventListener =
HttpDGEventListener(getTask() as HttpGroupTask)
HttpDGEventListener(getTask() as HttpDGroupTask)
}
override fun getBlockManager(): IBlockManager {
@ -48,20 +50,26 @@ internal class HttpDGroupAdapter : AbsTaskAdapter() {
}
override fun isRunning(): Boolean {
return taskManager.isRunning()
return ThreadTaskManager2.getTaskManager(getTask().taskId)?.isRunning() == true
}
override fun cancel() {
if (getBlockManager().isCanceled()) {
Timber.w("task already canceled, taskId: ${getTask().taskId}")
return
ThreadTaskManager2.getTaskManager(getTask().taskId)?.let {
if (it.isCanceled()) {
Timber.w("task already canceled, taskId: ${getTask().taskId}")
return
}
it.cancel()
}
}
override fun stop() {
if (getBlockManager().isStopped()) {
Timber.w("task already stopped, taskId: ${getTask().taskId}")
return
ThreadTaskManager2.getTaskManager(getTask().taskId)?.let {
if (it.isStopped()) {
Timber.w("task already stopped, taskId: ${getTask().taskId}")
return
}
it.stop()
}
}

@ -18,18 +18,51 @@ package com.arialyy.dua.group;
import com.arialyy.aria.core.common.TaskOption;
import com.arialyy.aria.core.inf.ITaskOption;
import com.arialyy.aria.core.task.AbsTask;
import com.arialyy.aria.core.task.SingleDownloadTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Created by AriaL on 2017/6/27.
* 任务组任务
*/
public class HttpGroupTask extends AbsTask {
public class HttpDGroupTask extends AbsTask {
public HttpGroupTask(ITaskOption taskOption) {
private List<SingleDownloadTask> incompleteTaskList = new ArrayList<>();
private List<SingleDownloadTask> subTaskList = new ArrayList<>();
public HttpDGroupTask(ITaskOption taskOption) {
super(taskOption);
}
void setIncompleteTaskList(List<SingleDownloadTask> list) {
incompleteTaskList.clear();
incompleteTaskList.addAll(list);
}
void addIncompleteTaskList(SingleDownloadTask task) {
incompleteTaskList.add(task);
}
List<SingleDownloadTask> getIncompleteTaskList() {
return incompleteTaskList;
}
void setSubTaskList(List<SingleDownloadTask> list) {
this.subTaskList.clear();
this.subTaskList.addAll(list);
}
void addSubTask(SingleDownloadTask task) {
this.subTaskList.add(task);
}
public List<SingleDownloadTask> getSubTaskList() {
return subTaskList;
}
@Override public int getTaskType() {
return HTTP_GROUP;
}

@ -41,4 +41,6 @@ abstract class BaseEntity : IEntity {
var ext: String? = null
val fileSize: Long = 0
var isComplete: Boolean = false
}

@ -16,23 +16,16 @@
package com.arialyy.aria.core.inf;
import android.os.Handler;
import com.arialyy.aria.core.task.IThreadTask;
import com.arialyy.aria.orm.entity.BlockRecord;
import java.util.List;
/**
* 线程任务状态
*/
public interface IBlockManager {
void start(List<IThreadTask> threadTaskList);
void start();
void setBlockNum(int blockNum);
void putUnfinishedBlock(BlockRecord record);
List<BlockRecord> getUnfinishedBlockList();
/**
* 创建handler 回调
*/

@ -16,8 +16,6 @@
package com.arialyy.aria.core.inf
import android.os.Handler
/**
* @Author laoyuyu
* @Description

@ -17,6 +17,7 @@ package com.arialyy.aria.core.task
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskAdapter
import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.core.task.ITaskInterceptor.IChain

@ -23,33 +23,29 @@ import com.arialyy.aria.core.inf.ITaskManager
import com.arialyy.aria.core.inf.ITaskOption
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.orm.entity.BlockRecord
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import timber.log.Timber
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicInteger
class DBlockManager(val task: ITask) : IBlockManager, ITaskManager {
private val unfinishedBlock = mutableListOf<BlockRecord>()
abstract class DBlockManager(val task: ITask) : IBlockManager, ITaskManager {
private val canceledNum = AtomicInteger(0) // 已经取消的线程的数
private val stoppedNum = AtomicInteger(0) // 已经停止的线程数
private val failedNum = AtomicInteger(0) // 失败的线程数
private val completedNum = AtomicInteger(0) // 完成的线程数
private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum
private val scope = MainScope()
protected val scope = MainScope()
private val threadPool = ThreadPoolExecutor(
threadNum, threadNum,
0L, MILLISECONDS,
LinkedBlockingQueue(),
)
private val dispatcher = threadPool.asCoroutineDispatcher()
private val threadTaskList = mutableListOf<IThreadTask>()
protected val dispatcher = threadPool.asCoroutineDispatcher()
private var progress: Long = 0 //当前总进度
private lateinit var looper: Looper
@ -110,20 +106,10 @@ class DBlockManager(val task: ITask) : IBlockManager, ITaskManager {
/**
* 退出looper循环
*/
private fun quitLooper() {
protected open fun quitLooper() {
looper.quit()
handler.removeCallbacksAndMessages(null)
scope.cancel()
threadTaskList.clear()
unfinishedBlock.clear()
}
override fun putUnfinishedBlock(record: BlockRecord) {
unfinishedBlock.add(record)
}
override fun getUnfinishedBlockList(): List<BlockRecord> {
return unfinishedBlock
}
override fun setLooper() {
@ -134,28 +120,6 @@ class DBlockManager(val task: ITask) : IBlockManager, ITaskManager {
handler = Handler(looper, callback)
}
override fun start(threadTaskList: List<IThreadTask>) {
this.threadTaskList.clear()
this.threadTaskList.addAll(threadTaskList)
threadTaskList.forEach { tt ->
scope.launch(dispatcher) {
tt.run()
}
}
}
override fun stop() {
threadTaskList.forEach {
it.stop()
}
}
override fun cancel() {
threadTaskList.forEach {
it.cancel()
}
}
override fun setBlockNum(blockNum: Int) {
this.blockNum = blockNum
}

@ -22,9 +22,9 @@ import java.util.concurrent.ConcurrentHashMap
object ThreadTaskManager2 {
private val taskManagerMap: ConcurrentHashMap<Int, ITaskManager> = ConcurrentHashMap()
fun getThreadManager(taskId: Int) = taskManagerMap[taskId]
fun getTaskManager(taskId: Int) = taskManagerMap[taskId]
fun putThreadManager(taskId: Int, taskManager: ITaskManager) {
fun putTaskManager(taskId: Int, taskManager: ITaskManager) {
taskManagerMap[taskId] = taskManager
}

@ -56,6 +56,15 @@ data class DEntity(
) : BaseEntity() {
private var dirFile: File? = null
/**
* 1. file exist
* 2. correct file length
*/
fun fileIsComplete(): Boolean {
val f = getFilePath()
return f.exists() && f.length() == fileSize
}
fun getFilePath(): File {
if (dirFile == null) {
dirFile = File(FileUri.getPathByUri(savePath)!!)

@ -21,7 +21,10 @@ import androidx.room.Ignore
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.BaseEntity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.parcelize.Parcelize
/**
@ -55,4 +58,11 @@ data class DGEntity(
) : BaseEntity() {
@Ignore
var subList: MutableList<DEntity> = mutableListOf()
override fun update() {
updateTime = System.currentTimeMillis()
DuaContext.duaScope.launch(Dispatchers.IO) {
DuaContext.getServiceManager().getDbService().getDuaDb().getDGEntityDao()
.update(this@DGEntity)
}
}
}
Loading…
Cancel
Save