1. 添加下载任务线程

2. 修改任务记录
v1.0.9
kelvinBen 3 years ago
parent 756025d7fd
commit 479abe2e3b
  1. 2
      libs/core/__init__.py
  2. 3
      libs/core/parses.py
  3. 12
      libs/task/android_task.py
  4. 67
      libs/task/base_task.py
  5. 78
      libs/task/download_task.py

@ -63,7 +63,7 @@ class Bootstrapper(object):
user_add_rules = rules user_add_rules = rules
threads_num = threads threads_num = threads
net_sniffer_flag = sniffer net_sniffer_flag = not sniffer
all_string_out = all all_string_out = all
no_resource_flag = no_resource no_resource_flag = no_resource

@ -39,9 +39,8 @@ class ParsesThreads(threading.Thread):
def __get_string_by_iOS__(self,file_path): def __get_string_by_iOS__(self,file_path):
output_path = cores.output_path output_path = cores.output_path
strings_path = cores.strings_path
temp = os.path.join(output_path,"temp.txt") temp = os.path.join(output_path,"temp.txt")
cmd_str = ('"%s" "%s" > "%s"') % (str(strings_path),str(file_path),str(temp)) cmd_str = ('"%s" "%s" > "%s"') % (str(cores.strings_file),str(file_path),str(temp))
if os.system(cmd_str) == 0: if os.system(cmd_str) == 0:
with open(temp,"r",encoding='utf-8',errors='ignore') as f: with open(temp,"r",encoding='utf-8',errors='ignore') as f:
lines = f.readlines() lines = f.readlines()

@ -13,14 +13,14 @@ import libs.core as cores
class AndroidTask(object): class AndroidTask(object):
def __init__(self,path,package): def __init__(self, file_path, package):
self.path = path self.input_file_path = file_path
self.package = package self.package = package
self.file_queue = Queue() self.file_queue = Queue()
self.shell_flag=False self.shell_flag = False
self.packagename="" self.packagename = ""
self.comp_list=[] self.comp_list = []
self.file_identifier=[] self.file_identifier = []
def start(self): def start(self):
# 检查java环境是否存在 # 检查java环境是否存在

