modify BlockManager.kt

v4
laoyuyu 2 years ago
parent e6867c7173
commit a446db3946
  1. 43
      Http/src/main/java/com/arialyy/aria/http/download/HttpDBlockInterceptor.kt
  2. 4
      Http/src/main/java/com/arialyy/aria/http/download/HttpDHeaderInterceptor.kt
  3. 9
      Http/src/main/java/com/arialyy/aria/http/download/HttpDStartController.kt
  4. 18
      Http/src/main/java/com/arialyy/aria/http/download/HttpDTaskUtil.kt
  5. 91
      Http/src/main/java/com/arialyy/aria/http/download/TimerInterceptor.kt
  6. 4
      Http/src/main/java/com/arialyy/aria/http/upload/HttpULoader.java
  7. 6
      M3U8Component/src/main/java/com/arialyy/aria/m3u8/live/LiveStateManager.java
  8. 4
      M3U8Component/src/main/java/com/arialyy/aria/m3u8/live/M3U8LiveLoader.java
  9. 4
      M3U8Component/src/main/java/com/arialyy/aria/m3u8/vod/M3U8VodLoader.java
  10. 6
      M3U8Component/src/main/java/com/arialyy/aria/m3u8/vod/VodStateManager.java
  11. 4
      PublicComponent/src/main/java/com/arialyy/aria/core/group/AbsGroupLoader.java
  12. 20
      PublicComponent/src/main/java/com/arialyy/aria/core/group/SimpleSchedulers.java
  13. 8
      PublicComponent/src/main/java/com/arialyy/aria/core/inf/IBlockManager.java
  14. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/AbsNormalLoader.java
  15. 10
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/GroupSubThreadStateManager.java
  16. 4
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/ILoaderVisitor.java
  17. 4
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/LoaderStructure.java
  18. 7
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/NormalLoader.java
  19. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/NormalTTBuilder.java
  20. 22
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/SubLoader.java
  21. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/TaskThreadStateManager.java
  22. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/loader/UploadThreadStateManager.java
  23. 5
      PublicComponent/src/main/java/com/arialyy/aria/core/service/DbService.kt
  24. 7
      PublicComponent/src/main/java/com/arialyy/aria/core/task/AbsTaskUtil.kt
  25. 42
      PublicComponent/src/main/java/com/arialyy/aria/core/task/BlockManager.kt
  26. 6
      PublicComponent/src/main/java/com/arialyy/aria/core/task/BlockState.kt
  27. 89
      PublicComponent/src/main/java/com/arialyy/aria/core/task/BlockUtil.kt
  28. 2
      PublicComponent/src/main/java/com/arialyy/aria/core/task/ITaskInterceptor.kt
  29. 4
      PublicComponent/src/main/java/com/arialyy/aria/core/task/IThreadTaskObserver.java
  30. 7
      PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskChain.kt
  31. 14
      PublicComponent/src/main/java/com/arialyy/aria/core/task/TaskState.kt
  32. 34
      PublicComponent/src/main/java/com/arialyy/aria/core/task/ThreadTask.java
  33. 7
      PublicComponent/src/main/java/com/arialyy/aria/orm/dao/RecordDao.kt
  34. 5
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/BlockRecord.kt
  35. 5
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DEntity.kt
  36. 6
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGEntity.kt
  37. 2
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/DGUrlConverter.kt
  38. 22
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/FilePathConverter.kt
  39. 5
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/MEntity.kt
  40. 14
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/TaskRecord.kt
  41. 5
      PublicComponent/src/main/java/com/arialyy/aria/orm/entity/UEntity.kt
  42. 20
      PublicComponent/src/main/java/com/arialyy/aria/util/FileUtil.java
  43. 4
      SFtpComponent/src/main/java/com/arialyy/aria/sftp/download/SFtpDLoader.java
  44. 4
      SFtpComponent/src/main/java/com/arialyy/aria/sftp/upload/SFtpULoader.java

