组合任务子任务调度器实现

v3.6.6
laoyuyu 6 years ago
parent 1a83617045
commit dbddf3b4e2
  1. 8
      Aria/src/main/java/com/arialyy/aria/core/common/AbsFileer.java
  2. 2
      Aria/src/main/java/com/arialyy/aria/core/download/DownloadGroupListener.java
  3. 6
      Aria/src/main/java/com/arialyy/aria/core/download/DownloadGroupTask.java
  4. 7
      Aria/src/main/java/com/arialyy/aria/core/download/downloader/Downloader.java
  5. 4
      Aria/src/main/java/com/arialyy/aria/core/download/downloader/HttpFileInfoThread.java
  6. 3
      Aria/src/main/java/com/arialyy/aria/core/download/group/AbsGroupUtil.java
  7. 3
      Aria/src/main/java/com/arialyy/aria/core/download/group/DownloadGroupUtil.java
  8. 5
      Aria/src/main/java/com/arialyy/aria/core/download/group/FtpDirDownloadUtil.java
  9. 2
      Aria/src/main/java/com/arialyy/aria/core/download/group/FtpDirInfoThread.java
  10. 2
      Aria/src/main/java/com/arialyy/aria/core/download/group/IDownloadGroupListener.java
  11. 64
      Aria/src/main/java/com/arialyy/aria/core/download/group/ISubQueue.java
  12. 115
      Aria/src/main/java/com/arialyy/aria/core/download/group/SimpleSchedulers.java
  13. 145
      Aria/src/main/java/com/arialyy/aria/core/download/group/SimpleSubQueue.java
  14. 2
      Aria/src/main/java/com/arialyy/aria/core/inf/AbsGroupTask.java
  15. 14
      Aria/src/main/java/com/arialyy/aria/core/scheduler/AbsSchedulers.java
  16. 18
      Aria/src/main/java/com/arialyy/aria/core/scheduler/ISchedulers.java

