- 新增AI智能分析快速过滤第三方URL地址

- 新增Domain\Title\CDN识别功能
- 优化Excel文件内容输出格式
- 优化URL和IP筛选规则
- 优化全局日志输出
- 修复APK下多DEX文件搜索结果不准确的问题
v1.0.6
kelvin_ben 4 years ago committed by Your Name
parent cd7bb3a364
commit bbbb7947be
  1. 14
      app.py
  2. 39
      config.py
  3. 12
      libs/core/net.py
  4. 19
      libs/core/parses.py
  5. 46
      libs/task/base_task.py
  6. 45
      libs/task/net_task.py
  7. 3
      libs/task/web_task.py
  8. 2
      update.md

@ -17,7 +17,7 @@ def cli():
@cli.command(help="Get the key information of Android system.") @cli.command(help="Get the key information of Android system.")
@click.option("-i", "--inputs", required=True, type=str, help="Input APK file or DEX directory.") @click.option("-i", "--inputs", required=True, type=str, help="Input APK file or DEX directory.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") @click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.")
@click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.")
@click.option("-p", '--package',required=False,type=str,default="",help="Specifies the retrieval package name.") @click.option("-p", '--package',required=False,type=str,default="",help="Specifies the retrieval package name.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@ -36,7 +36,7 @@ def android(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,package:
@cli.command(help="Get the key information of iOS system.") @cli.command(help="Get the key information of iOS system.")
@click.option("-i", "--inputs", required=True, type=str, help="Input IPA file or ELF file.") @click.option("-i", "--inputs", required=True, type=str, help="Input IPA file or ELF file.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") @click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.")
@click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default")
@ -57,17 +57,15 @@ def ios(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,all_str:bool
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default")
def web(inputs: str, rules: str, all_str:bool,threads:int) -> None: @click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.")
def web(inputs: str, rules: str, all_str:bool,threads:int,net_sniffer) -> None:
try: try:
# 初始化全局对象 # 初始化全局对象
bootstrapper = Bootstrapper(__file__) bootstrapper = Bootstrapper(__file__)
bootstrapper.init() bootstrapper.init()
BaseTask("Web", inputs, rules,all_str, threads).start() BaseTask("Web", inputs, rules,all_str, net_sniffer,threads).start()
# task = WebTask(input, rules,all,threads)
# task.start()
except Exception as e: except Exception as e:
raise e raise e

@ -29,35 +29,26 @@ filter_components = [
# 3. URI地址,URI不能很好的拼接所以此处忽略 # 3. URI地址,URI不能很好的拼接所以此处忽略
filter_strs =[ filter_strs =[
r'https://.*|http://.*', r'https://.*|http://.*',
r'.*://([[0-9]{1,3}\.]{3}[0-9]{1,3}).*', # r'.*://([[0-9]{1,3}\.]{3}[0-9]{1,3}).*',
r'.*://([\d{1,3}\.]{3}\d{1,3}).*'
# r'/[a-z0-9A-Z]+/.*' # r'/[a-z0-9A-Z]+/.*'
] ]
# 此处忽略常见的域名等信息 # 此处忽略常见的域名等信息
filter_no = [ filter_no = [
# r'.*127.0.0.1', r'.*127.0.0.1',
# r'.*0.0.0.0', r'.*0.0.0.0',
# r'.*localhost', r'.*localhost',
# r'.*w3.org', r'.*w3.org',
# r'.*apache.org', r'.*apache.org',
# r'.*android.com', r'.*android.com',
# r'.*weixin.qq.com', r'.*jpush.cn',
# r'.*jpush.cn', r'.*umengcloud.com',
# r'.*umengcloud.com', r'.*umeng.com',
# r'.*umeng.com', r'.*github.com',
# r'.*baidu.com', r'.*w3school.com.cn',
# r'.*apple.com', r'.*apple.com',
# r'.*alibaba.com', r'.*.amap.com',
# r'.*qq.com',
# r'.*sohu.com',
# r'.*openssl.org',
# r'.*weibo.com',
# r'.*wechat.com',
# r'.*.amap.com',
# r'.*openxmlformats.org',
# r'.*github.com',
# r'.*w3school.com.cn',
# r'.*google.com'
] ]
# 此处配置壳信息 # 此处配置壳信息

@ -16,11 +16,14 @@ class NetThreads(threading.Thread):
def __get_Http_info__(self,threadLock): def __get_Http_info__(self,threadLock):
while True: while True:
if self.domain_queue.empty():
break
domains = self.domain_queue.get(timeout=5) domains = self.domain_queue.get(timeout=5)
domain = domains["domain"] domain = domains["domain"]
url_ip = domains["url_ip"] url_ip = domains["url_ip"]
time.sleep(2) time.sleep(2)
result = self.__get_request_result__(url_ip) result = self.__get_request_result__(url_ip)
print("[+] Processing URL address:"+url_ip)
if result != "error": if result != "error":
if self.lock.acquire(True): if self.lock.acquire(True):
cores.excel_row = cores.excel_row + 1 cores.excel_row = cores.excel_row + 1
@ -33,17 +36,15 @@ class NetThreads(threading.Thread):
self.worksheet.write(cores.excel_row, 5, label = result["server"]) self.worksheet.write(cores.excel_row, 5, label = result["server"])
self.worksheet.write(cores.excel_row, 6, label = result["title"]) self.worksheet.write(cores.excel_row, 6, label = result["title"])
self.worksheet.write(cores.excel_row, 7, label = result["cdn"]) self.worksheet.write(cores.excel_row, 7, label = result["cdn"])
self.worksheet.write(cores.excel_row, 8, label = "") # self.worksheet.write(cores.excel_row, 8, label = "")
self.lock.release() self.lock.release()
if self.domain_queue.empty():
break
def __get_request_result__(self,url): def __get_request_result__(self,url):
result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""} result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
cdn = "" cdn = ""
try: try:
# python3 app.py ios -i C:\Users\Administrator\Desktop\frida-ipa-dump\三晋通\Payload\三晋通.app\三晋通
#
with requests.get(url, timeout=5,stream=True) as rsp: with requests.get(url, timeout=5,stream=True) as rsp:
status_code = rsp.status_code status_code = rsp.status_code
result["status"] = status_code result["status"] = status_code
@ -80,7 +81,6 @@ class NetThreads(threading.Thread):
except requests.exceptions.ReadTimeout as e2: except requests.exceptions.ReadTimeout as e2:
return "timeout" return "timeout"
def run(self): def run(self):
threadLock = threading.Lock() threadLock = threading.Lock()
self.__get_Http_info__(threadLock) self.__get_Http_info__(threadLock)

@ -26,7 +26,7 @@ class ParsesThreads(threading.Thread):
break break
file_path = self.file_queue.get(timeout = 5) file_path = self.file_queue.get(timeout = 5)
scan_str = ("Scan file : %s" % file_path) scan_str = ("[+] Scan file : %s" % file_path)
print(scan_str) print(scan_str)
if self.types == "iOS": if self.types == "iOS":
@ -65,7 +65,7 @@ class ParsesThreads(threading.Thread):
for filter_str in config.filter_strs: for filter_str in config.filter_strs:
filter_str_pat = re.compile(filter_str) filter_str_pat = re.compile(filter_str)
filter_resl = filter_str_pat.findall(result) filter_resl = filter_str_pat.findall(result)
# print(result,filter_resl)
# 过滤掉未搜索到的内容 # 过滤掉未搜索到的内容
if len(filter_resl)!=0: if len(filter_resl)!=0:
# 提取第一个结果 # 提取第一个结果
@ -75,27 +75,18 @@ class ParsesThreads(threading.Thread):
continue continue
self.threadLock.acquire() self.threadLock.acquire()
print("[+] The string searched for matching rule is: %s" % (resl_str))
self.result_list.append(resl_str) self.result_list.append(resl_str)
self.threadLock.release() self.threadLock.release()
continue continue
def __filter__(self,resl_str): def __filter__(self,resl_str):
return_flag = 1 return_flag = 1
print(resl_str)
resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","") resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","")
if len(resl_str) == 0: if len(resl_str) == 0:
return 0 return 0
# 目前流通的域名中加上协议头最短长度为11位
if len(resl_str) <= 10:
return 0
# 单独处理https或者http开头的字符串
# http_list =["https","https://","https:","http","http://","https:",]
# for filte in http_list:
# if filte == resl_str:
# return 0
for filte in config.filter_no: for filte in config.filter_no:
resl_str = resl_str.replace(filte,"") resl_str = resl_str.replace(filte,"")
if len(resl_str) == 0: if len(resl_str) == 0:

@ -19,7 +19,7 @@ class BaseTask(object):
app_history_list=[] app_history_list=[]
# 统一初始化入口 # 统一初始化入口
def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10): def __init__(self, types="Android", inputs="", rules="", net_sniffer=True, no_resource=False, package="", all_str=False, threads=10):
self.types = types self.types = types
self.net_sniffer = net_sniffer self.net_sniffer = net_sniffer
self.path = inputs self.path = inputs
@ -35,12 +35,12 @@ class BaseTask(object):
# 统一调度平台 # 统一调度平台
def start(self): def start(self):
print("[*] AI决策系统正在分析规则中...") print("[*] AI is analyzing filtering rules......")
# 获取历史记录 # 获取历史记录
self.__history_handle__() self.__history_handle__()
print("[*] 本次的过滤规则为:" , config.filter_no) print("[*] The filtering rules obtained by AI are as follows: %s" % (config.filter_no) )
# 任务控制中心 # 任务控制中心
task_info = self.__tast_control__() task_info = self.__tast_control__()
@ -55,6 +55,7 @@ class BaseTask(object):
return return
# 线程控制中心 # 线程控制中心
print("[*] ========= Searching for strings that match the rules ===============")
self.__threads_control__(file_queue) self.__threads_control__(file_queue)
# 等待线程结束 # 等待线程结束
@ -92,44 +93,19 @@ class BaseTask(object):
# 此处需要hash值或者应用名称, apk文件获取pachage, dex文件获取hash, macho-o获取文件名 # 此处需要hash值或者应用名称, apk文件获取pachage, dex文件获取hash, macho-o获取文件名
if packagename: if packagename:
print("========= The package name of this APP is: ===============") print("[*] ========= The package name of this APP is: ===============")
print(packagename) print(packagename)
if len(comp_list) != 0: if len(comp_list) != 0:
print("========= Component information is as follows :===============") print("[*] ========= Component information is as follows :===============")
for json in comp_list: for json in comp_list:
print(json) print(json)
print("=========The result set for the static scan is shown below:===============")
NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start()
# with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f: if self.net_sniffer:
# row = 1 print("[*] ========= Sniffing the URL address of the search ===============")
# for key,value in self.result_dict.items(): NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start()
# f.write(key+"\r") print("[*] For more information about the search, see XLS file result: %s" %(cores.xls_result_path))
print("[*] For more information about the search, see TXT file result: %s" %(cores.txt_result_path))
# for result in value:
# if result in self.value_list:
# continue
# if not(file_identifier in self.app_history_list) and ("http://" in result or "https://" in result):
# domain = result.replace("https://","").replace("http://","")
# if "/" in domain:
# domain = domain[:domain.index("/")]
# if not(domain in self.domain_list):
# self.domain_list.append(domain)
# self.__write_content_in_file__(cores.domain_history_path,domain)
# if append_file_flag:
# for identifier in file_identifier:
# self.__write_content_in_file__(cores.app_history_path,identifier)
# append_file_flag = False
# self.value_list.append(result)
# worksheet.write(row,0, label = result)
# row = row + 1
# f.write("\t"+result+"\r")
print("For more information about the search, see TXT file result: %s" %(cores.txt_result_path))
print("For more information about the search, see XLS file result: %s" %(cores.xls_result_path))
def __history_handle__(self): def __history_handle__(self):
domain_history_path = cores.domain_history_path domain_history_path = cores.domain_history_path