@ -16,11 +16,12 @@
package com.arialyy.aria.http.download
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.task.BlockManager
import com.arialyy.aria.core.task.BlockUtil
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import com.arialyy.aria.orm.entity.TaskRecord
import timber.log.Timber
/**
@ -30,29 +31,53 @@ import timber.log.Timber
*/
internal class HttpDBlockInterceptor : ITaskInterceptor {
private lateinit var task: ITask
private lateinit var option: HttpDTaskOption
override suspend fun interceptor(chain: TaskChain): TaskResp {
task = chain.getTask()
option = task.getTaskOption(HttpDTaskOption::class.java)
if (task.taskState.fileSize < 0) {
Timber.e("file size < 0")
return TaskResp(TaskResp.CODE_GET_FILE_INFO_FAIL)
}
// if task not support resume, don't save record
if (task.taskState.fileSize == 0L) {
chain.blockManager.setBlockNum(1)
checkBlock()
return chain.proceed(chain.getTask())
}
val blockNum = checkRecord()
chain.blockManager.setBlockNum(blockNum)
checkBlock()
return chain.proceed(chain.getTask())
}
/**
* check task record, if record no exist, create taskRecord
* @return blockNum
*/
private suspend fun checkRecord() {
val recordWrapper = DuaContext.getServiceManager().getDbService().getDuaDb()?.getRecordDao()
?.getTaskRecordByKey(task.taskKey)
if (recordWrapper == null){
private suspend fun checkRecord(): Int {
val recordDao = DuaContext.getServiceManager().getDbService().getDuaDb().getRecordDao()
val recordWrapper = recordDao.getTaskRecordByKey(task.taskKey)
if (recordWrapper == null) {
Timber.i("record not found, create record")
val blockNumInfo = BlockUtil.getBlockNum(task.taskState.fileSize)
val taskRecord = TaskRecord(
taskKey = task.taskKey,
filePath = option.savePathUri!!,
taskType = ITask.DOWNLOAD,
fileLen = task.taskState.fileSize,
blockNum = blockNumInfo.first,
blockSize = task.taskState.blockSize
)
taskRecord.blockList.addAll(BlockUtil.createBlockRecord(task.taskState.fileSize))
recordDao.insert(taskRecord)
return taskRecord.blockNum
}
}
private fun createBlockManager(): BlockManager {
Timber.d("record existed")
return recordWrapper.taskRecord.blockNum
}
/**

@ -21,6 +21,7 @@ import android.os.Looper
import android.os.Process
import android.text.TextUtils
import com.arialyy.aria.core.processor.IHttpFileLenAdapter
import com.arialyy.aria.core.task.BlockState
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
@ -61,6 +62,9 @@ internal class HttpDHeaderInterceptor : ITaskInterceptor {
try {
val fileSize = getFileSize()
if (fileSize >= 0) {
task.taskState.isSupportResume = fileSize != 0L
task.taskState.isSupportBlock =
task.taskState.isSupportResume && fileSize > BlockState.BLOCK_SIZE
task.taskState.fileSize = fileSize
return chain.proceed(task)
}

@ -122,18 +122,15 @@ class HttpDStartController(target: Any, val url: String) : HttpBaseController(ta
* find DEntity, if that not exist, create and save it
*/
private suspend fun findDEntityBySavePath(option: HttpDTaskOption): DEntity {
val savePath = option.savePathUri?.toString()
if (savePath.isNullOrEmpty()) {
throw IllegalArgumentException("savePath is null")
}
val savePath = option.savePathUri
val dao = DuaContext.getServiceManager().getDbService().getDuaDb()?.getDEntityDao()
val de = dao?.getDEntityBySavePath(savePath)
val de = dao?.getDEntityBySavePath(savePath.toString())
if (de != null) {
return de
}
val newDe = DEntity(
sourceUrl = option.sourUrl!!,
savePath = savePath,
savePath = savePath!!,
)
dao?.insert(newDe)
return newDe

@ -1,7 +1,10 @@
package com.arialyy.aria.http.download
import android.os.Looper
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.task.AbsTaskUtil
import com.arialyy.aria.core.task.BlockManager
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
@ -12,6 +15,14 @@ import kotlinx.coroutines.launch
**/
internal class HttpDTaskUtil : AbsTaskUtil() {
private var blockManager: BlockManager? = null
override fun getBlockManager(): IBlockManager {
if (blockManager == null) {
blockManager = BlockManager(getTask().getTaskOption(HttpDTaskOption::class.java).taskListener)
}
return blockManager!!
}
override fun isRunning(): Boolean {
TODO("Not yet implemented")
}
@ -30,9 +41,14 @@ internal class HttpDTaskUtil : AbsTaskUtil() {
addInterceptors(it)
}
}
addCoreInterceptor(HttpDHeaderInterceptor())
DuaContext.duaScope.launch(Dispatchers.IO) {
Looper.prepare()
blockManager?.setLopper(Looper.myLooper()!!)
addCoreInterceptor(TimerInterceptor())
addCoreInterceptor(HttpDHeaderInterceptor())
addCoreInterceptor(HttpDBlockInterceptor())
val resp = interceptor()
Looper.loop()
}
}
}

@ -0,0 +1,91 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.http.download
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskOption
import com.arialyy.aria.core.manager.ThreadTaskManager
import com.arialyy.aria.core.task.ITask
import com.arialyy.aria.core.task.ITaskInterceptor
import com.arialyy.aria.core.task.TaskChain
import com.arialyy.aria.core.task.TaskResp
import timber.log.Timber
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* @Author laoyuyu
* @Description
* @Date 2:27 PM 2023/2/05
**/
open class TimerInterceptor : ITaskInterceptor {
private var mTimer: ScheduledThreadPoolExecutor = ScheduledThreadPoolExecutor(1)
@Synchronized private fun closeTimer() {
if (!mTimer.isShutdown) {
mTimer.shutdown()
}
}
override suspend fun interceptor(chain: TaskChain): TaskResp {
val b = startTimer(chain)
if (!b) {
return TaskResp(TaskResp.CODE_INTERRUPT)
}
return chain.proceed(chain.getTask())
}
@Synchronized private fun startTimer(chain: TaskChain): Boolean {
closeTimer()
try {
mTimer = ScheduledThreadPoolExecutor(1)
val blockManager = chain.blockManager
mTimer.scheduleWithFixedDelay(object : Runnable {
override fun run() {
// 线程池中是不抛异常的,没有日志,很难定位问题,需要手动try-catch
try {
if (blockManager.isCompleted
|| blockManager.hasFailedBlock()
|| !isRunning(chain.getTask())
) {
ThreadTaskManager.getInstance().removeTaskThread(chain.getTask().taskId)
closeTimer()
return
}
if (chain.getTask().taskState.curProgress >= 0) {
chain.getTask().getTaskOption(ITaskOption::class.java).taskListener.onProgress(
blockManager.currentProgress
)
return
}
Timber.d("未知状态")
} catch (e: Exception) {
e.printStackTrace()
}
}
}, 0, 1000, MILLISECONDS)
} catch (e: Exception) {
Timber.e(e)
return false
}
return true
}
@Synchronized fun isRunning(task: ITask): Boolean {
return ThreadTaskManager.getInstance().taskIsRunning(task.taskId)
}
}

@ -17,7 +17,7 @@ package com.arialyy.aria.http.upload;
import android.os.Handler;
import android.os.Looper;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.loader.AbsNormalLoader;
import com.arialyy.aria.core.loader.IInfoTask;
@ -47,7 +47,7 @@ final class HttpULoader extends AbsNormalLoader<UTaskWrapper> {
}
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -21,7 +21,7 @@ import android.os.Looper;
import android.os.Message;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.listener.ISchedulers;
import com.arialyy.aria.core.loader.ILoaderVisitor;
@ -36,7 +36,7 @@ import java.nio.charset.Charset;
import static com.arialyy.aria.m3u8.M3U8InfoTask.M3U8_INDEX_FORMAT;
final class LiveStateManager implements IThreadStateManager {
final class LiveStateManager implements IBlockManager {
private final String TAG = CommonUtil.getClassName(getClass());
private M3U8Listener mListener;
@ -83,7 +83,7 @@ final class LiveStateManager implements IThreadStateManager {
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
long len = b.getLong(IBlockManager.DATA_ADD_LEN, 0);
mProgress += len;
}
break;

@ -23,7 +23,7 @@ import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.common.SubThreadConfig;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.loader.IInfoTask;
import com.arialyy.aria.core.loader.IRecordHandler;
import com.arialyy.aria.core.loader.IThreadTaskBuilder;
@ -333,7 +333,7 @@ final class M3U8LiveLoader extends BaseM3U8Loader {
/**
* 需要在{@link #addComponent(IRecordHandler)} 后调用
*/
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -28,7 +28,7 @@ import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.event.Event;
import com.arialyy.aria.core.event.EventMsgUtil;
import com.arialyy.aria.core.event.PeerIndexEvent;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.loader.IInfoTask;
import com.arialyy.aria.core.loader.IRecordHandler;
import com.arialyy.aria.core.loader.IThreadTaskBuilder;
@ -560,7 +560,7 @@ final class M3U8VodLoader extends BaseM3U8Loader {
/**
* 需要在 {@link #addComponent(IRecordHandler)}后调用
*/
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -23,7 +23,7 @@ import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.ThreadRecord;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.ISchedulers;
import com.arialyy.aria.core.loader.ILoaderVisitor;
import com.arialyy.aria.core.manager.ThreadTaskManager;
@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* m3u8 点播下载状态管理器
*/
public final class VodStateManager implements IThreadStateManager {
public final class VodStateManager implements IBlockManager {
private final String TAG = CommonUtil.getClassName(getClass());
private M3U8Listener listener;
@ -145,7 +145,7 @@ public final class VodStateManager implements IThreadStateManager {
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
long len = b.getLong(IBlockManager.DATA_ADD_LEN, 0);
progress += len;
}
break;

@ -21,7 +21,7 @@ import com.arialyy.aria.core.config.Configuration;
import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.inf.IEntity;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IDGroupListener;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.loader.IInfoTask;
@ -360,7 +360,7 @@ public abstract class AbsGroupLoader implements ILoaderVisitor, ILoader {
* @deprecated 组合任务不需要实现这个其内部是一个子任务调度器并不是线程状态管理器
*/
@Deprecated
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
}

@ -23,7 +23,7 @@ import android.util.Log;
import com.arialyy.aria.core.AriaConfig;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.config.Configuration;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.loader.IRecordHandler;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.exception.ExceptionFactory;
@ -58,37 +58,37 @@ final class SimpleSchedulers implements Handler.Callback {
ALog.w(TAG, "组合任务子任务调度数据为空");
return true;
}
String threadName = b.getString(IThreadStateManager.DATA_THREAD_NAME);
String threadName = b.getString(IBlockManager.DATA_THREAD_NAME);
AbsSubDLoadUtil loaderUtil = mQueue.getLoaderUtil(threadName);
if (loaderUtil == null) {
ALog.e(TAG, String.format("子任务loader不存在,state:%s,key:%s", msg.what, threadName));
return true;
}
long curLocation = b.getLong(IThreadStateManager.DATA_THREAD_LOCATION,
long curLocation = b.getLong(IBlockManager.DATA_THREAD_LOCATION,
loaderUtil.getLoader().getWrapper().getEntity().getCurrentProgress());
// 处理状态
switch (msg.what) {
case IThreadStateManager.STATE_RUNNING:
case IBlockManager.STATE_RUNNING:
long range = (long) msg.obj;
mGState.listener.onSubRunning(loaderUtil.getEntity(), range);
break;
case IThreadStateManager.STATE_PRE:
case IBlockManager.STATE_PRE:
mGState.listener.onSubPre(loaderUtil.getEntity());
mGState.updateCount(loaderUtil.getKey());
break;
case IThreadStateManager.STATE_START:
case IBlockManager.STATE_START:
mGState.listener.onSubStart(loaderUtil.getEntity());
break;
case IThreadStateManager.STATE_STOP:
case IBlockManager.STATE_STOP:
handleStop(loaderUtil, curLocation);
ThreadTaskManager.getInstance().removeSingleTaskThread(mKey, threadName);
break;
case IThreadStateManager.STATE_COMPLETE:
case IBlockManager.STATE_COMPLETE:
handleComplete(loaderUtil);
ThreadTaskManager.getInstance().removeSingleTaskThread(mKey, threadName);
break;
case IThreadStateManager.STATE_FAIL:
boolean needRetry = b.getBoolean(IThreadStateManager.DATA_RETRY, false);
case IBlockManager.STATE_FAIL:
boolean needRetry = b.getBoolean(IBlockManager.DATA_RETRY, false);
handleFail(loaderUtil, needRetry);
ThreadTaskManager.getInstance().removeSingleTaskThread(mKey, threadName);
break;

@ -16,11 +16,13 @@
package com.arialyy.aria.core.inf;
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.NonNull;
/**
* 线程任务状态
*/
public interface IThreadStateManager {
public interface IBlockManager {
int STATE_STOP = 0x01;
int STATE_FAIL = 0x02;
int STATE_CANCEL = 0x03;
@ -35,6 +37,10 @@ public interface IThreadStateManager {
String DATA_THREAD_LOCATION = "DATA_THREAD_LOCATION";
String DATA_ADD_LEN = "DATA_ADD_LEN"; // 增加的长度
void setLopper(@NonNull Looper looper);
void setBlockNum(int blockNum);
/**
* 是否有失败的快
*

@ -18,7 +18,7 @@ package com.arialyy.aria.core.loader;
import android.os.Looper;
import android.util.Log;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.task.IThreadTask;
@ -58,7 +58,7 @@ public abstract class AbsNormalLoader<T extends AbsTaskWrapper> implements ILoad
private boolean isRuning = false;
protected IRecordHandler mRecordHandler;
protected IThreadStateManager mStateManager;
protected IBlockManager mStateManager;
protected IInfoTask mInfoTask;
protected IThreadTaskBuilder mTTBuilder;
@ -81,7 +81,7 @@ public abstract class AbsNormalLoader<T extends AbsTaskWrapper> implements ILoad
return mListener;
}
protected IThreadStateManager getStateManager() {
protected IBlockManager getStateManager() {
return mStateManager;
}

@ -21,7 +21,7 @@ import android.os.Looper;
import android.os.Message;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import com.arialyy.aria.util.FileUtil;
@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程任务管理器用于处理多线程下载时任务的状态回调
*/
public class GroupSubThreadStateManager implements IThreadStateManager {
public class GroupSubThreadStateManager implements IBlockManager {
private final String TAG = CommonUtil.getClassName(this);
/**
@ -120,7 +120,7 @@ public class GroupSubThreadStateManager implements IThreadStateManager {
}*/
if (!mergeFile()) {
Bundle b=msg.getData();
b.putBoolean(IThreadStateManager.DATA_RETRY, false);
b.putBoolean(IBlockManager.DATA_RETRY, false);
msg.setData(b);
msg.what = STATE_FAIL;
sendMessageFromMsg(msg);
@ -139,7 +139,7 @@ public class GroupSubThreadStateManager implements IThreadStateManager {
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
long len = b.getLong(IBlockManager.DATA_ADD_LEN, 0);
mProgress += len;
}
msg.obj=mProgress;
@ -162,7 +162,7 @@ public class GroupSubThreadStateManager implements IThreadStateManager {
public void sendMessageFromMsg(Message msg){
Message mMsg=mHandler.obtainMessage();
Bundle b=mMsg.getData();
b.putString(IThreadStateManager.DATA_THREAD_NAME,mKey);
b.putString(IBlockManager.DATA_THREAD_NAME,mKey);
msg.setData(b);
mMsg.copyFrom(msg);
mHandler.sendMessage(mMsg);

@ -15,7 +15,7 @@
*/
package com.arialyy.aria.core.loader;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
/**
* 加载器访问者
@ -35,7 +35,7 @@ public interface ILoaderVisitor {
/**
* 线程状态
*/
void addComponent(IThreadStateManager threadState);
void addComponent(IBlockManager threadState);
/**
* 构造线程任务

@ -15,7 +15,7 @@
*/
package com.arialyy.aria.core.loader;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import java.util.ArrayList;
import java.util.List;
@ -33,7 +33,7 @@ public class LoaderStructure {
* 将组件加入到集合必须添加以下集合
* 1 {@link IRecordHandler}
* 2 {@link IInfoTask}
* 3 {@link IThreadStateManager}
* 3 {@link IBlockManager}
* 4 {@link IThreadTaskBuilder}
*
* @param component 待添加的组件

@ -21,18 +21,15 @@ import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.AbsNormalEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.event.EventMsgUtil;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IDLoadListener;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.task.AbsTask;
import com.arialyy.aria.core.task.IThreadTask;
import com.arialyy.aria.core.wrapper.AbsTaskWrapper;
import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.FileUtil;
import java.io.File;
import java.util.List;
/**
* 单文件
@ -162,7 +159,7 @@ public class NormalLoader<T extends AbsTaskWrapper> extends AbsNormalLoader<T> {
});
}
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -8,7 +8,7 @@ import com.arialyy.aria.core.ThreadRecord;
import com.arialyy.aria.core.common.AbsNormalEntity;
import com.arialyy.aria.core.common.SubThreadConfig;
import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.task.IThreadTask;
import com.arialyy.aria.core.task.ThreadTask;
import com.arialyy.aria.core.wrapper.AbsTaskWrapper;
@ -106,12 +106,12 @@ public final class NormalTTBuilder implements IThreadTaskBuilder {
currentProgress += endL - startL;
ALog.d(TAG, String.format("任务【%s】线程__%s__已完成", mWrapper.getKey(), i));
Message msg = mStateHandler.obtainMessage();
msg.what = IThreadStateManager.STATE_COMPLETE;
msg.what = IBlockManager.STATE_COMPLETE;
Bundle b = msg.getData();
if (b == null) {
b = new Bundle();
}
b.putString(IThreadStateManager.DATA_THREAD_NAME,
b.putString(IBlockManager.DATA_THREAD_NAME,
CommonUtil.getThreadName(getEntity().getKey(), tr.threadId));
msg.setData(b);
msg.sendToTarget();

@ -23,7 +23,7 @@ import android.text.TextUtils;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.task.IThreadTask;
import com.arialyy.aria.core.task.ThreadTask;
@ -51,7 +51,7 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
private List<IThreadTask> mTask = new ArrayList<>();
private String parentKey;
private TaskRecord record;
protected IThreadStateManager mStateManager;
protected IBlockManager mStateManager;
public SubLoader(AbsTaskWrapper wrapper, Handler schedulers) {
this.wrapper = wrapper;
@ -65,7 +65,7 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
/**
* 发送状态到调度器
*
* @param state {@link IThreadStateManager}
* @param state {@link IBlockManager}
*/
private void sendNormalState(int state) {
Message msg = schedulers.obtainMessage();
@ -73,7 +73,7 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
if (b == null) {
b = new Bundle();
}
b.putString(IThreadStateManager.DATA_THREAD_NAME, getKey());
b.putString(IBlockManager.DATA_THREAD_NAME, getKey());
msg.what = state;
msg.setData(b);
msg.sendToTarget();
@ -88,9 +88,9 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
if (b == null) {
b = new Bundle();
}
b.putString(IThreadStateManager.DATA_THREAD_NAME, getKey());
b.putBoolean(IThreadStateManager.DATA_RETRY, needRetry);
msg.what = IThreadStateManager.STATE_FAIL;
b.putString(IBlockManager.DATA_THREAD_NAME, getKey());
b.putBoolean(IBlockManager.DATA_RETRY, needRetry);
msg.what = IBlockManager.STATE_FAIL;
msg.setData(b);
msg.sendToTarget();
}
@ -117,7 +117,7 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
&& !record.threadRecords.isEmpty()
&& record.threadRecords.get(0).isComplete) {
ALog.d(TAG, "子任务已完成,key:" + wrapper.getKey());
sendNormalState(IThreadStateManager.STATE_COMPLETE);
sendNormalState(IBlockManager.STATE_COMPLETE);
return;
}
List<IThreadTask> task =
@ -134,14 +134,14 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
return;
}
sendNormalState(IThreadStateManager.STATE_PRE);
sendNormalState(IBlockManager.STATE_PRE);
mTask.addAll(task);
try {
for (IThreadTask iThreadTask : mTask) {
ThreadTaskManager.getInstance().startThread(parentKey, iThreadTask);
}
sendNormalState(IThreadStateManager.STATE_START);
sendNormalState(IBlockManager.STATE_START);
mStateManager.updateCurrentProgress(getWrapper().getEntity().getCurrentProgress());
} catch (Exception e) {
@ -254,7 +254,7 @@ public final class SubLoader implements ILoader, ILoaderVisitor {
});
}
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -19,7 +19,7 @@ import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.util.ALog;
@ -33,7 +33,7 @@ import timber.log.Timber;
/**
* 线程任务管理器用于处理多线程下载时任务的状态回调
*/
public class TaskThreadStateManager implements IThreadStateManager {
public class TaskThreadStateManager implements IBlockManager {
/**
* 分块文件路径: 文件路径.blockId.part
*/
@ -113,7 +113,7 @@ public class TaskThreadStateManager implements IThreadStateManager {
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
long len = b.getLong(IBlockManager.DATA_ADD_LEN, 0);
mProgress += len;
}

@ -21,7 +21,7 @@ import android.os.Looper;
import android.os.Message;
import com.arialyy.aria.core.TaskRecord;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.exception.AriaException;
import com.arialyy.aria.util.ALog;
@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程任务管理器用于处理多线程下载时任务的状态回调
*/
public class UploadThreadStateManager implements IThreadStateManager {
public class UploadThreadStateManager implements IBlockManager {
private final String TAG = CommonUtil.getClassName(this);
/**
@ -111,7 +111,7 @@ public class UploadThreadStateManager implements IThreadStateManager {
case STATE_RUNNING:
Bundle b = msg.getData();
if (b != null) {
long len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0);
long len = b.getLong(IBlockManager.DATA_ADD_LEN, 0);
mProgress += len;
}

@ -48,14 +48,13 @@ class DbService : IService {
}
}
fun getDuaDb() = duaDb
fun getDuaDb() = duaDb!!
override fun init(context: Context) {
var customDb = findCustomDatabase(context)
if (customDb == null) {
customDb = DefaultDbProvider().generateDb(context)
}
duaDb = customDb
.build()
duaDb = customDb.build()
}
}

@ -15,6 +15,7 @@
*/
package com.arialyy.aria.core.task
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.inf.ITaskUtil
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.core.task.ITaskInterceptor.IChain
@ -38,6 +39,8 @@ abstract class AbsTaskUtil : ITaskUtil {
protected fun getTask() = mTask
abstract fun getBlockManager(): IBlockManager
/**
* add user interceptor
*/
@ -52,14 +55,14 @@ abstract class AbsTaskUtil : ITaskUtil {
/**
* if interruption occurred, stop cmd
*/
protected open fun interceptor(): TaskResp? {
protected open suspend fun interceptor(): TaskResp? {
if (mUserInterceptor.isEmpty()) {
return null
}
val interceptors: MutableList<ITaskInterceptor> = ArrayList()
interceptors.addAll(mUserInterceptor)
interceptors.addAll(mCoreInterceptor)
val chain: IChain = TaskChain(interceptors, 0, mTask)
val chain: IChain = TaskChain(interceptors, 0, mTask, getBlockManager())
return chain.proceed(mTask)
}
}

@ -17,68 +17,64 @@ package com.arialyy.aria.core.task
import android.os.Handler.Callback
import android.os.Looper
import com.arialyy.aria.core.inf.IThreadStateManager
import com.arialyy.aria.core.inf.IBlockManager
import com.arialyy.aria.core.listener.IEventListener
import com.arialyy.aria.exception.AriaException
import timber.log.Timber
import java.util.concurrent.atomic.AtomicInteger
class BlockManager(
val mListener: IEventListener,
val looper: Looper,
private val blockNum: Int
) : IThreadStateManager {
class BlockManager(private val eventListener: IEventListener) : IBlockManager {
private val blockList = mutableListOf<BlockState>()
private val canceledNum = AtomicInteger(0) // 已经取消的线程的数
private val stoppedNum = AtomicInteger(0) // 已经停止的线程数
private val failedNum = AtomicInteger(0) // 失败的线程数
private val completedNum = AtomicInteger(0) // 完成的线程数
private var progress: Long = 0 //当前总进度
private lateinit var looper: Looper
private var blockNum: Int = 1
private val callback = Callback { msg ->
when (msg.what) {
IThreadStateManager.STATE_STOP -> {
IBlockManager.STATE_STOP -> {
stoppedNum.getAndIncrement()
if (isStopped) {
quitLooper()
}
}
IThreadStateManager.STATE_CANCEL -> {
IBlockManager.STATE_CANCEL -> {
canceledNum.getAndIncrement()
if (isCanceled) {
quitLooper()
}
}
IThreadStateManager.STATE_FAIL -> {
IBlockManager.STATE_FAIL -> {
failedNum.getAndIncrement()
if (hasFailedBlock()) {
val b = msg.data
mListener.onFail(
b.getBoolean(IThreadStateManager.DATA_RETRY, false),
b.getSerializable(IThreadStateManager.DATA_ERROR_INFO) as AriaException?
eventListener.onFail(
b.getBoolean(IBlockManager.DATA_RETRY, false),
b.getSerializable(IBlockManager.DATA_ERROR_INFO) as AriaException?
)
quitLooper()
}
}
IThreadStateManager.STATE_COMPLETE -> {
IBlockManager.STATE_COMPLETE -> {
completedNum.getAndIncrement()
if (isCompleted) {
Timber.d("isComplete, completeNum = %s", completedNum)
mListener.onComplete()
eventListener.onComplete()
quitLooper()
}
}
IThreadStateManager.STATE_RUNNING -> {
IBlockManager.STATE_RUNNING -> {
val b = msg.data
if (b != null) {
val len = b.getLong(IThreadStateManager.DATA_ADD_LEN, 0)
val len = b.getLong(IBlockManager.DATA_ADD_LEN, 0)
progress += len
}
}
IThreadStateManager.STATE_UPDATE_PROGRESS -> {
IBlockManager.STATE_UPDATE_PROGRESS -> {
progress = msg.obj as Long
}
}
@ -100,6 +96,14 @@ class BlockManager(
return blockList.removeFirst()
}
override fun setLopper(looper: Looper) {
this.looper = looper
}
override fun setBlockNum(blockNum: Int) {
this.blockNum = blockNum
}
override fun hasFailedBlock(): Boolean {
Timber.d("isFailed, blockBum = ${blockNum}, completedNum = ${completedNum.get()}, ")
return failedNum.get() != 0

@ -15,7 +15,9 @@
*/
package com.arialyy.aria.core.task
class BlockState(val blockId: Int, val blockPath: String) {
import com.arialyy.aria.orm.entity.BlockRecord
class BlockState(val blockState: BlockRecord, val blockPath: String) {
companion object {
/**
@ -23,7 +25,7 @@ class BlockState(val blockId: Int, val blockPath: String) {
*/
const val BLOCK_PATH = "%s/.%s.%d"
const val BLOCK_SIZE = 1024 * 1024 * 5
const val BLOCK_SIZE = 1024 * 1024 * 5L
}
var curProgress: Long = 0

@ -0,0 +1,89 @@
package com.arialyy.aria.core.task
import android.bluetooth.BluetoothClass
import com.arialyy.aria.orm.entity.BlockRecord
import com.arialyy.aria.orm.entity.TaskRecord
import com.arialyy.aria.util.FileUtil
import timber.log.Timber
import java.io.File
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
object BlockUtil {
/**
* create block record
*/
fun createBlockRecord(fileSize: Long): List<BlockRecord> {
val blockNumInfo = getBlockNum(fileSize)
val lastIndex = blockNumInfo.first - 1
val brList = mutableListOf<BlockRecord>()
for (bi in 0 until blockNumInfo.first) {
val sl = bi * BlockState.BLOCK_SIZE
val blockSize = if (bi == lastIndex) blockNumInfo.second else BlockState.BLOCK_SIZE
val el = sl + blockSize
val blockRecord = BlockRecord(
bId = bi,
startLocation = bi * BlockState.BLOCK_SIZE,
endLocation = el,
blockSize = blockSize,
)
brList.add(blockRecord)
}
return brList
}
/**
* Get the number of blocks according to the file length
* @return pair<blockNum, lastBlockSize>
*/
fun getBlockNum(fileLen: Long): Pair<Int, Long> {
if (fileLen <= BlockState.BLOCK_SIZE) {
return Pair(1, 0)
}
val blockNum = (fileLen / BlockState.BLOCK_SIZE).toInt()
val lastBlockSize = fileLen % BlockState.BLOCK_SIZE
return Pair(if (lastBlockSize != 0L) blockNum + 1 else blockNum, lastBlockSize)
}
/**
* merge block file,if success,return true else return false
*/
fun mergeFile(record: TaskRecord): Boolean {
val targetF = File(record.filePath)
val dir = targetF.parentFile ?: return false
val fileName = targetF.name
if (record.blockNum == 1) {
// if this task not support blocks or fileSize < 5m, just need rename
return File(BlockState.BLOCK_PATH.format(dir, fileName, 0)).renameTo(targetF)
}
val blockList = mutableListOf<File>()
for (i in 0 until record.blockNum) {
val subF = File(BlockState.BLOCK_PATH.format(dir, fileName, i))
if (!subF.exists()) {
Timber.e("this block: $i not exists")
return false
}
if (subF.length() != BlockState.BLOCK_SIZE.toLong()) {
Timber.e("this block: $i size abnormal, size: ${subF.length()}")
return false
}
blockList.add(subF)
}
return FileUtil.mergeFile(record.filePath, blockList)
}
}

@ -26,6 +26,6 @@ interface ITaskInterceptor {
interface IChain {
fun getTask(): ITask
fun proceed(task: ITask): TaskResp
suspend fun proceed(task: ITask): TaskResp
}
}

@ -16,7 +16,7 @@
package com.arialyy.aria.core.task;
import android.os.Bundle;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.exception.AriaException;
/**
@ -30,7 +30,7 @@ public interface IThreadTaskObserver {
/**
* 更新所有状态
*
* @param state state {@link IThreadStateManager#STATE_STOP}..
* @param state state {@link IBlockManager#STATE_STOP}..
*/
void updateState(int state, Bundle bundle);

@ -15,6 +15,8 @@
*/
package com.arialyy.aria.core.task
import com.arialyy.aria.core.inf.IBlockManager
/**
* @Author laoyuyu
* @Description
@ -24,14 +26,15 @@ class TaskChain(
private val interceptors: List<ITaskInterceptor>,
private val index: Int = 0,
private val task: ITask,
val blockManager: IBlockManager
) : ITaskInterceptor.IChain {
override fun getTask(): ITask {
return task
}
override fun proceed(task: ITask): TaskResp {
val next = TaskChain(interceptors, index, task)
override suspend fun proceed(task: ITask): TaskResp {
val next = TaskChain(interceptors, index, task, blockManager)
val interceptor = interceptors[index]
return interceptor.interceptor(next)
}

@ -43,6 +43,18 @@ class TaskState {
*/
var curProgress: Long = 0
/**
* whether block is supported, true: supported
*/
var isSupportBlock = false
/**
* whether resume task is supported
* 1. in download task, if file length not obtained, isSupportResume = false
* 2. in upload task, if service not supported resume, isSupportResume = false
*/
var isSupportResume = false
/**
* Bytes transferred in 1 second, if file size 0, return 0
* curSpeed, unit: byte/s
@ -71,6 +83,8 @@ class TaskState {
*/
var timeLeft: Int = Int.Companion.MAX_VALUE
val blockSize = BlockState.BLOCK_SIZE
fun getPercent() = ((curProgress * 100) / fileSize).toInt()
fun isCompleted() = state == IEntity.STATE_COMPLETE

@ -24,7 +24,7 @@ import com.arialyy.aria.core.AriaConfig;
import com.arialyy.aria.core.ThreadRecord;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.SubThreadConfig;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.ISchedulers;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.wrapper.AbsTaskWrapper;
@ -178,7 +178,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
taskBreak = true;
if (mTaskWrapper.isSupportBP()) {
final long currentTemp = mRangeProgress;
updateState(IThreadStateManager.STATE_STOP, null);
updateState(IBlockManager.STATE_STOP, null);
ALog.d(TAG, String.format("任务【%s】thread__%s__中断【停止位置:%s】", getFileName(),
mRecord.threadId, currentTemp));
writeConfig(false, currentTemp);
@ -254,7 +254,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
public void stop() {
isStop = true;
final long stopLocation = mRangeProgress;
updateState(IThreadStateManager.STATE_STOP, null);
updateState(IBlockManager.STATE_STOP, null);
if (mTaskWrapper.getRequestType() == ITaskWrapper.M3U8_VOD) {
writeConfig(false, getConfig().tempFile.length());
ALog.i(TAG, String.format("任务【%s】已停止", getFileName()));
@ -272,7 +272,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
/**
* 发送状态给状态处理器
*
* @param state {@link IThreadStateManager#STATE_STOP}..
* @param state {@link IBlockManager#STATE_STOP}..
* @param bundle 而外数据
*/
@Override
@ -282,8 +282,8 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
bundle = new Bundle();
}
msg.setData(bundle);
bundle.putString(IThreadStateManager.DATA_THREAD_NAME, getThreadName());
bundle.putLong(IThreadStateManager.DATA_THREAD_LOCATION, mRangeProgress);
bundle.putString(IBlockManager.DATA_THREAD_NAME, getThreadName());
bundle.putLong(IBlockManager.DATA_THREAD_LOCATION, mRangeProgress);
msg.what = state;
int reqType = getConfig().threadType;
if (reqType == SubThreadConfig.TYPE_M3U8_PEER) {
@ -299,10 +299,10 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
private void sendM3U8Info(int state, Message msg) {
Bundle bundle = msg.getData();
if (state != IThreadStateManager.STATE_UPDATE_PROGRESS) {
if (state != IBlockManager.STATE_UPDATE_PROGRESS) {
msg.obj = this;
}
if ((state == IThreadStateManager.STATE_COMPLETE || state == IThreadStateManager.STATE_FAIL)) {
if ((state == IBlockManager.STATE_COMPLETE || state == IBlockManager.STATE_FAIL)) {
bundle.putString(ISchedulers.DATA_M3U8_URL, getConfig().url);
bundle.putString(ISchedulers.DATA_M3U8_PEER_PATH, getConfig().tempFile.getPath());
bundle.putInt(ISchedulers.DATA_M3U8_PEER_INDEX, getConfig().peerIndex);
@ -315,7 +315,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
writeConfig(true, mRecord.endLocation);
// 进度发送不是实时的,发送完成任务前,需要更新一次进度
sendRunningState();
updateState(IThreadStateManager.STATE_COMPLETE, null);
updateState(IBlockManager.STATE_COMPLETE, null);
}
/**
@ -363,9 +363,9 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
b = new Bundle();
msg.setData(b);
}
b.putString(IThreadStateManager.DATA_THREAD_NAME, getThreadName());
b.putLong(IThreadStateManager.DATA_ADD_LEN, mRangeProgress - mLastRangeProgress);
msg.what = IThreadStateManager.STATE_RUNNING;
b.putString(IBlockManager.DATA_THREAD_NAME, getThreadName());
b.putLong(IBlockManager.DATA_ADD_LEN, mRangeProgress - mLastRangeProgress);
msg.what = IBlockManager.STATE_RUNNING;
msg.obj = mRangeProgress;
Thread loopThread = mStateHandler.getLooper().getThread();
@ -386,7 +386,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
@Override
public void cancel() {
isCancel = true;
updateState(IThreadStateManager.STATE_CANCEL, null);
updateState(IBlockManager.STATE_CANCEL, null);
ALog.d(TAG,
String.format("任务【%s】thread__%s__取消", getFileName(), mRecord.threadId));
}
@ -501,7 +501,7 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
} else if (blockFileLen < mRecord.blockLen) {
mRecord.startLocation = mRecord.endLocation - mRecord.blockLen + blockFileLen;
mRecord.isComplete = false;
updateState(IThreadStateManager.STATE_UPDATE_PROGRESS, null);
updateState(IBlockManager.STATE_UPDATE_PROGRESS, null);
ALog.i(TAG,
String.format("修正分块【%s】记录,开始位置:%s,结束位置:%s", temp.getName(), mRecord.startLocation,
mRecord.endLocation));
@ -519,11 +519,11 @@ public class ThreadTask implements IThreadTask, IThreadTaskObserver {
*/
private void sendFailMsg(AriaException e, boolean needRetry) {
Bundle b = new Bundle();
b.putBoolean(IThreadStateManager.DATA_RETRY, needRetry);
b.putBoolean(IBlockManager.DATA_RETRY, needRetry);
if (e != null) {
b.putSerializable(IThreadStateManager.DATA_ERROR_INFO, e);
b.putSerializable(IBlockManager.DATA_ERROR_INFO, e);
}
updateState(IThreadStateManager.STATE_FAIL, b);
updateState(IBlockManager.STATE_FAIL, b);
}
/**

@ -45,7 +45,7 @@ interface RecordDao {
"com.arialyy.aria.orm.dao.RecordDao.insert"
)
)
suspend fun insertTaskRecord(taskRecord: TaskRecord)
suspend fun insertTaskRecord(taskRecord: TaskRecord): Int
@Insert
suspend fun insertSubList(blockList: List<BlockRecord>)
@ -61,7 +61,10 @@ interface RecordDao {
@Transaction
suspend fun insert(taskRecord: TaskRecord) {
insertTaskRecord(taskRecord)
val tId = insertTaskRecord(taskRecord)
for (br in taskRecord.blockList) {
br.tId = tId
}
insertSubList(taskRecord.blockList)
}
}

@ -34,9 +34,8 @@ import androidx.room.PrimaryKey
)]
)
data class BlockRecord(
@PrimaryKey(autoGenerate = true) val bId: Int = 0,
val tId: Int,
val bId: Int = 0,
var tId: Int = 0,
/**
* 开始位置

@ -15,9 +15,11 @@
*/
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.Entity
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.core.inf.BaseEntity
import kotlinx.coroutines.Dispatchers
@ -27,6 +29,7 @@ import kotlinx.coroutines.launch
* Download Entity
*/
@Entity(indices = [Index(value = ["sourceUrl", "savePath"])])
@TypeConverters(FilePathConverter::class)
data class DEntity(
@PrimaryKey(autoGenerate = true) val did: Int = 0,
@ -39,7 +42,7 @@ data class DEntity(
/**
* file save path, it's uri
*/
val savePath: String,
val savePath: Uri,
/**
* extended Information
*/

@ -15,12 +15,12 @@
*/
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.Entity
import androidx.room.Ignore
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
import com.arialyy.aria.orm.DGUrlConverter
/**
* @Author laoyuyu
@ -28,7 +28,7 @@ import com.arialyy.aria.orm.DGUrlConverter
* @Date 4:32 PM 2023/1/16
**/
@Entity(indices = [Index(value = ["savePath"])])
@TypeConverters(DGUrlConverter::class)
@TypeConverters(DGUrlConverter::class, FilePathConverter::class)
data class DGEntity(
@PrimaryKey(autoGenerate = true) val dgId: Int = 0,
@ -40,7 +40,7 @@ data class DGEntity(
/**
* 保存路径
*/
val savePath: String,
val savePath: Uri,
/**
* 子任务url地址

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.orm
package com.arialyy.aria.orm.entity
import androidx.room.ProvidedTypeConverter
import androidx.room.TypeConverter

@ -1,5 +1,3 @@
package com.arialyy.aria.core.task
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
@ -15,13 +13,21 @@ package com.arialyy.aria.core.task
* See the License for the specific language governing permissions and
* limitations under the License.
*/
object MergeUtil {
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.ProvidedTypeConverter
import androidx.room.TypeConverter
/**
* merge block file,if success,return true else return false
*/
fun merge(): Boolean {
@ProvidedTypeConverter
class FilePathConverter {
@TypeConverter
fun stringToUri(string: String?): Uri? {
return Uri.parse(string)
}
return false
@TypeConverter
fun uriToString(uri: Uri?): String? {
return uri?.toString()
}
}

@ -15,15 +15,18 @@
*/
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.Entity
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
import com.arialyy.aria.core.DuaContext
import com.arialyy.aria.orm.DuaDb
import timber.log.Timber
import java.util.TimeZone
@Entity(indices = [Index(value = ["sourceUrl", "savePath"])])
@TypeConverters(FilePathConverter::class)
data class MEntity(
@PrimaryKey(autoGenerate = true) val mId: Int = 0,
@ -36,7 +39,7 @@ data class MEntity(
/**
* file save path
*/
val savePath: String,
val savePath: Uri,
/**
* extended Information
*/

@ -15,21 +15,31 @@
*/
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.Entity
import androidx.room.Ignore
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
import com.arialyy.aria.core.task.ITask
@Entity(indices = [Index(value = ["taskKey"])])
@TypeConverters(FilePathConverter::class)
data class TaskRecord(
@PrimaryKey(autoGenerate = true) val tId: Int = 0,
val taskKey: String,
val filePath: String,
/**
* is uri
*/
val filePath: Uri,
/**
* [ITask.DOWNLOAD] ...
*/
val taskType: Int,
val fileLen: Long,
val blockNum: Int,
val blockSize: Long
) {
@Ignore
internal var blockList: MutableList<BlockRecord> = mutableListOf()
val blockList: MutableList<BlockRecord> = mutableListOf()
}

@ -15,11 +15,14 @@
*/
package com.arialyy.aria.orm.entity
import android.net.Uri
import androidx.room.Entity
import androidx.room.Index
import androidx.room.PrimaryKey
import androidx.room.TypeConverters
@Entity(indices = [Index(value = ["serverUrl", "filePath"])])
@TypeConverters(FilePathConverter::class)
data class UEntity(
@PrimaryKey(autoGenerate = true) val uId: Int = 0,
/**
@ -30,7 +33,7 @@ data class UEntity(
/**
* file path
*/
val filePath: String,
val filePath: Uri,
/**
* extended Information

@ -51,6 +51,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Scanner;
import java.util.regex.Pattern;
import timber.log.Timber;
/**
* 文件操作工具类
@ -319,11 +320,11 @@ public class FileUtil {
* 合并文件
*
* @param targetPath 目标文件
* @param subPaths 碎片文件路径
* @param blockList 分块列表
* @return {@code true} 合并成功{@code false}合并失败
*/
public static boolean mergeFile(String targetPath, List<String> subPaths) {
Log.d(TAG, "开始合并文件");
public static boolean mergeFile(String targetPath, List<File> blockList) {
Timber.d("开始合并文件");
File file = new File(targetPath);
FileOutputStream fos = null;
FileChannel foc = null;
@ -341,10 +342,9 @@ public class FileUtil {
foc = fos.getChannel();
List<FileInputStream> streams = new LinkedList<>();
long fileLen = 0;
for (String subPath : subPaths) {
File f = new File(subPath);
if (!f.exists()) {
ALog.d(TAG, String.format("合并文件失败,文件【%s】不存在", subPath));
for (File block : blockList) {
if (!block.exists()) {
ALog.d(TAG, String.format("合并文件失败,文件【%s】不存在", block.getPath()));
for (FileInputStream fis : streams) {
fis.close();
}
@ -352,10 +352,10 @@ public class FileUtil {
return false;
}
FileInputStream fis = new FileInputStream(subPath);
FileInputStream fis = new FileInputStream(block);
FileChannel fic = fis.getChannel();
foc.transferFrom(fic, fileLen, f.length());
fileLen += f.length();
foc.transferFrom(fic, fileLen, block.length());
fileLen += block.length();
fis.close();
}
ALog.d(TAG, String.format("合并文件耗时:%sms", (System.currentTimeMillis() - startTime)));

@ -22,7 +22,7 @@ import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.event.EventMsgUtil;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IDLoadListener;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.loader.AbsNormalLoader;
@ -149,7 +149,7 @@ final class SFtpDLoader extends AbsNormalLoader<DTaskWrapper> {
});
}
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

@ -20,7 +20,7 @@ import android.os.Looper;
import com.arialyy.aria.core.common.AbsEntity;
import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.event.EventMsgUtil;
import com.arialyy.aria.core.inf.IThreadStateManager;
import com.arialyy.aria.core.inf.IBlockManager;
import com.arialyy.aria.core.listener.IDLoadListener;
import com.arialyy.aria.core.listener.IEventListener;
import com.arialyy.aria.core.loader.AbsNormalLoader;
@ -148,7 +148,7 @@ final class SFtpULoader extends AbsNormalLoader<UTaskWrapper> {
});
}
@Override public void addComponent(IThreadStateManager threadState) {
@Override public void addComponent(IBlockManager threadState) {
mStateManager = threadState;
}

Loading…
Cancel
Save