diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt index 71e105b1..8f2e9f09 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt @@ -18,8 +18,8 @@ package com.arialyy.aria.http.download import com.arialyy.aria.core.task.ITaskInterceptor import com.arialyy.aria.core.task.TaskChain import com.arialyy.aria.core.task.TaskResp +import com.arialyy.aria.core.task.ThreadTask import com.arialyy.aria.orm.entity.BlockRecord -import java.util.concurrent.BlockingQueue /** * @Author laoyuyu @@ -29,15 +29,14 @@ import java.util.concurrent.BlockingQueue class HttpBlockThreadInterceptor : ITaskInterceptor { override suspend fun interceptor(chain: TaskChain): TaskResp { - val queue = chain.blockManager.blockQueue - if (queue.isEmpty()) { + val unfinishedBlockList = chain.blockManager.unfinishedBlockList + if (unfinishedBlockList.isEmpty()) { return TaskResp(TaskResp.CODE_BLOCK_QUEUE_NULL) } - cycleQueue(queue) + createThreadTask(unfinishedBlockList) } - private fun cycleQueue(queue: BlockingQueue){ - queue.take() - + private fun createThreadTask(blockRecordList: List) { + blockRecordList.forEach(ThreadTask()) } } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java index 7e717d6d..338173fc 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java @@ -16,9 +16,9 @@ package com.arialyy.aria.core.inf; import android.os.Handler; -import android.os.Looper; -import androidx.annotation.NonNull; +import com.arialyy.aria.core.task.ThreadTask; import com.arialyy.aria.orm.entity.BlockRecord; +import java.util.List; import kotlinx.coroutines.channels.Channel; /** @@ -39,14 +39,14 @@ public interface IBlockManager { String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION"; String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度 - Channel getChannel(); - - void setLopper(@NonNull Looper looper); + void start(List threadTaskList); void setBlockNum(int blockNum); void putUnfinishedBlock(BlockRecord record); + List getUnfinishedBlockList(); + /** * 是否有失败的快 * @@ -68,13 +68,6 @@ public interface IBlockManager { */ long getCurrentProgress(); - /** - * 更新当前进度 - * - * @param currentProgress 当前进度 - */ - void updateCurrentProgress(long currentProgress); - boolean isStopped(); boolean isCanceled(); diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt index d85781cf..4caf404b 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt @@ -24,8 +24,9 @@ import com.arialyy.aria.exception.AriaException import com.arialyy.aria.orm.entity.BlockRecord import kotlinx.coroutines.MainScope import kotlinx.coroutines.asCoroutineDispatcher -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.cancel import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import timber.log.Timber import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor @@ -33,12 +34,11 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicInteger class BlockManager(task: ITask) : IBlockManager { - private val unfinishedBlockList = mutableListOf() + private val unfinishedBlock = mutableListOf() private val canceledNum = AtomicInteger(0) // 已经取消的线程的数 private val stoppedNum = AtomicInteger(0) // 已经停止的线程数 private val failedNum = AtomicInteger(0) // 失败的线程数 private val completedNum = AtomicInteger(0) // 完成的线程数 - private val channel = Channel() private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum private val scope = MainScope() private val dispatcher = ThreadPoolExecutor( @@ -105,18 +105,27 @@ class BlockManager(task: ITask) : IBlockManager { */ private fun quitLooper() { looper.quit() + scope.cancel() } override fun putUnfinishedBlock(record: BlockRecord) { - unfinishedBlockList.add(record) + unfinishedBlock.add(record) } - override fun getChannel(): Channel { - return channel + override fun getUnfinishedBlockList(): MutableList { + return unfinishedBlock } - override fun setLopper(looper: Looper) { - this.looper = looper + override fun start(threadTaskList: List) { + if (Looper.myLooper() == Looper.getMainLooper()) { + throw IllegalThreadStateException("io operations cannot be in the main thread") + } + looper = Looper.myLooper()!! + threadTaskList.forEach { tt -> + scope.launch(dispatcher) { + tt.run() + } + } } override fun setBlockNum(blockNum: Int) { @@ -137,10 +146,6 @@ class BlockManager(task: ITask) : IBlockManager { return progress } - override fun updateCurrentProgress(currentProgress: Long) { - progress = currentProgress - } - /** * 所有子线程是否都已经停止 */ diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java index 368db5f3..406df0c8 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java @@ -21,7 +21,7 @@ import java.util.concurrent.Callable; * @author lyy * Date: 2019-09-18 */ -public interface IThreadTask extends Callable { +public interface IThreadTask extends Runnable { /** * 销毁任务