BlockManager

v4
laoyuyu 2 years ago
parent 9a2976827f
commit 0dabf70fd1
  1. 43
      Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt
  2. 1
      Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt
  3. 4
      Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt
  4. 1
      Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt
  5. 4
      Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt
  6. 7
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java
  7. 1
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/ITaskOption.java
  8. 3
      PublicComponent/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java
  9. 33
      PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt
  10. 1
      PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskResp.kt

@ -0,0 +1,43 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.http.download
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.orm.entity.BlockRecord
import java.util.concurrent.BlockingQueue
/**
* @Author laoyuyu
* @Description
* @Date 7:11 PM 2023/2/7
**/
class HttpBlockThreadInterceptor : ITaskInterceptor {
override suspend fun interceptor(chain: TaskChain): TaskResp {
val queue = chain.blockManager.blockQueue
if (queue.isEmpty()) {
return TaskResp(TaskResp.CODE_BLOCK_QUEUE_NULL)
}
cycleQueue(queue)
}
private fun cycleQueue(queue: BlockingQueue<BlockRecord>){
queue.take()
}
}

@ -134,6 +134,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor {
if (br.curProgress == blockF.length() && !br.isComplete) {
br.isComplete = true
needUpdateBlockRecord.add(br)
handler.obtainMessage(IBlockManager.STATE_COMPLETE)
}
if (br.curProgress != blockF.length()) {
br.curProgress = blockF.length()

@ -21,13 +21,13 @@ import android.os.Looper
import android.os.Process
import android.text.TextUtils
import com.arialyy.aria.core.processor.IHttpFileLenAdapter
import com.arialyy.aria.core.task.BlockState
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.http.HttpUtil
import com.arialyy.aria.http.request.IRequest
import com.arialyy.aria.orm.entity.BlockRecord
import com.arialyy.aria.util.FileUtils
import timber.log.Timber
import java.io.BufferedReader
@ -64,7 +64,7 @@ internal class HttpDHeaderInterceptor : ITaskInterceptor {
if (fileSize >= 0) {
task.taskState.isSupportResume = fileSize != 0L
task.taskState.isSupportBlock =
task.taskState.isSupportResume && fileSize > BlockState.BLOCK_SIZE
task.taskState.isSupportResume && fileSize > BlockRecord.BLOCK_SIZE
task.taskState.fileSize = fileSize
return chain.proceed(task)
}

@ -32,5 +32,4 @@ class HttpDTaskOption : DTaskOption() {
var fileSizeAdapter: IHttpFileLenAdapter? = null
var taskInterceptor = mutableListOf<ITaskInterceptor>()
var isChunkTask = false
var threadNum: Long = 1L
}

@ -18,13 +18,13 @@ internal class HttpDTaskUtil : AbsTaskUtil() {
private var blockManager: BlockManager? = null
override fun getBlockManager(): IBlockManager {
if (blockManager == null) {
blockManager = BlockManager(getTask().getTaskOption(HttpDTaskOption::class.java).taskListener)
blockManager = BlockManager(getTask())
}
return blockManager!!
}
override fun isRunning(): Boolean {
TODO("Not yet implemented")
return blockManager?.isRunning ?: false
}
override fun cancel() {

@ -19,6 +19,7 @@ import android.os.Handler;
import android.os.Looper;
import androidx.annotation.NonNull;
import com.arialyy.aria.orm.entity.BlockRecord;
import kotlinx.coroutines.channels.Channel;
/**
* 线程任务状态
@ -38,14 +39,14 @@ public interface IBlockManager {
String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION";
String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度
Channel<BlockRecord> getChannel();
void setLopper(@NonNull Looper looper);
void setBlockNum(int blockNum);
void putUnfinishedBlock(BlockRecord record);
BlockRecord getUnfinishedBlock();
/**
* 是否有失败的快
*
@ -78,6 +79,8 @@ public interface IBlockManager {
boolean isCanceled();
boolean isRunning();
/**
* 创建handler 回调
*/

@ -24,4 +24,5 @@ import com.arialyy.aria.core.listener.IEventListener;
public abstract class ITaskOption {
public IEventListener taskListener;
public int threadNum;
}

@ -34,7 +34,8 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* 线程任务管理器
*/
public class ThreadTaskManager {
@Deprecated
public class ThreadTaskManager1 {
private final String TAG = CommonUtil.getClassName(this);
private static volatile ThreadTaskManager INSTANCE = null;
private static final int CORE_POOL_NUM = 20;

@ -18,23 +18,40 @@ package com.arialyy.aria.core.task
import android.os.Handler.Callback
import android.os.Looper
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskOption
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.orm.entity.BlockRecord
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.isActive
import timber.log.Timber
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicInteger
class BlockManager(private val eventListener: IEventListener) : IBlockManager {
private val unfinishedBlockQueue = LinkedBlockingDeque<BlockRecord>()
class BlockManager(task: ITask) : IBlockManager {
private val unfinishedBlockList = mutableListOf<BlockRecord>()
private val canceledNum = AtomicInteger(0) // 已经取消的线程的数
private val stoppedNum = AtomicInteger(0) // 已经停止的线程数
private val failedNum = AtomicInteger(0) // 失败的线程数
private val completedNum = AtomicInteger(0) // 完成的线程数
private val channel = Channel<BlockRecord>()
private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum
private val scope = MainScope()
private val dispatcher = ThreadPoolExecutor(
threadNum, threadNum,
0L, MILLISECONDS,
LinkedBlockingQueue()
).asCoroutineDispatcher()
private var progress: Long = 0 //当前总进度
private lateinit var looper: Looper
private var blockNum: Int = 1
private var eventListener: IEventListener =
task.getTaskOption(ITaskOption::class.java).taskListener
private val callback = Callback { msg ->
when (msg.what) {
@ -91,11 +108,11 @@ class BlockManager(private val eventListener: IEventListener) : IBlockManager {
}
override fun putUnfinishedBlock(record: BlockRecord) {
unfinishedBlockQueue.offer(record)
unfinishedBlockList.add(record)
}
override fun getUnfinishedBlock(): BlockRecord {
return unfinishedBlockQueue.remove()
override fun getChannel(): Channel<BlockRecord> {
return channel
}
override fun setLopper(looper: Looper) {
@ -137,6 +154,10 @@ class BlockManager(private val eventListener: IEventListener) : IBlockManager {
return canceledNum.get() == blockNum
}
override fun isRunning(): Boolean {
return scope.isActive
}
override fun getHandlerCallback(): Callback {
return callback
}

@ -12,6 +12,7 @@ class TaskResp(val code: Int = CODE_DEF) {
const val CODE_DEF = 0
const val CODE_SAVE_URI_NULL = 3
const val CODE_GET_FILE_INFO_FAIL = 2
const val CODE_BLOCK_QUEUE_NULL = 4
}
var fileSize: Long = 0

Loading…
Cancel
Save