优化任务线程

pull/330/head
laoyuyu 6 years ago
parent a63222a220
commit 6d5398de40
  1. 49
      Aria/src/main/java/com/arialyy/aria/core/common/AbsFileer.java
  2. 7
      Aria/src/main/java/com/arialyy/aria/core/common/AbsThreadTask.java
  3. 78
      Aria/src/main/java/com/arialyy/aria/core/common/ThreadTaskManager.java
  4. 15
      Aria/src/main/java/com/arialyy/aria/core/download/downloader/FtpThreadTask.java
  5. 11
      Aria/src/main/java/com/arialyy/aria/core/download/downloader/HttpThreadTask.java
  6. 112
      Aria/src/main/java/com/arialyy/aria/core/manager/ThreadTaskManager.java
  7. 11
      Aria/src/main/java/com/arialyy/aria/core/upload/uploader/FtpThreadTask.java
  8. 7
      Aria/src/main/java/com/arialyy/aria/core/upload/uploader/HttpThreadTask.java
  9. 6
      app/src/main/java/com/arialyy/simple/download/SingleTaskActivity.java

@ -22,6 +22,7 @@ import com.arialyy.aria.core.download.DownloadTaskEntity;
import com.arialyy.aria.core.inf.AbsNormalEntity;
import com.arialyy.aria.core.inf.AbsTaskEntity;
import com.arialyy.aria.core.inf.IEventListener;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.orm.DbEntity;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
@ -36,8 +37,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by AriaL on 2017/7/1.
@ -63,7 +62,6 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
protected Context mContext;
protected File mTempFile; //文件
protected StateConstance mConstance;
private ExecutorService mFixedThreadPool;
//总线程数
protected int mTotalThreadNum;
//启动线程数
@ -121,9 +119,6 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
mCompleteThreadNum = 0;
mTotalThreadNum = 0;
mStartThreadNum = 0;
if (mFixedThreadPool != null) {
mFixedThreadPool.shutdown();
}
if (mTask != null && !mTask.isEmpty()) {
for (int i = 0; i < mTask.size(); i++) {
AbsThreadTask task = mTask.get(i);
@ -258,15 +253,13 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
closeTimer();
mConstance.isRunning = false;
mConstance.isCancel = true;
if (mFixedThreadPool != null) {
mFixedThreadPool.shutdown();
}
for (int i = 0; i < mStartThreadNum; i++) {
AbsThreadTask task = mTask.get(i);
if (task != null) {
task.cancel();
}
}
ThreadTaskManager.getInstance().stopTaskThread(mTaskEntity.getKey());
}
public void stop() {
@ -274,15 +267,13 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
mConstance.isRunning = false;
mConstance.isStop = true;
if (mConstance.isComplete()) return;
if (mFixedThreadPool != null) {
mFixedThreadPool.shutdown();
}
for (int i = 0; i < mStartThreadNum; i++) {
AbsThreadTask task = mTask.get(i);
if (task != null && !task.isThreadComplete()) {
task.stop();
}
}
ThreadTaskManager.getInstance().stopTaskThread(mTaskEntity.getKey());
}
/**
@ -636,13 +627,9 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
} else {
mListener.onStart(mConstance.CURRENT_LOCATION);
}
mFixedThreadPool = Executors.newFixedThreadPool(recordL.length);
for (int l : recordL) {
if (l == -1) continue;
Runnable task = mTask.get(l);
if (task != null) {
mFixedThreadPool.execute(task);
}
ThreadTaskManager.getInstance().startThread(mTaskEntity.getKey(), mTask.get(l));
}
}
@ -652,31 +639,6 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
public void retryTask() {
ALog.w(TAG, String.format("任务【%s】开始重试", mEntity.getFileName()));
startFlow();
//if (isBreak()) {
// return;
//}
//if (mTask == null || mTask.size() == 0) {
// ALog.w(TAG, "没有线程任务");
// return;
//}
//Set<Integer> keys = mTask.keySet();
//for (Integer key : keys) {
// AbsThreadTask task = mTask.get(key);
// if (task != null && !task.isThreadComplete()) {
//
// task.getConfig().START_LOCATION = task.getConfig().THREAD_RECORD.startLocation;
// mConstance.resetState();
// startTimer();
// ALog.d(TAG, String.format("任务【%s】开始重试,线程__%s__【开始位置:%s,结束位置:%s】", mEntity.getFileName(),
// key, task.getConfig().START_LOCATION, task.getConfig().END_LOCATION));
// if (!mFixedThreadPool.isShutdown()) {
// mFixedThreadPool.execute(task);
// } else {
// ALog.w(TAG, "线程池已关闭");
// mListener.onFail(true);
// }
// }
//}
}
/**
@ -707,8 +669,7 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_ENTITY exte
AbsThreadTask task = selectThreadTask(config);
if (task == null) return;
mTask.put(0, task);
mFixedThreadPool = Executors.newFixedThreadPool(1);
mFixedThreadPool.execute(task);
ThreadTaskManager.getInstance().startThread(mTaskEntity.getKey(), task);
mListener.onStart(0);
}

@ -20,6 +20,7 @@ import com.arialyy.aria.core.AriaManager;
import com.arialyy.aria.core.inf.AbsNormalEntity;
import com.arialyy.aria.core.inf.AbsTaskEntity;
import com.arialyy.aria.core.inf.IEventListener;
import com.arialyy.aria.core.manager.ThreadTaskManager;
import com.arialyy.aria.core.upload.UploadEntity;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.ErrorHelp;
@ -30,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -38,7 +40,7 @@ import java.util.concurrent.Executors;
* 任务线程
*/
public abstract class AbsThreadTask<ENTITY extends AbsNormalEntity, TASK_ENTITY extends AbsTaskEntity<ENTITY>>
implements Runnable {
implements Callable<TASK_ENTITY> {
/**
* 线程重试次数
*/
@ -352,7 +354,8 @@ public abstract class AbsThreadTask<ENTITY extends AbsNormalEntity, TASK_ENTITY
}
mFailTimes++;
handleRetryRecord();
AbsThreadTask.this.run();
ThreadTaskManager.getInstance().retryThread(AbsThreadTask.this);
//AbsThreadTask.this.run();
}
}, 3000);
} else {

@ -1,78 +0,0 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.common;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 线程任务管理器
*/
class ThreadTaskManager {
private static volatile ThreadTaskManager INSTANCE = null;
private Map<String, List<AbsThreadTask>> mThreadTasks = new ConcurrentHashMap<>();
private ThreadTaskManager() {
}
public static ThreadTaskManager getInstance() {
if (INSTANCE == null) {
synchronized (ThreadTaskManager.class) {
INSTANCE = new ThreadTaskManager();
}
}
return INSTANCE;
}
/**
* 添加单条线程记录
*
* @param key 任务对应的key
* @param threadTask 线程任务
*/
public void addTask(String key, AbsThreadTask threadTask) {
if (mThreadTasks.get(key) == null) {
mThreadTasks.put(key, new ArrayList<AbsThreadTask>());
}
mThreadTasks.get(key).add(threadTask);
}
/**
* 删除对应的任务的线程记录
*
* @param key 任务对应的key
*/
public void removeTask(String key) {
for (Iterator<Map.Entry<String, List<AbsThreadTask>>> iter = mThreadTasks.entrySet().iterator();
iter.hasNext(); ) {
Map.Entry<String, List<AbsThreadTask>> entry = iter.next();
if (key.equals(entry.getKey())) {
List<AbsThreadTask> list = mThreadTasks.get(key);
if (list != null && !list.isEmpty()) {
list.clear();
}
iter.remove();
}
}
}
}

@ -53,10 +53,10 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
isBlock = STATE.TASK_RECORD.isBlock;
}
@Override public void run() {
@Override public DownloadTaskEntity call() throws Exception {
if (mConfig.THREAD_RECORD.isComplete) {
handleComplete();
return;
return mTaskEntity;
}
mChildCurrentLocation = mConfig.START_LOCATION;
FTPClient client = null;
@ -69,7 +69,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
client = createClient();
if (client == null) {
fail(mChildCurrentLocation, "ftp client 创建失败", null);
return;
return mTaskEntity;
}
if (mConfig.START_LOCATION > 0) {
client.setRestartOffset(mConfig.START_LOCATION);
@ -81,7 +81,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
String.format("获取文件信息错误,错误码为:%s,msg:%s", reply, client.getReplyString()),
null);
client.disconnect();
return;
return mTaskEntity;
}
String remotePath =
new String(mTaskEntity.getUrlEntity().remotePath.getBytes(charSet), SERVER_CHARSET);
@ -93,7 +93,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
String.format("获取流失败,错误码为:%s,msg:%s", reply, client.getReplyString()),
null);
client.disconnect();
return;
return mTaskEntity;
}
if (isOpenDynamicFile) {
@ -118,6 +118,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
e.printStackTrace();
}
}
return mTaskEntity;
}
/**
@ -164,7 +165,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
foc = fos.getChannel();
fic = Channels.newChannel(is);
ByteBuffer bf = ByteBuffer.allocate(mBufSize);
while ((len = fic.read(bf)) != -1) {
while (!Thread.currentThread().isInterrupted() && (len = fic.read(bf)) != -1) {
if (isBreak()) {
break;
}
@ -215,7 +216,7 @@ class FtpThreadTask extends AbsFtpThreadTask<DownloadEntity, DownloadTaskEntity>
file.seek(mConfig.START_LOCATION);
byte[] buffer = new byte[mBufSize];
int len;
while ((len = is.read(buffer)) != -1) {
while (!Thread.currentThread().isInterrupted() && (len = is.read(buffer)) != -1) {
if (isBreak()) {
break;
}

@ -33,6 +33,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
@ -56,10 +57,10 @@ final class HttpThreadTask extends AbsThreadTask<DownloadEntity, DownloadTaskEnt
isBlock = STATE.TASK_RECORD.isBlock;
}
@Override public void run() {
@Override public DownloadTaskEntity call(){
if (getThreadRecord().isComplete) {
handleComplete();
return;
return mTaskEntity;
}
HttpURLConnection conn = null;
BufferedInputStream is = null;
@ -118,6 +119,7 @@ final class HttpThreadTask extends AbsThreadTask<DownloadEntity, DownloadTaskEnt
e.printStackTrace();
}
}
return mTaskEntity;
}
/**
@ -133,7 +135,8 @@ final class HttpThreadTask extends AbsThreadTask<DownloadEntity, DownloadTaskEnt
foc = fos.getChannel();
fic = Channels.newChannel(is);
ByteBuffer bf = ByteBuffer.allocate(mBufSize);
while ((len = fic.read(bf)) != -1) {
//如果要通过 Future 的 cancel 方法取消正在运行的任务,那么该任务必定是可以 对线程中断做出响应 的任务。
while (!Thread.currentThread().isInterrupted() && (len = fic.read(bf)) != -1) {
if (isBreak()) {
break;
}
@ -192,7 +195,7 @@ final class HttpThreadTask extends AbsThreadTask<DownloadEntity, DownloadTaskEnt
throws IOException {
byte[] buffer = new byte[mBufSize];
int len;
while ((len = is.read(buffer)) != -1) {
while (!Thread.currentThread().isInterrupted() && (len = is.read(buffer)) != -1) {
if (isBreak()) {
break;
}

@ -1,4 +1,116 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.manager;
import com.arialyy.aria.core.common.AbsThreadTask;
import com.arialyy.aria.core.inf.AbsTaskEntity;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.CommonUtil;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 线程管理器
*/
public class ThreadTaskManager {
private static volatile ThreadTaskManager INSTANCE = null;
private static final Object LOCK = new Object();
private final String TAG = "ThreadTaskManager";
private ExecutorService mExePool;
private Map<String, Set<Future>> mThreadTasks = new HashMap<>();
public static ThreadTaskManager getInstance() {
if (INSTANCE == null) {
synchronized (LOCK) {
INSTANCE = new ThreadTaskManager();
}
}
return INSTANCE;
}
private ThreadTaskManager() {
mExePool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
/**
* 启动线程任务
*
* @param key 任务对应的key{@link AbsTaskEntity#getKey()}
* @param threadTask 线程任务{@link AbsThreadTask}
*/
public void startThread(String key, AbsThreadTask threadTask) {
if (mExePool.isShutdown()) {
ALog.e(TAG, "线程池已经关闭");
return;
}
key = getKey(key);
Set<Future> temp = mThreadTasks.get(key);
if (temp == null) {
temp = new HashSet<>();
mThreadTasks.put(key, temp);
}
temp.add(mExePool.submit(threadTask));
}
/**
* 停止任务的所有线程
*
* @param key 任务对应的key{@link AbsTaskEntity#getKey()}
*/
public void stopTaskThread(String key) {
if (mExePool.isShutdown()) {
ALog.e(TAG, "线程池已经关闭");
return;
}
Set<Future> temp = mThreadTasks.get(getKey(key));
for (Future future : temp) {
if (future.isDone() || future.isCancelled()) {
continue;
}
future.cancel(true);
}
mThreadTasks.remove(key);
}
/**
* 重试线程任务
*
* @param task 线程任务
*/
public void retryThread(AbsThreadTask task) {
if (mExePool.isShutdown()) {
ALog.e(TAG, "线程池已经关闭");
return;
}
mExePool.submit(task);
}
/**
* map中的key
*
* @param key 任务的key{@link AbsTaskEntity#getKey()}
* @return 转换后的map中的key
*/
private String getKey(String key) {
return CommonUtil.getStrMd5(key);
}
}

@ -50,7 +50,7 @@ class FtpThreadTask extends AbsFtpThreadTask<UploadEntity, UploadTaskEntity> {
return mAridManager.getUploadConfig().getMaxSpeed();
}
@Override public void run() {
@Override public UploadTaskEntity call() throws Exception {
//当前子线程的下载位置
mChildCurrentLocation = mConfig.START_LOCATION;
FTPClient client = null;
@ -60,7 +60,9 @@ class FtpThreadTask extends AbsFtpThreadTask<UploadEntity, UploadTaskEntity> {
String.format("任务【%s】线程__%s__开始上传【开始位置 : %s,结束位置:%s】", mConfig.TEMP_FILE.getName(),
mConfig.THREAD_ID, mConfig.START_LOCATION, mConfig.END_LOCATION));
client = createClient();
if (client == null) return;
if (client == null) {
return mTaskEntity;
}
initPath();
client.makeDirectory(dir);
client.changeWorkingDirectory(dir);
@ -69,7 +71,7 @@ class FtpThreadTask extends AbsFtpThreadTask<UploadEntity, UploadTaskEntity> {
if (!FTPReply.isPositivePreliminary(reply) && reply != FTPReply.FILE_ACTION_OK) {
fail(mChildCurrentLocation, "上传失败,错误码为:" + reply + ",msg:" + client.getReplyString(), null);
client.disconnect();
return;
return mTaskEntity;
}
file = new BufferedRandomAccessFile(mConfig.TEMP_FILE, "rwd", mBufSize);
@ -79,7 +81,7 @@ class FtpThreadTask extends AbsFtpThreadTask<UploadEntity, UploadTaskEntity> {
}
upload(client, file);
if (isBreak()) {
return;
return mTaskEntity;
}
ALog.i(TAG,
String.format("任务【%s】线程__%s__上传完毕", mConfig.TEMP_FILE.getName(), mConfig.THREAD_ID));
@ -110,6 +112,7 @@ class FtpThreadTask extends AbsFtpThreadTask<UploadEntity, UploadTaskEntity> {
e.printStackTrace();
}
}
return mTaskEntity;
}
private void initPath() throws UnsupportedEncodingException {

@ -57,12 +57,12 @@ class HttpThreadTask extends AbsThreadTask<UploadEntity, UploadTaskEntity> {
isNotNetRetry = mAridManager.getUploadConfig().isNotNetRetry();
}
@Override public void run() {
@Override public UploadTaskEntity call() throws Exception {
File uploadFile = new File(mEntity.getFilePath());
if (!uploadFile.exists()) {
ALog.e(TAG, String.format("【%s】,文件不存在。", mEntity.getFilePath()));
fail();
return;
return mTaskEntity;
}
mListener.onPre();
URL url;
@ -103,6 +103,7 @@ class HttpThreadTask extends AbsThreadTask<UploadEntity, UploadTaskEntity> {
e.printStackTrace();
fail();
}
return mTaskEntity;
}
private void fail() {
@ -199,7 +200,7 @@ class HttpThreadTask extends AbsThreadTask<UploadEntity, UploadTaskEntity> {
if (status == HttpURLConnection.HTTP_OK) {
BufferedReader reader = new BufferedReader(new InputStreamReader(mHttpConn.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
while (!Thread.currentThread().isInterrupted() && (line = reader.readLine()) != null) {
response.append(line);
}
reader.close();

@ -222,9 +222,9 @@ public class SingleTaskActivity extends BaseActivity<ActivitySingleBinding> {
startD();
break;
case R.id.stop:
//Aria.download(this).load(DOWNLOAD_URL).stop();
startActivity(new Intent(this, SingleTaskActivity.class));
Aria.download(this).unRegister();
Aria.download(this).load(DOWNLOAD_URL).stop();
//startActivity(new Intent(this, SingleTaskActivity.class));
//Aria.download(this).unRegister();
//Aria.download(this).load(DOWNLOAD_URL).removeRecord();
break;
case R.id.cancel:

Loading…
Cancel
Save