From 75d22ea0288a8fd8f2c92e5f071d526782365827 Mon Sep 17 00:00:00 2001 From: laoyuyu Date: Wed, 8 Feb 2023 20:22:25 +0800 Subject: [PATCH] thread task --- .../download/HttpBlockThreadInterceptor.kt | 22 ++++- .../http/download/HttpDBlockInterceptor.kt | 5 +- .../aria/http/download/HttpDCTTaskAdapter.kt | 78 +++++++++++++++ .../http/download/HttpDHeaderInterceptor.kt | 5 +- .../http/download/HttpDStartController.kt | 2 +- .../aria/http/download/HttpDTTaskAdapter.kt | 96 +++++++++++++++++++ .../aria/http/download/HttpDTaskOption.kt | 13 ++- .../aria/http/download/HttpDTaskUtil.kt | 18 +++- .../http/download/HttpDThreadTaskAdapter.java | 82 ++++++---------- .../java/com/arialyy/aria/core/DuaContext.kt | 5 +- .../arialyy/aria/core/inf/IBlockManager.java | 6 +- .../aria/core/task/AbsThreadTaskAdapter.java | 84 +++++++--------- .../arialyy/aria/core/task/DBlockManager.kt | 13 ++- .../arialyy/aria/core/task/IThreadTask.java | 3 - .../aria/core/task/IThreadTaskAdapter.java | 13 +-- .../aria/core/task/IThreadTaskObserver.java | 6 +- .../com/arialyy/aria/core/task/TaskState.kt | 11 --- .../arialyy/aria/core/task/ThreadConfig.kt | 26 +++++ .../arialyy/aria/core/task/ThreadTask.java | 2 +- .../com/arialyy/aria/core/task/ThreadTask2.kt | 4 +- 20 files changed, 343 insertions(+), 151 deletions(-) create mode 100644 Http/src/main/java/com/arialyy/aria/http/download/HttpDCTTaskAdapter.kt create mode 100644 Http/src/main/java/com/arialyy/aria/http/download/HttpDTTaskAdapter.kt create mode 100644 PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadConfig.kt diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt index 6f2f99e3..ef821e19 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpBlockThreadInterceptor.kt @@ -15,11 +15,13 @@ */ package com.arialyy.aria.http.download +import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.task.ITaskInterceptor import com.arialyy.aria.core.task.IThreadTask import com.arialyy.aria.core.task.TaskChain import com.arialyy.aria.core.task.TaskResp -import com.arialyy.aria.core.task.ThreadTask +import com.arialyy.aria.core.task.ThreadConfig +import com.arialyy.aria.core.task.ThreadTask2 import com.arialyy.aria.orm.entity.BlockRecord /** @@ -34,13 +36,25 @@ class HttpBlockThreadInterceptor : ITaskInterceptor { if (unfinishedBlockList.isEmpty()) { return TaskResp(TaskResp.CODE_BLOCK_QUEUE_NULL) } - createThreadTask(unfinishedBlockList) + createThreadTask(unfinishedBlockList, chain) + return TaskResp(TaskResp.CODE_SUCCESS) } - private fun createThreadTask(blockRecordList: List) { + private fun createThreadTask(blockRecordList: List, chain: TaskChain) { val threadTaskList = mutableListOf() - blockRecordList.forEach{ + blockRecordList.forEach { + val option = chain.getTask().getTaskOption(HttpDTaskOption::class.java) + val threadConfig = ThreadConfig(it, option, DuaContext.getDConfig().maxSpeed) + threadTaskList.add( + ThreadTask2( + adapter = if (option.isChunkTask) HttpDCTTaskAdapter(threadConfig) else HttpDThreadTaskAdapter( + threadConfig + ), + handler = chain.blockManager.handler, + record = it + ) + ) } } } \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt index a7c0e756..3590ad78 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt @@ -15,8 +15,6 @@ */ package com.arialyy.aria.http.download -import android.os.Handler -import android.os.Looper import com.arialyy.aria.core.DuaContext import com.arialyy.aria.core.inf.IBlockManager import com.arialyy.aria.core.task.BlockUtil @@ -126,7 +124,7 @@ internal class HttpDBlockInterceptor : ITaskInterceptor { * if block already exist, upload progress */ private suspend fun checkBlock(): Int { - val handler = Handler(Looper.myLooper()!!, blockManager.handlerCallback) + val handler = blockManager.handler val needUpdateBlockRecord = mutableSetOf() for (br in taskRecord.blockList) { val blockF = File(br.blockPath) @@ -145,7 +143,6 @@ internal class HttpDBlockInterceptor : ITaskInterceptor { handler.obtainMessage(IBlockManager.STATE_UPDATE_PROGRESS, br.curProgress) } } - handler.removeCallbacksAndMessages(null) // update block record val dao = DuaContext.getServiceManager().getDbService().getDuaDb().getRecordDao() diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDCTTaskAdapter.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDCTTaskAdapter.kt new file mode 100644 index 00000000..78b1d5b5 --- /dev/null +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDCTTaskAdapter.kt @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.http.download + +import com.arialyy.aria.core.task.AbsThreadTaskAdapter +import com.arialyy.aria.core.task.ThreadConfig +import com.arialyy.aria.http.ConnectionHelp +import com.arialyy.aria.http.request.IRequest.Companion.getRequest +import java.io.BufferedInputStream +import java.io.FileOutputStream +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.Channels + +/** + * http chunk download thread task + * @Author laoyuyu + * @Description + * @Date 23:07 PM 2023/2/8 + **/ +class HttpDCTTaskAdapter(threadConfig: ThreadConfig) : AbsThreadTaskAdapter(threadConfig) { + + private fun getTaskOption(): HttpDTaskOption { + return threadConfig.option as HttpDTaskOption + } + + override fun handlerThreadTask() { + val taskOption = getTaskOption() + val option = taskOption.httpOption!! + val conn = getRequest(option).getDConnection(taskOption.sourUrl!!, option) + conn.doInput = true + conn.setChunkedStreamingMode(0) + + conn.connect() + BufferedInputStream(ConnectionHelp.convertInputStream(conn)).use { + readBytes(it) + } + } + + private fun readBytes(ips: InputStream) { + val fos = FileOutputStream(blockRecord.blockPath, true) + val foc = fos.channel + val fic = Channels.newChannel(ips) + val bf = ByteBuffer.allocate(BUF_SIZE) + fic.use { + var len: Long + while (fic.read(bf).also { len = it.toLong() } != -1) { + if (isBreakTask) { + break + } + if (mSpeedBandUtil != null) { + mSpeedBandUtil.limitNextBytes(len.toInt()) + } + bf.flip() + foc.write(bf) + bf.compact() + progress(len) + } + // 将数据刷出到磁盘 + foc.force(true) + foc.close() + fos.close() + } + } +} \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt index 4cf8350b..e57c0e8b 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt @@ -62,10 +62,9 @@ internal class HttpDHeaderInterceptor : ITaskInterceptor { try { val fileSize = getFileSize() if (fileSize >= 0) { - task.taskState.isSupportResume = fileSize != 0L - task.taskState.isSupportBlock = - task.taskState.isSupportResume && fileSize > BlockRecord.BLOCK_SIZE task.taskState.fileSize = fileSize + taskOption.isSupportResume = fileSize != 0L + taskOption.isSupportBlock = taskOption.isSupportResume && fileSize > BlockRecord.BLOCK_SIZE return chain.proceed(task) } } catch (e: IOException) { diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDStartController.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDStartController.kt index 1e988835..5c44a493 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDStartController.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDStartController.kt @@ -50,7 +50,7 @@ class HttpDStartController(target: Any, val url: String) : HttpBaseController(ta * use multi-threaded download file, if file size <= 5m, this setting is not valid * @param threadNum range [1 - 32] */ - fun setThreadNum(threadNum: Long): HttpDStartController { + fun setThreadNum(threadNum: Int): HttpDStartController { if (threadNum !in 1..32) { Timber.e("set thread num fail, only 0 < threadNum < 33, threadNum: $threadNum") return this diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTTaskAdapter.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTTaskAdapter.kt new file mode 100644 index 00000000..8b2f4422 --- /dev/null +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTTaskAdapter.kt @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.http.download + +import com.arialyy.aria.core.task.AbsThreadTaskAdapter +import com.arialyy.aria.core.task.ThreadConfig +import com.arialyy.aria.http.ConnectionHelp +import com.arialyy.aria.http.request.IRequest.Companion.getRequest +import timber.log.Timber +import java.io.BufferedInputStream +import java.io.FileOutputStream +import java.io.InputStream +import java.nio.ByteBuffer +import java.nio.channels.Channels + +/** + * http download thread task, not support chunk + * @Author laoyuyu + * @Description + * @Date 23:07 PM 2023/2/8 + **/ +class HttpDTTaskAdapter(threadConfig: ThreadConfig) : AbsThreadTaskAdapter(threadConfig) { + + private fun getTaskOption(): HttpDTaskOption { + return threadConfig.option as HttpDTaskOption + } + + override fun handlerThreadTask() { + val taskOption = getTaskOption() + val option = taskOption.httpOption!! + val conn = getRequest(option).getDConnection(taskOption.sourUrl!!, option) + if (!taskOption.isSupportResume) { + Timber.w("this task not support resume, url: %s", taskOption.sourUrl) + } else { + conn.setRequestProperty( + "Range", String.format( + "bytes=%s-%s", threadConfig.blockRecord.startLocation, + threadConfig.blockRecord.endLocation - 1 + ) + ) + } + + conn.connect() + BufferedInputStream(ConnectionHelp.convertInputStream(conn)).use { + readBytes(it) + } + } + + private fun readBytes(ips: InputStream) { + val fos = FileOutputStream(blockRecord.blockPath, true) + val foc = fos.channel + val fic = Channels.newChannel(ips) + val bf = ByteBuffer.allocate(BUF_SIZE) + fic.use { + var len: Long + while (fic.read(bf).also { len = it.toLong() } != -1) { + if (isBreakTask) { + break + } + if (mSpeedBandUtil != null) { + mSpeedBandUtil.limitNextBytes(len.toInt()) + } + if (rangeProgress + len >= blockRecord.endLocation) { + len = (blockRecord.endLocation - rangeProgress) + bf.flip() + fos.write(bf.array(), 0, len.toInt()) + bf.compact() + progress(len) + break + } else { + bf.flip() + foc.write(bf) + bf.compact() + progress(len) + } + } + // 将数据刷出到磁盘 + foc.force(true) + foc.close() + fos.close() + } + } +} \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt index b3ccea7b..399fd685 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskOption.kt @@ -27,9 +27,20 @@ import com.arialyy.aria.http.HttpOption **/ class HttpDTaskOption : DTaskOption() { - var httpOption: HttpOption? = null var fileSizeAdapter: IHttpFileLenAdapter? = null var taskInterceptor = mutableListOf() var isChunkTask = false + + /** + * whether block is supported, true: supported + */ + var isSupportResume = true + + /** + * whether resume task is supported + * 1. in download task, if file length not obtained, isSupportResume = false + * 2. in upload task, if service not supported resume, isSupportResume = false + */ + var isSupportBlock = true } \ No newline at end of file diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt index aa5a9fcd..ebe31f71 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.arialyy.aria.http.download import android.os.Looper @@ -43,10 +58,11 @@ internal class HttpDTaskUtil : AbsTaskUtil() { } DuaContext.duaScope.launch(Dispatchers.IO) { Looper.prepare() - blockManager?.setLopper(Looper.myLooper()!!) + blockManager?.setLooper() addCoreInterceptor(TimerInterceptor()) addCoreInterceptor(HttpDHeaderInterceptor()) addCoreInterceptor(HttpDBlockInterceptor()) + addCoreInterceptor(HttpBlockThreadInterceptor()) val resp = interceptor() Looper.loop() } diff --git a/Http/src/main/java/com/arialyy/aria/http/download/HttpDThreadTaskAdapter.java b/Http/src/main/java/com/arialyy/aria/http/download/HttpDThreadTaskAdapter.java index 676c38c3..51bf30dc 100644 --- a/Http/src/main/java/com/arialyy/aria/http/download/HttpDThreadTaskAdapter.java +++ b/Http/src/main/java/com/arialyy/aria/http/download/HttpDThreadTaskAdapter.java @@ -16,11 +16,12 @@ package com.arialyy.aria.http.download; import com.arialyy.aria.core.common.RequestEnum; -import com.arialyy.aria.core.common.SubThreadConfig; -import com.arialyy.aria.core.download.DTaskWrapper; +import com.arialyy.aria.core.task.AbsThreadTaskAdapter; +import com.arialyy.aria.core.task.ThreadConfig; import com.arialyy.aria.exception.AriaHTTPException; -import com.arialyy.aria.http.BaseHttpThreadTaskAdapter; import com.arialyy.aria.http.ConnectionHelp; +import com.arialyy.aria.http.HttpOption; +import com.arialyy.aria.http.request.IRequest; import com.arialyy.aria.util.ALog; import com.arialyy.aria.util.BufferedRandomAccessFile; import java.io.BufferedInputStream; @@ -30,7 +31,6 @@ import java.io.InputStream; import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.MalformedURLException; -import java.net.URL; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -38,41 +38,37 @@ import java.nio.channels.FileChannel; import java.nio.channels.ReadableByteChannel; import java.util.Map; import java.util.Set; +import timber.log.Timber; /** * Created by lyy on 2017/1/18. 下载线程 */ -final class HttpDThreadTaskAdapter extends BaseHttpThreadTaskAdapter { - private final String TAG = "HttpDThreadTaskAdapter"; - private DTaskWrapper mTaskWrapper; +final class HttpDThreadTaskAdapter extends AbsThreadTaskAdapter { - HttpDThreadTaskAdapter(SubThreadConfig config) { - super(config); + HttpDThreadTaskAdapter(ThreadConfig threadConfig) { + super(threadConfig); + } + + private HttpDTaskOption getTaskOption() { + return (HttpDTaskOption) getThreadConfig().getOption(); } @Override protected void handlerThreadTask() { - mTaskWrapper = (DTaskWrapper) getTaskWrapper(); - if (getThreadRecord().isComplete) { - handleComplete(); - return; - } HttpURLConnection conn = null; BufferedInputStream is = null; BufferedRandomAccessFile file = null; try { - URL url = ConnectionHelp.handleUrl(getThreadConfig().url, mTaskOption); - conn = ConnectionHelp.handleConnection(url, mTaskOption); - if (mTaskWrapper.isSupportBP()) { - ALog.d(TAG, - String.format("任务【%s】线程__%s__开始下载【开始位置 : %s,结束位置:%s】", getFileName(), - getThreadRecord().threadId, getThreadRecord().startLocation, - getThreadRecord().endLocation)); + HttpDTaskOption taskOption = getTaskOption(); + HttpOption option = taskOption.getHttpOption(); + conn = IRequest.Companion.getRequest(option).getDConnection(taskOption.getSourUrl(), option); + if (!taskOption.isSupportResume()) { + Timber.w("this task not support resume, url: %s", taskOption.getSourUrl()); + }else { conn.setRequestProperty("Range", - String.format("bytes=%s-%s", getThreadRecord().startLocation, - (getThreadRecord().endLocation - 1))); - } else { - ALog.w(TAG, "该下载不支持断点"); + String.format("bytes=%s-%s", getThreadConfig().getBlockRecord().getStartLocation(), + (getThreadConfig().getBlockRecord().getEndLocation() - 1))); } + ConnectionHelp.setConnectParam(mTaskOption, conn); conn.setConnectTimeout(getTaskConfig().getConnectTimeOut()); conn.setReadTimeout(getTaskConfig().getIOTimeOut()); //设置读取流的等待时间,必须设置该参数 @@ -81,23 +77,6 @@ final class HttpDThreadTaskAdapter extends BaseHttpThreadTaskAdapter { conn.setChunkedStreamingMode(0); } conn.connect(); - // 传递参数 - if (mTaskOption.getRequestEnum() == RequestEnum.POST) { - Map params = mTaskOption.getParams(); - if (params != null) { - OutputStreamWriter dos = new OutputStreamWriter(conn.getOutputStream()); - Set keys = params.keySet(); - StringBuilder sb = new StringBuilder(); - for (String key : keys) { - sb.append(key).append("=").append(URLEncoder.encode(params.get(key))).append("&"); - } - String paramStr = sb.toString(); - paramStr = paramStr.substring(0, paramStr.length() - 1); - dos.write(paramStr); - dos.flush(); - dos.close(); - } - } is = new BufferedInputStream(ConnectionHelp.convertInputStream(conn)); if (mTaskOption.isChunked()) { @@ -220,7 +199,8 @@ final class HttpDThreadTaskAdapter extends BaseHttpThreadTaskAdapter { } handleComplete(); } catch (IOException e) { - fail(new AriaHTTPException(String.format("文件下载失败,savePath: %s, url: %s", getEntity().getFilePath(), + fail(new AriaHTTPException( + String.format("文件下载失败,savePath: %s, url: %s", getEntity().getFilePath(), getThreadConfig().url), e), true); } finally { try { @@ -259,17 +239,11 @@ final class HttpDThreadTaskAdapter extends BaseHttpThreadTaskAdapter { } } - /** - * 处理完成配置文件的更新或事件回调 - */ - private void handleComplete() { - if (getThreadTask().isBreak()) { - return; - } - if (!getThreadTask().checkBlock()) { - return; - } + @Override public void cancel() { + + } + + @Override public void stop() { - complete(); } } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/DuaContext.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/DuaContext.kt index 4ce34a08..4618beef 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/DuaContext.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/DuaContext.kt @@ -20,7 +20,6 @@ import android.content.Context import com.arialyy.aria.core.service.LifecycleManager import com.arialyy.aria.core.service.ServiceManager import kotlinx.coroutines.MainScope -import java.util.logging.Handler /** * @Author laoyuyu @@ -44,4 +43,8 @@ object DuaContext { fun getServiceManager() = ServiceManager fun getLifeManager() = LifecycleManager + + fun getDConfig() = AriaConfig.getInstance().dConfig + + fun getUConfig() = AriaConfig.getInstance().uConfig } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java index 1194f848..8a03a771 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java @@ -17,10 +17,8 @@ package com.arialyy.aria.core.inf; import android.os.Handler; import com.arialyy.aria.core.task.IThreadTask; -import com.arialyy.aria.core.task.ThreadTask; import com.arialyy.aria.orm.entity.BlockRecord; import java.util.List; -import kotlinx.coroutines.channels.Channel; /** * 线程任务状态 @@ -40,6 +38,8 @@ public interface IBlockManager { String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION"; String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度 + void setLooper(); + void start(List threadTaskList); void setBlockNum(int blockNum); @@ -78,5 +78,5 @@ public interface IBlockManager { /** * 创建handler 回调 */ - Handler.Callback getHandlerCallback(); + Handler getHandler(); } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsThreadTaskAdapter.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsThreadTaskAdapter.java index a58f4c85..829b5b0c 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsThreadTaskAdapter.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsThreadTaskAdapter.java @@ -15,13 +15,8 @@ */ package com.arialyy.aria.core.task; -import com.arialyy.aria.core.ThreadRecord; -import com.arialyy.aria.core.common.SubThreadConfig; -import com.arialyy.aria.core.config.BaseTaskConfig; -import com.arialyy.aria.core.wrapper.AbsTaskWrapper; -import com.arialyy.aria.exception.AriaException; +import com.arialyy.aria.orm.entity.BlockRecord; import com.arialyy.aria.util.BandwidthLimiter; -import com.arialyy.aria.util.CommonUtil; /** * @author lyy @@ -29,67 +24,56 @@ import com.arialyy.aria.util.CommonUtil; */ public abstract class AbsThreadTaskAdapter implements IThreadTaskAdapter { - protected String TAG = CommonUtil.getClassName(getClass()); /** * 速度限制工具 */ protected BandwidthLimiter mSpeedBandUtil; - private ThreadRecord mThreadRecord; private IThreadTaskObserver mObserver; - private AbsTaskWrapper mWrapper; - private SubThreadConfig mThreadConfig; - private IThreadTask mThreadTask; - - protected AbsThreadTaskAdapter(SubThreadConfig config) { - mThreadRecord = config.record; - mWrapper = config.taskWrapper; - mThreadConfig = config; - if (getTaskConfig().getMaxSpeed() > 0) { - mSpeedBandUtil = new BandwidthLimiter(getTaskConfig().getMaxSpeed(), config.startThreadNum); + private final ThreadConfig mThreadConfig; + private boolean breakTask = false; + + protected AbsThreadTaskAdapter(ThreadConfig threadConfig) { + mThreadConfig = threadConfig; + if (threadConfig.getSpeed() > 0) { + mSpeedBandUtil = + new BandwidthLimiter(threadConfig.getSpeed(), mThreadConfig.getOption().threadNum); } } - @Override public void call(IThreadTask threadTask) throws Exception { - mThreadTask = threadTask; - handlerThreadTask(); + @Override public void breakTask() { + breakTask = true; } - /** - * 开始处理线程任务 - */ - protected abstract void handlerThreadTask(); + protected boolean isBreakTask() { + return breakTask; + } - /** - * 当前线程的下去区间的进度 - */ - protected long getRangeProgress() { - return mObserver.getThreadProgress(); + protected BlockRecord getBlockRecord() { + return mThreadConfig.getBlockRecord(); } - protected ThreadRecord getThreadRecord() { - return mThreadRecord; + protected ThreadConfig getThreadConfig() { + return mThreadConfig; } - protected AbsTaskWrapper getTaskWrapper() { - return mWrapper; + @Override public void run() { + try { + handlerThreadTask(); + } catch (Exception e) { + fail(e); + } } /** - * 获取任务配置信息 + * 开始处理线程任务 */ - protected BaseTaskConfig getTaskConfig() { - return getTaskWrapper().getConfig(); - } - - protected IThreadTask getThreadTask() { - return mThreadTask; - } + protected abstract void handlerThreadTask(); /** - * 获取线程配置信息 + * 当前线程的下去区间的进度 */ - protected SubThreadConfig getThreadConfig() { - return mThreadConfig; + protected long getRangeProgress() { + return mObserver.getThreadProgress(); } @Override public void attach(IThreadTaskObserver observer) { @@ -99,26 +83,26 @@ public abstract class AbsThreadTaskAdapter implements IThreadTaskAdapter { @Override public void setMaxSpeed(int speed) { if (mSpeedBandUtil == null) { mSpeedBandUtil = - new BandwidthLimiter(getTaskConfig().getMaxSpeed(), getThreadConfig().startThreadNum); + new BandwidthLimiter(mThreadConfig.getSpeed(), mThreadConfig.getOption().threadNum); } mSpeedBandUtil.setMaxRate(speed); } protected void complete() { if (mObserver != null) { - mObserver.updateCompleteState(); + mObserver.onComplete(); } } - protected void fail(AriaException ex, boolean needRetry) { + protected void fail(Exception ex) { if (mObserver != null) { - mObserver.updateFailState(ex, needRetry); + mObserver.onFail(ex); } } protected void progress(long len) { if (mObserver != null) { - mObserver.updateProgress(len); + mObserver.onProgress(len); } } } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt index ddba3b00..146d2f01 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/DBlockManager.kt @@ -15,6 +15,7 @@ */ package com.arialyy.aria.core.task +import android.os.Handler import android.os.Handler.Callback import android.os.Looper import com.arialyy.aria.core.inf.IBlockManager @@ -49,6 +50,7 @@ class BlockManager(task: ITask) : IBlockManager { private var progress: Long = 0 //当前总进度 private lateinit var looper: Looper + private lateinit var handler: Handler private var blockNum: Int = 1 private var eventListener: IEventListener = task.getTaskOption(ITaskOption::class.java).taskListener @@ -105,6 +107,7 @@ class BlockManager(task: ITask) : IBlockManager { */ private fun quitLooper() { looper.quit() + handler.removeCallbacksAndMessages(null) scope.cancel() } @@ -116,11 +119,15 @@ class BlockManager(task: ITask) : IBlockManager { return unfinishedBlock } - override fun start(threadTaskList: List) { + override fun setLooper() { if (Looper.myLooper() == Looper.getMainLooper()) { throw IllegalThreadStateException("io operations cannot be in the main thread") } looper = Looper.myLooper()!! + handler = Handler(looper, callback) + } + + override fun start(threadTaskList: List) { threadTaskList.forEach { tt -> scope.launch(dispatcher) { tt.run() @@ -163,7 +170,7 @@ class BlockManager(task: ITask) : IBlockManager { return scope.isActive } - override fun getHandlerCallback(): Callback { - return callback + override fun getHandler(): Handler { + return handler } } \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java index 183a35e7..f6956344 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTask.java @@ -45,7 +45,4 @@ public interface IThreadTask extends Runnable { */ boolean isRunning(); - void onFail(Exception e); - - void onComplete(); } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java index bfb625a4..123012bf 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskAdapter.java @@ -23,20 +23,17 @@ package com.arialyy.aria.core.task; */ public interface IThreadTaskAdapter { - /** - * 执行任务 - */ - void run(IThreadTask threadTask); + int BUF_SIZE = 4096; /** - * 取消任务 + * 执行任务 */ - void cancel(); + void run(); /** - * 停止任务 + * 中断任务 */ - void stop(); + void breakTask(); /** * 设置当前线程最大下载速度 diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java index feef4c3d..66781611 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java @@ -28,10 +28,14 @@ public interface IThreadTaskObserver { * * @param len 新增的长度 */ - void updateProgress(long len); + void onProgress(long len); /** * 获取线程当前进度 */ long getThreadProgress(); + + void onFail(Exception e); + + void onComplete(); } diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt index e1fc917c..382e6901 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt @@ -43,17 +43,6 @@ class TaskState { */ var curProgress: Long = 0 - /** - * whether block is supported, true: supported - */ - var isSupportBlock = false - - /** - * whether resume task is supported - * 1. in download task, if file length not obtained, isSupportResume = false - * 2. in upload task, if service not supported resume, isSupportResume = false - */ - var isSupportResume = false /** * Bytes transferred in 1 second, if file size 0, return 0 diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadConfig.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadConfig.kt new file mode 100644 index 00000000..20a6a912 --- /dev/null +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadConfig.kt @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arialyy.aria.core.task + +import com.arialyy.aria.core.inf.ITaskOption +import com.arialyy.aria.orm.entity.BlockRecord + +/** + * @Author laoyuyu + * @Description + * @Date 23:12 PM 2023/2/8 + **/ +data class ThreadConfig(val blockRecord: BlockRecord, val option: ITaskOption, val speed: Int) \ No newline at end of file diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java index 0c771fc6..76ff9c11 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java @@ -331,7 +331,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver { * @param len 新增的长度 */ @Override - public synchronized void updateProgress(long len) { + public synchronized void onProgress(long len) { mRangeProgress += len; Thread loopThread = mStateHandler.getLooper().getThread(); if (!loopThread.isAlive() || loopThread.isInterrupted()) { diff --git a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt index c51fd122..c1040fb1 100644 --- a/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt +++ b/PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask2.kt @@ -47,7 +47,7 @@ class ThreadTask2( } override fun run() { - adapter.run(this) + adapter.run() } override fun cancel() { @@ -117,7 +117,7 @@ class ThreadTask2( /** * update current thread progress, once a second */ - override fun updateProgress(len: Long) { + override fun onProgress(len: Long) { record.curProgress += len if (System.currentTimeMillis() - lastUpdateTime > 1000) { lastUpdateTime = System.currentTimeMillis()