@ -60,7 +60,8 @@ class BaseTask(object):
for download_file in self.download_file_list: for download_file in self.download_file_list:
file_path = download_file["path"] file_path = download_file["path"]
types = download_file["type"] types = download_file["type"]
self.__control_center__(file_path,types) # 控制中心
self.__control_center__(file_path, types)
# 统一文件下载中心 # 统一文件下载中心
def __download_file_center__(self,types): def __download_file_center__(self,types):
@ -77,7 +78,7 @@ class BaseTask(object):
thread.join() thread.join()
# 控制中心 # 控制中心
def __control_center__(self,file_path,types): def __control_center__(self, file_path, types):
logging.info("[*] Processing {}".format(file_path)) logging.info("[*] Processing {}".format(file_path))
logging.info("[*] AI is analyzing filtering rules......") logging.info("[*] AI is analyzing filtering rules......")
@ -85,60 +86,56 @@ class BaseTask(object):
self.__history_handle__() self.__history_handle__()
logging.info("[*] The filtering rules obtained by AI are as follows: {}".format(set(config.filter_no))) logging.info("[*] The filtering rules obtained by AI are as follows: {}".format(set(config.filter_no)))
cacar_path = cache_info["path"]
types = cache_info["type"]
# 任务控制中心 # 任务控制中心
task_info = self.__tast_control__(file_path) task_info = self.__tast_control__(file_path, types)
if len(task_info) < 1: if len(task_info) < 1:
return return
# 文件队列 # 文件队列
file_queue = task_info["file_queue"] file_queue = task_info["file_queue"]
# 是否存在壳 # 是否存在壳
shell_flag = task_info["shell_flag"] shell_flag = task_info["shell_flag"]
# 组件列表(仅适用于Android) # 组件列表(仅适用于Android)
comp_list = task_info["comp_list"] comp_list = task_info["comp_list"]
# 报名信息(仅适用于Android) # 报名信息(仅适用于Android)
packagename = task_info["packagename"] packagename = task_info["packagename"]
# 文件标识符 # 文件标识符
file_identifier = task_info["file_identifier"] file_identifier = task_info["file_identifier"]
if shell_flag: if shell_flag:
logging.error('[x] This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') logging.error('[x] This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
continue return
# 线程控制中心 # 线程控制中心
logging.info("[*] ========= Searching for strings that match the rules ===============") logging.info("[*] ========= Searching for strings that match the rules ===============")
self.__threads_control__(file_queue) self.__threads_control__(file_queue)
# 等待线程结束 # 等待线程结束
for thread in self.thread_list: for thread in self.thread_list:
thread.join() thread.join()
# 结果输出中心 # 结果输出中心
self.__print_control__(packagename,comp_list,file_identifier) self.__print_control__(packagename,comp_list,file_identifier)
# 任务控制中心 # 任务控制中心
def __tast_control__(self,user_input_path): def __tast_control__(self, file_path, types):
task_info = {} task_info = {}
if (not os.path.exists(cacar_path) and cores.download_flag): # 通过网络下载的文件如果不存在就直接返回任务控制中心
logging.error("[x] File download failed! Please download the file manually and try again.") if (not os.path.exists(file_path) and cores.download_flag):
logging.error("[x] {} download failed! Please download the file manually and try again.".format(file_path))
return task_info return task_info
# 调用Android 相关处理逻辑 # 调用Android 相关处理逻辑
if types == "Android": if types == "Android":
task_info = AndroidTask(cacar_path,self.package).start() task_info = AndroidTask(file_path, self.package).start()
# 调用iOS 相关处理逻辑 # 调用iOS 相关处理逻辑
elif types == "iOS": elif types == "iOS":
task_info = iOSTask(cacar_path).start() task_info = iOSTask(file_path).start()
# 调用Web 相关处理逻辑 # 调用Web 相关处理逻辑
else: else:
task_info = WebTask(cacar_path).start() task_info = WebTask(file_path).start()
return task_info return task_info
# 线程控制中心 # 线程控制中心

@ -0,0 +1,78 @@
#! /usr/bin/python3
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import os
import re
import time
import config
import hashlib
import logging
from queue import Queue
import libs.core as cores
from libs.core.download import DownloadThreads
class DownloadTask(object):
def __init__(self):
self.download_file_queue = Queue()
self.thread_list = []
def start(self, path, types):
self.__local_or_remote__(path, types)
for threadID in range(1, cores.threads_num):
name = "Thread - " + str(int(threadID))
thread = DownloadThreads(threadID,name,self.download_file_queue)
thread.start()
thread.join()
# 判断文件是本地加载还是远程加载
def __local_or_remote__(self,path,types):
# 添加文件后缀判断
self.__update_type__(path)
# 处理本地文件
if not(path.startswith("http://") or path.startswith("https://")):
if not os.path.isdir(path): # 不是目录
return {"path":path,"type":types}
else: # 目录处理
return {"path":path,"type":types}
else:
self.__net_header__(path,types)
# self.download_file_queue.put(path)
# 处理网络请求
def __net_header__(self, path, types):
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
if path.endswith("apk") or types == "Android":
types = "Android"
file_name = create_time+ ".apk"
elif path.endswith("ipa") or types == "iOS":
types = "iOS"
file_name = create_time + ".ipa"
else:
types = "WEB"
file_name = create_time + ".html"
logging.info("[*] Detected that the task is not local, preparing to download file......")
cache_path = os.path.join(cores.download_dir, file_name)
self.download_file_queue.put({"path":path, "cache_path":cache_path, "types":types})
# thread = DownloadThreads(path,file_name,cache_path,types)
# thread.start()
# thread.join()
return {"path":cache_path,"type":types}
def __update_type__(self, path, types, file_name=None):
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
if path.endswith("apk") or types == "Android":
types = "Android"
if not file_name:
file_name = create_time+ ".apk"
elif path.endswith("ipa") or types == "iOS":
types = "iOS"
if not file_name:
file_name = create_time + ".ipa"
else:
types = "WEB"
if not file_name:
file_name = create_time + ".html"
return types,file_name
Loading…
Cancel
Save