@ -26,13 +26,14 @@ class NetTask(object):
xls_result_path = cores.xls_result_path xls_result_path = cores.xls_result_path
workbook = xlwt.Workbook(encoding = 'utf-8') workbook = xlwt.Workbook(encoding = 'utf-8')
worksheet = self.__creating_excel_header__(workbook) worksheet = self.__creating_excel_header__(workbook)
self.__start_threads__(worksheet)
self.__write_result_to_txt__() self.__write_result_to_txt__()
self.__start_threads__(worksheet)
for thread in self.thread_list: for thread in self.thread_list:
thread.join() thread.join()
workbook.save(xls_result_path) workbook.save(xls_result_path)
def __creating_excel_header__(self,workbook): def __creating_excel_header__(self,workbook):
@ -45,7 +46,7 @@ class NetTask(object):
worksheet.write(0,5, label = "Server") worksheet.write(0,5, label = "Server")
worksheet.write(0,6, label = "Title") worksheet.write(0,6, label = "Title")
worksheet.write(0,7, label = "CDN") worksheet.write(0,7, label = "CDN")
worksheet.write(0,8, label = "Finger") # worksheet.write(0,8, label = "Finger")
return worksheet return worksheet
def __write_result_to_txt__(self): def __write_result_to_txt__(self):
@ -58,29 +59,33 @@ class NetTask(object):
for result in value: for result in value:
if result in self.value_list: if result in self.value_list:
continue continue
# 100个文件标识
for file in self.file_identifier:
if not(file in self.app_history_list) and ("http://" in result or "https://" in result):
# print(self.file_identifier,self.app_history_list,not(self.file_identifier[0] in self.app_history_list)) if (("http://" in result) or ("https://" in result)) and ("." in result):
# if not(self.file_identifier in self.app_history_list) and ("http://" in result or "https://" in result): domain = result.replace("https://","").replace("http://","")
domain = result.replace("https://","").replace("http://","") if "/" in domain:
if "/" in domain: domain = domain[:domain.index("/")]
domain = domain[:domain.index("/")]
if "|" in result:
self.domain_queue.put({"domain":domain,"url_ip":result}) result = result[:result.index("|")]
# 目前流通的域名中加上协议头最短长度为11位
if len(result) <= 10:
continue
self.domain_queue.put({"domain":domain,"url_ip":result})
for identifier in self.file_identifier:
if identifier in self.app_history_list:
if not(domain in self.domain_list):
self.domain_list.append(domain)
self.__write_content_in_file__(cores.domain_history_path,domain)
continue
print(domain,self.domain_list,not(domain in self.domain_list))
if not(domain in self.domain_list): if not(domain in self.domain_list):
self.domain_list.append(domain) self.domain_list.append(domain)
self.__write_content_in_file__(cores.domain_history_path,domain) self.__write_content_in_file__(cores.domain_history_path,domain)
if append_file_flag: if append_file_flag:
for identifier in self.file_identifier: self.__write_content_in_file__(cores.app_history_path,identifier)
if self.file_identifier in self.app_history_list: append_file_flag = False
continue
self.__write_content_in_file__(cores.app_history_path,identifier)
append_file_flag = False
self.value_list.append(result) self.value_list.append(result)
f.write("\t"+result+"\r") f.write("\t"+result+"\r")
f.close() f.close()

@ -37,4 +37,7 @@ class WebTask(object):
else: else:
if len(dir_file.split("."))>1: if len(dir_file.split("."))>1:
if dir_file.split(".")[-1] in file_suffix: if dir_file.split(".")[-1] in file_suffix:
with open(file_path,'rb') as f:
dex_md5 = str(hashlib.md5().update(f.read()).hexdigest()).upper()
self.file_identifier.append(dex_md5)
self.file_queue.put(dir_file_path) self.file_queue.put(dir_file_path)

@ -1,6 +1,6 @@
### V1.0.6 ### V1.0.6
- 新增AI智能分析快速过滤第三方URL地址 - 新增AI智能分析快速过滤第三方URL地址
- 新增Domain\Title\CDN\Finger识别功能 - 新增Domain\Title\CDN识别功能
- 优化Excel文件内容输出格式 - 优化Excel文件内容输出格式
- 优化URL和IP筛选规则 - 优化URL和IP筛选规则
- 优化全局日志输出 - 优化全局日志输出

Loading…
Cancel
Save