BlockManager

v4
laoyuyu 2 years ago
parent 0dabf70fd1
commit 9f55307054
  1. 13
      Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt
  2. 17
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java
  3. 29
      PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt
  4. 2
      PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java

@ -18,8 +18,8 @@ package com.arialyy.aria.http.download
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.core.task.ThreadTask
import com.arialyy.aria.orm.entity.BlockRecord
import java.util.concurrent.BlockingQueue
/**
* @Author laoyuyu
@ -29,15 +29,14 @@ import java.util.concurrent.BlockingQueue
class HttpBlockThreadInterceptor : ITaskInterceptor {
override suspend fun interceptor(chain: TaskChain): TaskResp {
val queue = chain.blockManager.blockQueue
if (queue.isEmpty()) {
val unfinishedBlockList = chain.blockManager.unfinishedBlockList
if (unfinishedBlockList.isEmpty()) {
return TaskResp(TaskResp.CODE_BLOCK_QUEUE_NULL)
}
cycleQueue(queue)
createThreadTask(unfinishedBlockList)
}
private fun cycleQueue(queue: BlockingQueue<BlockRecord>){
queue.take()
private fun createThreadTask(blockRecordList: List<BlockRecord>) {
blockRecordList.forEach(ThreadTask())
}
}

@ -16,9 +16,9 @@
package com.arialyy.aria.core.inf;
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.NonNull;
import com.arialyy.aria.core.task.ThreadTask;
import com.arialyy.aria.orm.entity.BlockRecord;
import java.util.List;
import kotlinx.coroutines.channels.Channel;
/**
@ -39,14 +39,14 @@ public interface IBlockManager {
String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION";
String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度
Channel<BlockRecord> getChannel();
void setLopper(@NonNull Looper looper);
void start(List<ThreadTask> threadTaskList);
void setBlockNum(int blockNum);
void putUnfinishedBlock(BlockRecord record);
List<BlockRecord> getUnfinishedBlockList();
/**
* 是否有失败的快
*
@ -68,13 +68,6 @@ public interface IBlockManager {
*/
long getCurrentProgress();
/**
* 更新当前进度
*
* @param currentProgress 当前进度
*/
void updateCurrentProgress(long currentProgress);
boolean isStopped();
boolean isCanceled();

@ -24,8 +24,9 @@ import com.arialyy.aria.exception.AriaException
import com.arialyy.aria.orm.entity.BlockRecord
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import timber.log.Timber
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
@ -33,12 +34,11 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicInteger
class BlockManager(task: ITask) : IBlockManager {
private val unfinishedBlockList = mutableListOf<BlockRecord>()
private val unfinishedBlock = mutableListOf<BlockRecord>()
private val canceledNum = AtomicInteger(0) // 已经取消的线程的数
private val stoppedNum = AtomicInteger(0) // 已经停止的线程数
private val failedNum = AtomicInteger(0) // 失败的线程数
private val completedNum = AtomicInteger(0) // 完成的线程数
private val channel = Channel<BlockRecord>()
private val threadNum = task.getTaskOption(ITaskOption::class.java).threadNum
private val scope = MainScope()
private val dispatcher = ThreadPoolExecutor(
@ -105,18 +105,27 @@ class BlockManager(task: ITask) : IBlockManager {
*/
private fun quitLooper() {
looper.quit()
scope.cancel()
}
override fun putUnfinishedBlock(record: BlockRecord) {
unfinishedBlockList.add(record)
unfinishedBlock.add(record)
}
override fun getChannel(): Channel<BlockRecord> {
return channel
override fun getUnfinishedBlockList(): MutableList<BlockRecord> {
return unfinishedBlock
}
override fun setLopper(looper: Looper) {
this.looper = looper
override fun start(threadTaskList: List<ThreadTask>) {
if (Looper.myLooper() == Looper.getMainLooper()) {
throw IllegalThreadStateException("io operations cannot be in the main thread")
}
looper = Looper.myLooper()!!
threadTaskList.forEach { tt ->
scope.launch(dispatcher) {
tt.run()
}
}
}
override fun setBlockNum(blockNum: Int) {
@ -137,10 +146,6 @@ class BlockManager(task: ITask) : IBlockManager {
return progress
}
override fun updateCurrentProgress(currentProgress: Long) {
progress = currentProgress
}
/**
* 所有子线程是否都已经停止
*/

@ -21,7 +21,7 @@ import java.util.concurrent.Callable;
* @author lyy
* Date: 2019-09-18
*/
public interface IThreadTask extends Callable<IThreadTask> {
public interface IThreadTask extends Runnable {
/**
* 销毁任务

Loading…
Cancel
Save