新增文件自动下载功能,支持APK文件,非AppStore的IPA文件的下载以及对H5或者html页面进行缓存

新增后缀信息修正功能,误填任务类型可以根据后缀名进行自我修正
v1.0.7
kelvin_ben 4 years ago committed by Your Name
parent 6e98c99acf
commit 20b4f665ac
  1. 2
      config.py
  2. 28
      libs/core/download.py
  3. 4
      libs/task/android_task.py
  4. 28
      libs/task/base_task.py
  5. 36
      libs/task/download_task.py
  6. 36
      libs/task/net_task.py
  7. 5
      update.md

@ -84,7 +84,7 @@ web_file_suffix =[
]
# 配置自动下载Apk文件或者缓存HTML的请求头信息
header = {
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0",
"Connection":"close"
}

@ -5,19 +5,23 @@ import re
import os
import time
import config
import threading
import requests
import threading
import libs.core as cores
from requests.packages import urllib3
from requests.adapters import HTTPAdapter
class DownloadThreads(threading.Thread):
def __init__(self, path, type):
self.url = path
self.type = type
def __init__(self,input_path,cache_path,types):
threading.Thread.__init__(self)
self.url = input_path
self.types = types
self.cache_path = cache_path
def __requset__(self):
try:
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
session = requests.Session()
session.mount('http://', HTTPAdapter(max_retries=3))
session.mount('https://', HTTPAdapter(max_retries=3))
@ -31,13 +35,12 @@ class DownloadThreads(threading.Thread):
resp = session.get(url=self.url,data=config.data ,headers=config.headers,timeout=30)
if resp.status_code == requests.codes.ok:
if self.type == "apk":
if self.types == "Android" or self.types == "iOS":
count = 0
count_tmp = 0
time1 = time.time()
length = float(resp.headers['content-length'])
apk_path = os.path.join(cores.download_path, create_time+".apk")
with open(apk_path, "wb") as f:
with open(self.cache_path, "wb") as f:
for chunk in resp.iter_content(chunk_size = 512):
if chunk:
f.write(chunk)
@ -50,15 +53,16 @@ class DownloadThreads(threading.Thread):
time1 = time.time()
f.close()
else:
html_path = os.path.join(cores.download_path, create_time+".html")
html = resp.html()
with open(html_path,"w",encoding='utf-8',errors='ignore') as f:
with open(self.cache_path,"w",encoding='utf-8',errors='ignore') as f:
f.write(html)
f.close()
else:
return
except Exception:
break
return
def run(self):
threadLock = threading.Lock()
self.__get_Http_info__(threadLock)
self.__requset__()

@ -69,7 +69,7 @@ class AndroidTask(object):
self.__shell_test__(output_path)
self.__scanner_file_by_apktool__(output_path)
else:
print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
print("[-] Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception(file_path + ", Decompilation failed.")
@ -79,7 +79,7 @@ class AndroidTask(object):
if os.system(cmd_str) == 0:
self.__get_scanner_file__(output_path)
else:
print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
print("[-] Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception(file_path + ", Decompilation failed.")

@ -7,11 +7,12 @@ import config
import threading
from queue import Queue
import libs.core as cores
from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask
from libs.task.ios_task import iOSTask
from libs.task.web_task import WebTask
from libs.task.net_task import NetTask
from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask
from libs.task.download_task import DownloadTask
class BaseTask(object):
thread_list =[]
@ -44,6 +45,9 @@ class BaseTask(object):
# 任务控制中心
task_info = self.__tast_control__()
if len(task_info) < 1:
return
file_queue = task_info["file_queue"]
shell_flag = task_info["shell_flag"]
comp_list = task_info["comp_list"]
@ -67,17 +71,25 @@ class BaseTask(object):
def __tast_control__(self):
task_info = {}
# 自动根据文件后缀名称进行修正
cache_info = DownloadTask().start(self.path,self.types)
cacar_path = cache_info["path"]
types = cache_info["type"]
if not os.path.exists(cacar_path):
print("[-] File download failed! Please download the file manually and try again.")
return task_info
# 调用Android 相关处理逻辑
if self.types == "Android":
task_info = AndroidTask(self.path,self.no_resource,self.package).start()
if types == "Android":
task_info = AndroidTask(cacar_path,self.no_resource,self.package).start()
# 调用iOS 相关处理逻辑
elif self.types == "iOS":
task_info = iOSTask(self.path,self.no_resource).start()
elif types == "iOS":
task_info = iOSTask(cacar_path,self.no_resource).start()
# 调用Web 相关处理逻辑
else:
task_info = WebTask(self.path).start()
task_info = WebTask(cacar_path).start()
return task_info
def __threads_control__(self,file_queue):

@ -3,23 +3,37 @@
# Github: https://github.com/kelvinBen/AppInfoScanner
import os
import re
import time
import config
import hashlib
from queue import Queue
import libs.core as cores
from libs.core.download import DownloadThreads
class DownloadTask(object):
def __init__(self):
self.path = path
def start(self,path,types):
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
if path.endswith("apk"):
types = "Android"
file_suffix = ".apk"
elif path.endswith("ipa"):
types = "iOS"
file_suffix = ".ipa"
else:
types = "WEB"
file_suffix = ".html"
def start(self):
input_path = self.path
if input_path.startswith("http://") or input_path.startswith("https://"):
if input_path.endswith("apk"):
# 用来下载APK或者缓存H5或者页面内容
pass
if not(path.startswith("http://") or path.startswith("https://")):
if not os.path.isdir(path):
return {"path":self.path,"type":types}
else:
return {"path":self.path,"type":self.types}
else:
print("[*] Detected that the task is not local, preparing to download file......")
cache_path = os.path.join(cores.download_path, create_time + file_suffix)
thread = DownloadThreads(path,cache_path,types)
thread.start()
thread.join()
return input_path
return {"path":cache_path,"type":types}

@ -102,39 +102,3 @@ class NetTask(object):
f.write(content+"\r")
f.close()
def __get_request_result__(url):
result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
cdn = ""
try:
rsp = requests.get(url, timeout=5,stream=True)
status_code = rsp.status_code
result["status"] = status_code
headers = rsp.headers
if "Server" in headers:
result["server"] = headers['Server']
if "Cookie" in headers:
result["cookie"] = headers['Cookie']
if "X-Via" in headers:
cdn = cdn + headers['X-Via']
if "Via" in headers:
cdn = cdn + headers['Via']
result["cdn"] = cdn
sock = rsp.raw._connection.sock
if sock:
des_ip = sock.getpeername()[0]
sou_ip = sock.getsockname()[0]
if des_ip:
result["des_ip"] = des_ip
if sou_ip:
result["sou_ip"] = sou_ip
html = rsp.text
title = re.findall('<title>(.+)</title>',html)
result["title"] = title
return result
except requests.exceptions.InvalidURL as e:
return "error"
except requests.exceptions.ConnectionError as e1:
return "timeout"
# print(__get_request_result__("http://download.sxzwfw.gov.cn/getMerchantSign"))

@ -1,3 +1,8 @@
### V1.0.7
- 新增文件自动下载功能,支持APK文件,非AppStore的IPA文件的下载以及对H5或者html页面进行缓存
- 新增后缀信息修正功能,误填任务类型可以根据后缀名进行自我修正
### V1.0.6
- 新增AI智能分析快速过滤第三方URL地址
- 新增Domain\Title\CDN识别功能

Loading…
Cancel
Save