From 0dabf70fd12a0abfd3fd957ff0afa0fe24075a3f Mon Sep 17 00:00:00 2001 From: laoyuyu Date: Tue, 7 Feb 2023 20:09:59 +0800 Subject: [PATCH] BlockManager --- .../download/HttpBlockThreadInterceptor.kt | 43 +++++++++++++++++++ .../http/download/HttpDBlockInterceptor.kt | 1 + .../http/download/HttpDHeaderInterceptor.kt | 4 +- .../aria/http/download/HttpDTaskOption.kt | 1 - .../aria/http/download/HttpDTaskUtil.kt | 4 +- .../arialyy/aria/core/inf/IBlockManager.java | 7 ++- .../arialyy/aria/core/inf/ITaskOption.java | 1 + .../aria/core/manager/ThreadTaskManager.java | 3 +- .../arialyy/aria/core/task/DBlockManager.kt | 33 +++++++++++--- .../com/arialyy/aria/core/task/TaskResp.kt | 1 + 10 files changed, 84 insertions(+), 14 deletions(-) create mode 100644 Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt new file mode 100644 index 00000000..71e105b1 --- /dev/null +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.http.download + +import com.arialyy.aria.core.task.ITaskInterceptor +import com.arialyy.aria.core.task.TaskChain +import com.arialyy.aria.core.task.TaskResp +import com.arialyy.aria.orm.entity.BlockRecord +import java.util.concurrent.BlockingQueue + +/** + * @Author laoyuyu + * @Description + * @Date 7:11 PM 2023/2/7 + **/ +class HttpBlockThreadInterceptor : ITaskInterceptor { + + override suspend fun interceptor(chain: TaskChain): TaskResp { + val queue = chain.blockManager.blockQueue + if (queue.isEmpty()) { + return TaskResp(TaskResp.CODE_BLOCK_QUEUE_NULL) + } + cycleQueue(queue) + } + + private fun cycleQueue(queue: BlockingQueue){ + queue.take() + + } +} \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt index 5def4bfb..a7c0e756 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt @@ -134,6 +134,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor { if (br.curProgress == blockF.length() && !br.isComplete) { br.isComplete = true needUpdateBlockRecord.add(br) + handler.obtainMessage(IBlockManager.STATE_COMPLETE) } if (br.curProgress != blockF.length()) { br.curProgress = blockF.length() diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt index 66f9c214..4cf8350b 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt @@ -21,13 +21,13 @@ import android.os.Looper import android.os.Process import android.text.TextUtils import com.arialyy.aria.core.processor.IHttpFileLenAdapter -import com.arialyy.aria.core.task.BlockState import com.arialyy.aria.core.task.ITask import com.arialyy.aria.core.task.ITaskInterceptor import com.arialyy.aria.core.task.TaskChain import com.arialyy.aria.core.task.TaskResp import com.arialyy.aria.http.HttpUtil import com.arialyy.aria.http.request.IRequest +import com.arialyy.aria.orm.entity.BlockRecord import com.arialyy.aria.util.FileUtils import timber.log.Timber import java.io.BufferedReader @@ -64,7 +64,7 @@ internal class HttpDHeaderInterceptor : ITaskInterceptor { if (fileSize >= 0) { task.taskState.isSupportResume = fileSize != 0L task.taskState.isSupportBlock = - task.taskState.isSupportResume && fileSize > BlockState.BLOCK_SIZE + task.taskState.isSupportResume && fileSize > BlockRecord.BLOCK_SIZE task.taskState.fileSize = fileSize return chain.proceed(task) } diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt index 17da1b95..b3ccea7b 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt @@ -32,5 +32,4 @@ class HttpDTaskOption : DTaskOption() { var fileSizeAdapter: IHttpFileLenAdapter? = null var taskInterceptor = mutableListOf() var isChunkTask = false - var threadNum: Long = 1L } \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt index b6512d11..aa5a9fcd 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt @@ -18,13 +18,13 @@ internal class HttpDTaskUtil : AbsTaskUtil() { private var blockManager: BlockManager? = null override fun getBlockManager(): IBlockManager { if (blockManager == null) { - blockManager = BlockManager(getTask().getTaskOption(HttpDTaskOption::class.java).taskListener) + blockManager = BlockManager(getTask()) } return blockManager!! } override fun isRunning(): Boolean { - TODO("Not yet implemented") + return blockManager?.isRunning ?: false } override fun cancel() { diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java index 44882bb7..7e717d6d 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java @@ -19,6 +19,7 @@ import android.os.Handler; import android.os.Looper; import androidx.annotation.NonNull; import com.arialyy.aria.orm.entity.BlockRecord; +import kotlinx.coroutines.channels.Channel; /** * 线程任务状态 @@ -38,14 +39,14 @@ public interface IBlockManager { String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION"; String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度 + Channel getChannel(); + void setLopper(@NonNull Looper looper); void setBlockNum(int blockNum); void putUnfinishedBlock(BlockRecord record); - BlockRecord getUnfinishedBlock(); - /** * 是否有失败的快 * @@ -78,6 +79,8 @@ public interface IBlockManager { boolean isCanceled(); + boolean isRunning(); + /** * 创建handler 回调 */ diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskOption.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskOption.java index 4a1d78c0..cf87ffd1 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskOption.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskOption.java @@ -24,4 +24,5 @@ import com.arialyy.aria.core.listener.IEventListener; public abstract class ITaskOption { public IEventListener taskListener; + public int threadNum; } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java index e4bb41d4..84e19802 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java @@ -34,7 +34,8 @@ import java.util.concurrent.locks.ReentrantLock; /** * 线程任务管理器 */ -public class ThreadTaskManager { +@Deprecated +public class ThreadTaskManager1 { private final String TAG = CommonUtil.getClassName(this); private static volatile ThreadTaskManager INSTANCE = null; private static final int CORE_POOL_NUM = 20; diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt index 0b0a5617..d85781cf 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt @@ -18,23 +18,40 @@ package com.arialyy.aria.core.task import android.os.Handler.Callback import android.os.Looper import com.arialyy.aria.core.inf.IBlockManager +import com.arialyy.aria.core.inf.ITaskOption import com.arialyy.aria.core.listener.IEventListener import com.arialyy.aria.exception.AriaException import com.arialyy.aria.orm.entity.BlockRecord +import kotlinx.coroutines.MainScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.isActive import timber.log.Timber -import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.atomic.AtomicInteger -class BlockManager(private val eventListener: IEventListener) : IBlockManager { - private val unfinishedBlockQueue = LinkedBlockingDeque() +class BlockManager(task: ITask) : IBlockManager { + private val unfinishedBlockList = mutableListOf() private val canceledNum = AtomicInteger(0) // 已经取消的线程的数 private val stoppedNum = AtomicInteger(0) // 已经停止的线程数 private val failedNum = AtomicInteger(0) // 失败的线程数 private val completedNum = AtomicInteger(0) // 完成的线程数 + private val channel = Channel() + private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum + private val scope = MainScope() + private val dispatcher = ThreadPoolExecutor( + threadNum, threadNum, + 0L, MILLISECONDS, + LinkedBlockingQueue() + ).asCoroutineDispatcher() private var progress: Long = 0 //当前总进度 private lateinit var looper: Looper private var blockNum: Int = 1 + private var eventListener: IEventListener = + task.getTaskOption(ITaskOption::class.java).taskListener private val callback = Callback { msg -> when (msg.what) { @@ -91,11 +108,11 @@ class BlockManager(private val eventListener: IEventListener) : IBlockManager { } override fun putUnfinishedBlock(record: BlockRecord) { - unfinishedBlockQueue.offer(record) + unfinishedBlockList.add(record) } - override fun getUnfinishedBlock(): BlockRecord { - return unfinishedBlockQueue.remove() + override fun getChannel(): Channel { + return channel } override fun setLopper(looper: Looper) { @@ -137,6 +154,10 @@ class BlockManager(private val eventListener: IEventListener) : IBlockManager { return canceledNum.get() == blockNum } + override fun isRunning(): Boolean { + return scope.isActive + } + override fun getHandlerCallback(): Callback { return callback } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskResp.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskResp.kt index f104cb63..8dfb69df 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskResp.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskResp.kt @@ -12,6 +12,7 @@ class TaskResp(val code: Int = CODE_DEF) { const val CODE_DEF = 0 const val CODE_SAVE_URI_NULL = 3 const val CODE_GET_FILE_INFO_FAIL = 2 + const val CODE_BLOCK_QUEUE_NULL = 4 } var fileSize: Long = 0