diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt index 8f2e9f09..6f2f99e3 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt @@ -16,6 +16,7 @@ package com.arialyy.aria.http.download import com.arialyy.aria.core.task.ITaskInterceptor +import com.arialyy.aria.core.task.IThreadTask import com.arialyy.aria.core.task.TaskChain import com.arialyy.aria.core.task.TaskResp import com.arialyy.aria.core.task.ThreadTask @@ -37,6 +38,9 @@ class HttpBlockThreadInterceptor : ITaskInterceptor { } private fun createThreadTask(blockRecordList: List) { - blockRecordList.forEach(ThreadTask()) + val threadTaskList = mutableListOf() + blockRecordList.forEach{ + + } } } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java index 338173fc..1194f848 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java @@ -16,6 +16,7 @@ package com.arialyy.aria.core.inf; import android.os.Handler; +import com.arialyy.aria.core.task.IThreadTask; import com.arialyy.aria.core.task.ThreadTask; import com.arialyy.aria.orm.entity.BlockRecord; import java.util.List; @@ -39,7 +40,7 @@ public interface IBlockManager { String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION"; String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度 - void start(List threadTaskList); + void start(List threadTaskList); void setBlockNum(int blockNum); diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt index 4caf404b..ddba3b00 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt @@ -94,7 +94,7 @@ class BlockManager(task: ITask) : IBlockManager { } } IBlockManager.STATE_UPDATE_PROGRESS -> { - progress = msg.obj as Long + progress += msg.obj as Long } } false @@ -116,7 +116,7 @@ class BlockManager(task: ITask) : IBlockManager { return unfinishedBlock } - override fun start(threadTaskList: List) { + override fun start(threadTaskList: List) { if (Looper.myLooper() == Looper.getMainLooper()) { throw IllegalThreadStateException("io operations cannot be in the main thread") } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java index 406df0c8..183a35e7 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java @@ -15,38 +15,12 @@ */ package com.arialyy.aria.core.task; -import java.util.concurrent.Callable; - /** * @author lyy * Date: 2019-09-18 */ public interface IThreadTask extends Runnable { - /** - * 销毁任务 - */ - void destroy(); - - /** - * 判断线程任务是否销毁 - * - * @return true 已经销毁 - */ - boolean isDestroy(); - - /** - * 中断任务 - */ - void breakTask(); - - /** - * 当前线程是否完成,对于不支持断点的任务,一律未完成 - * - * @return {@code true} 完成;{@code false} 未完成 - */ - boolean isThreadComplete(); - /** * 取消任务 */ @@ -69,32 +43,9 @@ public interface IThreadTask extends Runnable { * * @return {@code true}存活 */ - boolean isLive(); + boolean isRunning(); - /** - * 任务是否中断,中断条件: - * 1、任务取消 - * 2、任务停止 - * 3、手动中断 - * - * @return {@code true} 中断,{@code false} 不是中断 - */ - boolean isBreak(); + void onFail(Exception e); - /** - * 检查下载完成的分块大小,如果下载完成的分块大小大于或小于分配的大小,则需要重新下载该分块 如果是非分块任务,直接返回{@code true} - * - * @return {@code true} 分块分大小正常,{@code false} 分块大小错误 - */ - boolean checkBlock(); - - /** - * 获取线程id - */ - int getThreadId(); - - /** - * 线程名字,命名规则:md5(任务地址 + 线程id) - */ - String getThreadName(); + void onComplete(); } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java index d325f22e..bfb625a4 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java @@ -26,7 +26,17 @@ public interface IThreadTaskAdapter { /** * 执行任务 */ - void call(IThreadTask threadTask) throws Exception; + void run(IThreadTask threadTask); + + /** + * 取消任务 + */ + void cancel(); + + /** + * 停止任务 + */ + void stop(); /** * 设置当前线程最大下载速度 diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java index 381607f3..feef4c3d 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java @@ -15,10 +15,6 @@ */ package com.arialyy.aria.core.task; -import android.os.Bundle; -import com.arialyy.aria.core.inf.IBlockManager; -import com.arialyy.aria.exception.AriaException; - /** * 线程任务观察者 * @@ -27,25 +23,6 @@ import com.arialyy.aria.exception.AriaException; */ public interface IThreadTaskObserver { - /** - * 更新所有状态 - * - * @param state state {@link IBlockManager#STATE_STOP}.. - */ - void updateState(int state, Bundle bundle); - - /** - * 更新完成的状态 - */ - void updateCompleteState(); - - /** - * 更新失败的状态 - * - * @param needRetry 是否需要重试,一般是网络错误才需要重试 - */ - void updateFailState(AriaException e, boolean needRetry); - /** * 更新进度 * diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java index 129f10b9..0c771fc6 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java @@ -546,11 +546,10 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver { } } - @Override public ThreadTask call() throws Exception { + @Override public void run() { isDestroy = false; Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); TrafficStats.setThreadStatsTag(UUID.randomUUID().toString().hashCode()); - mAdapter.call(this); - return this; + mAdapter.run(this); } } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt new file mode 100644 index 00000000..c51fd122 --- /dev/null +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.core.task + +import android.os.Bundle +import android.os.Handler +import com.arialyy.aria.core.DuaContext +import com.arialyy.aria.core.inf.IBlockManager +import com.arialyy.aria.orm.entity.BlockRecord +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import timber.log.Timber +import java.io.File + +/** + * @Author laoyuyu + * @Description + * @Date 19:40 PM 2023/2/7 + **/ +class ThreadTask2( + private val adapter: IThreadTaskAdapter, + private val handler: Handler, + private val record: BlockRecord +) : IThreadTask, IThreadTaskObserver { + private var isCanceled = false + private var isStopped = false + private var failCount = 3 + private var lastUpdateTime = System.currentTimeMillis() + + companion object { + private const val MAX_RE_TRY_NUM = 3 + private const val RE_TRY_TIME = 1000 * 3L + } + + override fun run() { + adapter.run(this) + } + + override fun cancel() { + adapter.cancel() + isCanceled = true + handler.obtainMessage(IBlockManager.STATE_CANCEL) + } + + override fun stop() { + adapter.stop() + isStopped = true + handler.obtainMessage(IBlockManager.STATE_STOP) + } + + override fun setMaxSpeed(speed: Int) { + adapter.setMaxSpeed(speed) + } + + override fun isRunning(): Boolean { + return !isCanceled && !isStopped + } + + /** + * thread task fail, we need count fail num + * if [failCount] less than [MAX_RE_TRY_NUM], will retry the thread task + */ + override fun onFail(e: Exception?) { + Timber.e("execute thread fail, failNum: $failCount blockId: ${record.bId}, blockPath: ${record.blockPath}, sourceUrl: ${record.sourUrl}") + if (failCount < MAX_RE_TRY_NUM) { + Timber.e("retry thread, failCount: $failCount") + DuaContext.duaScope.launch(Dispatchers.IO) { + delay(RE_TRY_TIME) + } + run() + return + } + val b = Bundle() + b.putBoolean(IBlockManager.DATA_RETRY, false) + b.putSerializable(IBlockManager.DATA_ERROR_INFO, e) + handler.obtainMessage(IBlockManager.STATE_FAIL, b) + } + + override fun onComplete() { + val blockF = File(record.blockPath) + if (blockF.length() != record.curProgress) { + Timber.e("task fail, blockSize: ${blockF.length()} Not equal to curProgress: ${record.curProgress}") + onFail(null) + return + } + record.isComplete = true + // update progress once a second, we need to check the progress difference. + val diff = kotlin.math.abs(record.curProgress - blockF.length()) + if (diff != 0L) { + handler.obtainMessage(IBlockManager.STATE_RUNNING, diff) + } + updateRecord() + handler.obtainMessage(IBlockManager.STATE_COMPLETE) + } + + private fun updateRecord() { + val dao = DuaContext.getServiceManager().getDbService().getDuaDb().getRecordDao() + DuaContext.duaScope.launch(Dispatchers.IO) { + dao.updateBlockRecord(record) + } + } + + /** + * update current thread progress, once a second + */ + override fun updateProgress(len: Long) { + record.curProgress += len + if (System.currentTimeMillis() - lastUpdateTime > 1000) { + lastUpdateTime = System.currentTimeMillis() + handler.obtainMessage(IBlockManager.STATE_RUNNING, len) + } + } + + override fun getThreadProgress(): Long { + return record.curProgress + } +} \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/orm/dao/RecordDao.kt b/PublicComponent/src/main/java/com/arialyy/aria/orm/dao/RecordDao.kt index 49edd022..834fe58d 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/orm/dao/RecordDao.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/orm/dao/RecordDao.kt @@ -57,6 +57,9 @@ interface RecordDao { @Update suspend fun updateBlockList(blockList: List) + @Update + suspend fun updateBlockRecord(record: BlockRecord) + @Delete suspend fun deleteTaskRecord(record: TaskRecord) diff --git a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/BlockRecord.kt b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/BlockRecord.kt index ec39a7dc..58535f15 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/BlockRecord.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/orm/entity/BlockRecord.kt @@ -67,4 +67,7 @@ data class BlockRecord( @Ignore var curProgress = 0L + + @Ignore + lateinit var sourUrl: String } \ No newline at end of file