@ -92,6 +92,14 @@ public abstract class AbsFileer<ENTITY extends AbsNormalEntity, TASK_WRAPPER ext
mTaskWrapper.setNewTask(newTask); mTaskWrapper.setNewTask(newTask);
} }
public String getKey() {
return mTaskWrapper.getKey();
}
public ENTITY getEntity() {
return mEntity;
}
/** /**
* 设置最大下载/上传速度 * 设置最大下载/上传速度
* *

@ -17,7 +17,7 @@ package com.arialyy.aria.core.download;
import android.os.Handler; import android.os.Handler;
import com.arialyy.aria.core.common.BaseListener; import com.arialyy.aria.core.common.BaseListener;
import com.arialyy.aria.core.download.downloader.IDownloadGroupListener; import com.arialyy.aria.core.download.group.IDownloadGroupListener;
import com.arialyy.aria.core.inf.GroupSendParams; import com.arialyy.aria.core.inf.GroupSendParams;
import com.arialyy.aria.core.inf.IEntity; import com.arialyy.aria.core.inf.IEntity;
import com.arialyy.aria.core.scheduler.ISchedulers; import com.arialyy.aria.core.scheduler.ISchedulers;

@ -19,9 +19,9 @@ import android.os.Handler;
import android.os.Looper; import android.os.Looper;
import android.text.TextUtils; import android.text.TextUtils;
import com.arialyy.aria.core.AriaManager; import com.arialyy.aria.core.AriaManager;
import com.arialyy.aria.core.download.downloader.DownloadGroupUtil; import com.arialyy.aria.core.download.group.DownloadGroupUtil;
import com.arialyy.aria.core.download.downloader.FtpDirDownloadUtil; import com.arialyy.aria.core.download.group.FtpDirDownloadUtil;
import com.arialyy.aria.core.download.downloader.IDownloadGroupListener; import com.arialyy.aria.core.download.group.IDownloadGroupListener;
import com.arialyy.aria.core.inf.AbsGroupTask; import com.arialyy.aria.core.inf.AbsGroupTask;
import com.arialyy.aria.core.inf.AbsTaskWrapper; import com.arialyy.aria.core.inf.AbsTaskWrapper;
import com.arialyy.aria.core.scheduler.ISchedulers; import com.arialyy.aria.core.scheduler.ISchedulers;

@ -30,13 +30,12 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
/** /**
* Created by AriaL on 2017/7/1. * Created by AriaL on 2017/7/1. 文件下载器
* 文件下载器
*/ */
class Downloader extends AbsFileer<DownloadEntity, DTaskWrapper> { public class Downloader extends AbsFileer<DownloadEntity, DTaskWrapper> {
private String TAG = "Downloader"; private String TAG = "Downloader";
Downloader(IDownloadListener listener, DTaskWrapper taskEntity) { public Downloader(IDownloadListener listener, DTaskWrapper taskEntity) {
super(listener, taskEntity); super(listener, taskEntity);
mTempFile = new File(mEntity.getDownloadPath()); mTempFile = new File(mEntity.getDownloadPath());
AriaManager manager = AriaManager.getInstance(AriaManager.APP); AriaManager manager = AriaManager.getInstance(AriaManager.APP);

@ -48,7 +48,7 @@ import java.util.Set;
/** /**
* 下载文件信息获取 * 下载文件信息获取
*/ */
class HttpFileInfoThread implements Runnable { public class HttpFileInfoThread implements Runnable {
private final String TAG = "HttpFileInfoThread"; private final String TAG = "HttpFileInfoThread";
private DownloadEntity mEntity; private DownloadEntity mEntity;
private DTaskWrapper mTaskWrapper; private DTaskWrapper mTaskWrapper;
@ -56,7 +56,7 @@ class HttpFileInfoThread implements Runnable {
private OnFileInfoCallback onFileInfoCallback; private OnFileInfoCallback onFileInfoCallback;
private HttpTaskDelegate mTaskDelegate; private HttpTaskDelegate mTaskDelegate;
HttpFileInfoThread(DTaskWrapper taskWrapper, OnFileInfoCallback callback) { public HttpFileInfoThread(DTaskWrapper taskWrapper, OnFileInfoCallback callback) {
this.mTaskWrapper = taskWrapper; this.mTaskWrapper = taskWrapper;
mEntity = taskWrapper.getEntity(); mEntity = taskWrapper.getEntity();
mConnectTimeOut = mConnectTimeOut =

@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.arialyy.aria.core.download.downloader; package com.arialyy.aria.core.download.group;
import com.arialyy.aria.core.AriaManager; import com.arialyy.aria.core.AriaManager;
import com.arialyy.aria.core.common.IUtil; import com.arialyy.aria.core.common.IUtil;
import com.arialyy.aria.core.download.DTaskWrapper; import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.DownloadEntity; import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.download.DGTaskWrapper; import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.download.downloader.Downloader;
import com.arialyy.aria.core.inf.IDownloadListener; import com.arialyy.aria.core.inf.IDownloadListener;
import com.arialyy.aria.core.inf.IEntity; import com.arialyy.aria.core.inf.IEntity;
import com.arialyy.aria.exception.BaseException; import com.arialyy.aria.exception.BaseException;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.arialyy.aria.core.download.downloader; package com.arialyy.aria.core.download.group;
import android.util.SparseArray; import android.util.SparseArray;
import com.arialyy.aria.core.common.CompleteInfo; import com.arialyy.aria.core.common.CompleteInfo;
@ -21,6 +21,7 @@ import com.arialyy.aria.core.common.IUtil;
import com.arialyy.aria.core.common.OnFileInfoCallback; import com.arialyy.aria.core.common.OnFileInfoCallback;
import com.arialyy.aria.core.download.DGTaskWrapper; import com.arialyy.aria.core.download.DGTaskWrapper;
import com.arialyy.aria.core.download.DTaskWrapper; import com.arialyy.aria.core.download.DTaskWrapper;
import com.arialyy.aria.core.download.downloader.HttpFileInfoThread;
import com.arialyy.aria.core.inf.IEntity; import com.arialyy.aria.core.inf.IEntity;
import com.arialyy.aria.exception.BaseException; import com.arialyy.aria.exception.BaseException;
import com.arialyy.aria.exception.TaskException; import com.arialyy.aria.exception.TaskException;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.arialyy.aria.core.download.downloader; package com.arialyy.aria.core.download.group;
import com.arialyy.aria.core.common.CompleteInfo; import com.arialyy.aria.core.common.CompleteInfo;
import com.arialyy.aria.core.common.OnFileInfoCallback; import com.arialyy.aria.core.common.OnFileInfoCallback;
@ -23,8 +23,7 @@ import com.arialyy.aria.exception.BaseException;
import java.util.Set; import java.util.Set;
/** /**
* Created by Aria.Lao on 2017/7/27. * Created by Aria.Lao on 2017/7/27. ftp文件夹下载工具
* ftp文件夹下载工具
*/ */
public class FtpDirDownloadUtil extends AbsGroupUtil { public class FtpDirDownloadUtil extends AbsGroupUtil {
private String TAG = "FtpDirDownloadUtil"; private String TAG = "FtpDirDownloadUtil";

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.arialyy.aria.core.download.downloader; package com.arialyy.aria.core.download.group;
import aria.apache.commons.net.ftp.FTPFile; import aria.apache.commons.net.ftp.FTPFile;
import com.arialyy.aria.core.FtpUrlEntity; import com.arialyy.aria.core.FtpUrlEntity;

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.arialyy.aria.core.download.downloader; package com.arialyy.aria.core.download.group;
import com.arialyy.aria.core.download.DownloadEntity; import com.arialyy.aria.core.download.DownloadEntity;
import com.arialyy.aria.core.inf.IDownloadListener; import com.arialyy.aria.core.inf.IDownloadListener;

@ -0,0 +1,64 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.download.group;
import com.arialyy.aria.core.common.AbsFileer;
/**
* 组合任务子任务队列
*
* @param <Fileer> {@link AbsFileer}下载器
*/
interface ISubQueue<Fileer extends AbsFileer> {
/**
* 添加任务
*/
void addTask(Fileer fileer);
/**
* 开始任务
*/
void startTask(Fileer fileer);
/**
* 停止任务
*/
void stopTask(Fileer fileer);
/**
* 修改最大任务数
*
* @param num 任务数不能小于1
*/
void modifyMaxExecNum(int num);
/**
* 从执行队列中移除任务一般用于任务完成的情况
*/
void removeTaskFromExecQ(Fileer fileer);
/**
* 删除任务
*/
void removeTask(Fileer fileer);
/**
* 获取下一个任务
*/
Fileer getNextTask();
}

@ -0,0 +1,115 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.download.group;
import android.os.CountDownTimer;
import android.os.Message;
import com.arialyy.aria.core.AriaManager;
import com.arialyy.aria.core.config.Configuration;
import com.arialyy.aria.core.download.DownloadTask;
import com.arialyy.aria.core.download.downloader.Downloader;
import com.arialyy.aria.core.inf.AbsEntity;
import com.arialyy.aria.core.scheduler.ISchedulers;
import com.arialyy.aria.util.ALog;
import com.arialyy.aria.util.NetUtils;
/**
* 组合任务子任务调度器用于调度任务的开始停止失败完成等情况
* 该调度器生命周期和{@link AbsGroupUtil}生命周期一致
*/
public class SimpleSchedulers implements ISchedulers<DownloadTask> {
private static final String TAG = "SimpleSchedulers";
private SimpleSubQueue mQueue = SimpleSubQueue.newInstance();
private SimpleSchedulers() {
}
public static SimpleSchedulers newInstance() {
return new SimpleSchedulers();
}
@Override public boolean handleMessage(Message msg) {
Downloader loader = (Downloader) msg.obj;
switch (msg.what) {
case ADD:
mQueue.addTask(loader);
break;
case START:
mQueue.startTask(loader);
break;
case STOP:
mQueue.stopTask(loader);
startNext();
break;
case COMPLETE:
mQueue.removeTaskFromExecQ(loader);
startNext();
case FAIL:
break;
}
return true;
}
/**
* 如果有等待中的任务则启动下一任务
*/
private void startNext() {
Downloader next = mQueue.getNextTask();
if (next != null) {
mQueue.startTask(next);
} else {
ALog.i(TAG, "没有下一任务");
}
}
/**
* 处理失败的任务
*/
private void handleFail(final Downloader loader) {
Configuration config = Configuration.getInstance();
long interval = config.downloadCfg.getReTryInterval();
int num = config.downloadCfg.getReTryNum();
boolean isNotNetRetry = config.appCfg.isNotNetRetry();
final int reTryNum = num;
if ((!NetUtils.isConnected(AriaManager.APP) && !isNotNetRetry)
|| loader.getEntity().getFailNum() > reTryNum) {
startNext();
return;
}
CountDownTimer timer = new CountDownTimer(interval, 1000) {
@Override public void onTick(long millisUntilFinished) {
}
@Override public void onFinish() {
AbsEntity entity = loader.getEntity();
if (entity.getFailNum() <= reTryNum) {
ALog.d(TAG, String.format("任务【%s】开始重试", loader.getEntity().getFileName()));
loader.retryTask();
} else {
startNext();
}
}
};
timer.start();
}
}

@ -0,0 +1,145 @@
/*
* Copyright (C) 2016 AriaLyy(https://github.com/AriaLyy/Aria)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arialyy.aria.core.download.group;
import com.arialyy.aria.core.config.Configuration;
import com.arialyy.aria.core.download.downloader.Downloader;
import com.arialyy.aria.util.ALog;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 组合任务队列该队列生命周期和{@link AbsGroupUtil}生命周期一致
*/
class SimpleSubQueue implements ISubQueue<Downloader> {
private static final String TAG = "SimpleSubQueue";
/**
* 缓存下载器
*/
private Map<String, Downloader> mCache = new LinkedHashMap<>();
/**
* 执行中的下载器
*/
private Map<String, Downloader> mExec = new LinkedHashMap<>();
/**
* 最大执行任务数
*/
private int mExecSize;
private SimpleSubQueue() {
mExecSize = Configuration.getInstance().dGroupCfg.getSubMaxTaskNum();
}
static SimpleSubQueue newInstance() {
return new SimpleSubQueue();
}
@Override public void addTask(Downloader fileer) {
mCache.put(fileer.getKey(), fileer);
}
@Override public void startTask(Downloader fileer) {
if (mExec.size() < mExecSize) {
mCache.remove(fileer.getKey());
mExec.put(fileer.getKey(), fileer);
fileer.start();
} else {
ALog.d(TAG, String.format("执行队列已满,任务见缓冲到缓存器中,key: %s", fileer.getKey()));
addTask(fileer);
}
}
@Override public void stopTask(Downloader fileer) {
fileer.stop();
mExec.remove(fileer.getKey());
}
@Override public void modifyMaxExecNum(int num) {
if (num < 1) {
ALog.e(TAG, String.format("修改组合任务子任务队列数失败,num: %s", num));
return;
}
if (num == mExecSize) {
ALog.i(TAG, String.format("忽略此次修改,oldSize: %s, num: %s", mExecSize, num));
return;
}
int oldSize = mExecSize;
mExecSize = num;
int diff = Math.abs(oldSize - num);
if (oldSize < num) { // 处理队列变小的情况,该情况下将停止队尾任务,并将这些任务添加到缓存队列中
if (mExec.size() > num) {
Set<String> keys = mExec.keySet();
List<Downloader> caches = new ArrayList<>();
int i = 0;
for (String key : keys) {
if (i > num) {
caches.add(mExec.get(key));
}
i++;
}
Collection<Downloader> temp = mCache.values();
mCache.clear();
ALog.d(TAG, String.format("测试, map size: %s", mCache.size()));
for (Downloader cache : caches) {
addTask(cache);
}
for (Downloader t : temp) {
addTask(t);
}
}
} else { // 处理队列变大的情况,该情况下将增加任务
if (mExec.size() < num) {
for (int i = 0; i < diff; i++) {
Downloader next = getNextTask();
if (next != null) {
startTask(next);
} else {
ALog.d(TAG, "子任务中没有缓存任务");
}
}
}
}
}
@Override public void removeTaskFromExecQ(Downloader fileer) {
if (mExec.containsKey(fileer.getKey())) {
if (fileer.isRunning()) {
fileer.stop();
}
mExec.remove(fileer.getKey());
}
}
@Override public void removeTask(Downloader fileer) {
removeTaskFromExecQ(fileer);
mCache.remove(fileer.getKey());
}
@Override public Downloader getNextTask() {
Iterator<String> keys = mCache.keySet().iterator();
if (keys.hasNext()) {
return mCache.get(keys.next());
}
return null;
}
}

@ -15,7 +15,7 @@
*/ */
package com.arialyy.aria.core.inf; package com.arialyy.aria.core.inf;
import com.arialyy.aria.core.download.downloader.AbsGroupUtil; import com.arialyy.aria.core.download.group.AbsGroupUtil;
/** /**
* Created by AriaL on 2017/6/29. * Created by AriaL on 2017/6/29.

@ -63,7 +63,12 @@ abstract class AbsSchedulers<TASK_ENTITY extends AbsTaskWrapper, TASK extends Ab
manager = AriaManager.getInstance(AriaManager.APP); manager = AriaManager.getInstance(AriaManager.APP);
} }
@Override public void register(Object obj) { /**
* 将当前类注册到Aria
*
* @param obj 观察者类
*/
public void register(Object obj) {
String targetName = obj.getClass().getName(); String targetName = obj.getClass().getName();
AbsSchedulerListener<TASK, AbsNormalEntity> listener = mObservers.get(getKey(obj)); AbsSchedulerListener<TASK, AbsNormalEntity> listener = mObservers.get(getKey(obj));
if (listener == null) { if (listener == null) {
@ -77,7 +82,12 @@ abstract class AbsSchedulers<TASK_ENTITY extends AbsTaskWrapper, TASK extends Ab
} }
} }
@Override public void unRegister(Object obj) { /**
* 移除注册
*
* @param obj 观察者类
*/
public void unRegister(Object obj) {
if (!mObservers.containsKey(getKey(obj))) { if (!mObservers.containsKey(getKey(obj))) {
return; return;
} }

@ -113,6 +113,10 @@ public interface ISchedulers<Task extends AbsTask> extends Handler.Callback {
* 等待 * 等待
*/ */
int WAIT = 10; int WAIT = 10;
/**
* 添加任务
*/
int ADD = 11;
/** /**
* 组合任务子任务预处理 * 组合任务子任务预处理
@ -148,18 +152,4 @@ public interface ISchedulers<Task extends AbsTask> extends Handler.Callback {
* 组合任务子任务完成 * 组合任务子任务完成
*/ */
int SUB_COMPLETE = 0xa7; int SUB_COMPLETE = 0xa7;
/**
* 将当前类注册到Aria
*
* @param obj 观察者类
*/
void register(Object obj);
/**
* 移除注册
*
* @param obj 观察者类
*/
void unRegister(Object obj);
} }
Loading…
Cancel
Save