diff --git a/config.py b/config.py
index f298edb..1fc5bfe 100644
--- a/config.py
+++ b/config.py
@@ -26,26 +26,38 @@ filter_components = [
 # The following filters are currently supported:
 # 1. Strings starting with https:// or http://
 # 2. IPv4 addresses
-# 3. URI paths
+# 3. URI paths; URIs cannot be reliably concatenated, so they are ignored here
 filter_strs =[
-    r'^http://.*',
-    r'^https://.*',
-    r'.*(http://.*)',
-    r'.*(https://.*)',
-    r'.*((?:[0-9]{1,3}\.){3}[0-9]{1,3}).*',
+    r'https://.*|http://.*',
+    r'.*://((?:[0-9]{1,3}\.){3}[0-9]{1,3}).*',
     # r'/[a-z0-9A-Z]+/.*'
 ]
 
-# Filter out useless content
+# Common domains and similar noise are ignored here
filter_no = [
-    u'127.0.0.1',
-    u'0.0.0.0',
-    u'localhost',
-    r"^http://www.w3.org"
-    r"L.*/",
-    r"/.*;",
-    r"/.*<",
-    r'^http://schemas.android.com',
+    # r'.*127.0.0.1',
+    # r'.*0.0.0.0',
+    # r'.*localhost',
+    # r'.*w3.org',
+    # r'.*apache.org',
+    # r'.*android.com',
+    # r'.*weixin.qq.com',
+    # r'.*jpush.cn',
+    # r'.*umengcloud.com',
+    # r'.*umeng.com',
+    # r'.*baidu.com',
+    # r'.*apple.com',
+    # r'.*alibaba.com',
+    # r'.*qq.com',
+    # r'.*sohu.com',
+    # r'.*openssl.org',
+    # r'.*weibo.com',
+    # r'.*wechat.com',
+    # r'.*.amap.com',
+    # r'.*openxmlformats.org',
+    # r'.*github.com',
+    # r'.*w3school.com.cn',
+    # r'.*google.com'
 ]
 
 # Packer (shell) signatures are configured here
@@ -79,3 +91,4 @@ web_file_suffix =[
     "aspx",
     "py"
 ]
+
diff --git a/libs/core/__init__.py b/libs/core/__init__.py
index fe119ec..c83b44a 100644
--- a/libs/core/__init__.py
+++ b/libs/core/__init__.py
@@ -34,6 +34,10 @@ class Bootstrapper(object):
         global txt_result_path
         global xls_result_path
         global strings_path
+        global history_path
+        global app_history_path
+        global domain_history_path
+        global excel_row
 
         create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
 
@@ -50,18 +54,26 @@ class Bootstrapper(object):
             strings_path = os.path.join(tools_dir,"strings64.exe")
         else:
             strings_path ="strings"
-
+
+        excel_row = 0
         backsmali_path = os.path.join(tools_dir,"baksmali.jar")
         apktool_path = os.path.join(tools_dir, "apktool.jar")
         output_path = os.path.join(script_root_dir,"out")
+        history_path = os.path.join(script_root_dir,"history")
         txt_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".txt")
         xls_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".xls")
+        app_history_path = os.path.join(history_path,"app_history.txt")
+        domain_history_path = os.path.join(history_path,"domain_history.txt")
+
+    # If the package name matches the scan history, URL info is not recorded; if it differs, URL info is recorded
     def init(self):
         if os.path.exists(output_path):
             shutil.rmtree(output_path)
         os.makedirs(output_path)
 
+        if not os.path.exists(history_path):
+            os.makedirs(history_path)
+
         if os.path.exists(txt_result_path):
             os.remove(txt_result_path)
diff --git a/libs/core/net.py b/libs/core/net.py
new file mode 100644
index 0000000..4e903b5
--- /dev/null
+++ b/libs/core/net.py
@@ -0,0 +1,86 @@
+import re
+import time
+import threading
+import requests
+import libs.core as cores
+
+class NetThreads(threading.Thread):
+
+    def __init__(self,threadID,name,domain_queue,worksheet):
+        threading.Thread.__init__(self)
+        self.name = name
+        self.threadID = threadID
+        self.lock = threading.Lock()
+        self.domain_queue = domain_queue
+        self.worksheet = worksheet
+
+    def __get_Http_info__(self,threadLock):
+        while True:
+            domains = self.domain_queue.get(timeout=5)
+            domain = domains["domain"]
+            url_ip = domains["url_ip"]
+            time.sleep(2)
+            result = self.__get_request_result__(url_ip)
+            if result != "error":
+                if self.lock.acquire(True):
+                    cores.excel_row = cores.excel_row + 1
+                    self.worksheet.write(cores.excel_row, 0, label = cores.excel_row)
+                    self.worksheet.write(cores.excel_row, 1, label = url_ip)
+                    self.worksheet.write(cores.excel_row, 2, label = domain)
+                    if result != "timeout":
+                        self.worksheet.write(cores.excel_row, 3, label = result["status"])
+                        self.worksheet.write(cores.excel_row, 4, label = result["des_ip"])
+                        self.worksheet.write(cores.excel_row, 5, label = result["server"])
+                        self.worksheet.write(cores.excel_row, 6, label = result["title"])
+                        self.worksheet.write(cores.excel_row, 7, label = result["cdn"])
+                        self.worksheet.write(cores.excel_row, 8, label = "")
+                    self.lock.release()
+            if self.domain_queue.empty():
+                break
+
+    def __get_request_result__(self,url):
+        result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
+        cdn = ""
+        try:
+            # python3 app.py ios -i C:\Users\Administrator\Desktop\frida-ipa-dump\三晋通\Payload\三晋通.app\三晋通
+            #
+            with requests.get(url, timeout=5,stream=True) as rsp:
+                status_code = rsp.status_code
+                result["status"] = status_code
+                headers = rsp.headers
+                if "Server" in headers:
+                    result["server"] = headers['Server']
+                if "Cookie" in headers:
+                    result["cookie"] = headers['Cookie']
+                if "X-Via" in headers:
+                    cdn = cdn + headers['X-Via']
+                if "Via" in headers:
+                    cdn = cdn + headers['Via']
+                result["cdn"] = cdn
+                sock = rsp.raw._connection.sock
+
+                if sock:
+                    des_ip = sock.getpeername()[0]
+                    sou_ip = sock.getsockname()[0]
+                    if des_ip:
+                        result["des_ip"] = des_ip
+                    if sou_ip:
+                        result["sou_ip"] = sou_ip
+                    sock.close()
+                html = rsp.text
+                title = re.findall('<title>(.+)</title>',html)
+                if title:
+                    result["title"] = title[0]
+                rsp.close()
+            return result
+        except requests.exceptions.InvalidURL as e:
+            return "error"
+        except requests.exceptions.ConnectionError as e1:
+            return "timeout"
+        except requests.exceptions.ReadTimeout as e2:
+            return "timeout"
+
+
+    def run(self):
+        threadLock = threading.Lock()
+        self.__get_Http_info__(threadLock)
diff --git a/libs/core/parses.py b/libs/core/parses.py
index d2c0326..621d044 100644
--- a/libs/core/parses.py
+++ b/libs/core/parses.py
@@ -81,15 +81,20 @@ class ParsesThreads(threading.Thread):
 
     def __filter__(self,resl_str):
         return_flag = 1
+        print(resl_str)
         resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","")
         if len(resl_str) == 0:
             return 0
 
+        # With the protocol prefix included, the shortest domain currently in circulation is 11 characters
+        if len(resl_str) <= 10:
+            return 0
+
         # Handle strings that start with https or http separately
-        http_list =["https","https://","https:","http","http://","https:",]
-        for filte in http_list:
-            if filte == resl_str:
-                return 0
+        # http_list =["https","https://","https:","http","http://","https:",]
+        # for filte in http_list:
+        #     if filte == resl_str:
+        #         return 0
 
         for filte in config.filter_no:
             resl_str = resl_str.replace(filte,"")
diff --git a/libs/task/android_task.py b/libs/task/android_task.py
index 5bb536e..a616f66 100644
--- a/libs/task/android_task.py
+++ b/libs/task/android_task.py
@@ -1,15 +1,13 @@
 # -*- coding: utf-8 -*-
 # Author: kelvinBen
 # Github: https://github.com/kelvinBen/AppInfoScanner
-
-
 import os
 import re
 import config
-import threading
+import hashlib
 from queue import Queue
 import libs.core as cores
-from libs.core.parses import ParsesThreads
+
 
 class AndroidTask(object):
 
@@ -22,6 +20,7 @@ class AndroidTask(object):
         self.shell_flag=False
         self.packagename=""
         self.comp_list=[]
+        self.file_identifier=[]
 
     def start(self):
         # Check whether a Java environment is available
@@ -36,7 +35,7 @@ class AndroidTask(object):
         if self.__decode_file__(input_file_path) == "error":
             raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.")
Select APK file or DEX file.") - return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename} + return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename,"file_identifier":self.file_identifier} def __decode_file__(self,file_path): apktool_path = str(cores.apktool_path) @@ -46,6 +45,9 @@ class AndroidTask(object): if suffix_name == "apk": self.__decode_apk__(file_path,apktool_path,output_path) elif suffix_name == "dex": + with open(file_path,'rb') as f: + dex_md5 = str(hashlib.md5().update(f.read()).hexdigest()).upper() + self.file_identifier.append(dex_md5) self.__decode_dex__(file_path,backsmali_path,output_path) else: return "error" @@ -83,17 +85,17 @@ class AndroidTask(object): # 初始化检测文件信息 def __scanner_file_by_apktool__(self,output_path): - if self.no_resource: - scanner_dir_lists = ["smali"] - scanner_file_suffixs = ["smali"] - else: - scanner_dir_lists = ["smali","assets"] - scanner_file_suffixs = ["smali","js","xml"] + file_names = os.listdir(output_path) + for file_name in file_names: + file_path = os.path.join(output_path,file_name) + if not os.path.isdir(file_path): + continue - for scanner_dir_list in scanner_dir_lists: - scanner_dir = os.path.join(output_path,scanner_dir_list) - if os.path.exists(scanner_dir): - self.__get_scanner_file__(scanner_dir,scanner_file_suffixs) + if "smali" in file_name or "assets" in file_name: + scanner_file_suffixs = ["smali","js","xml"] + if self.no_resource: + scanner_file_suffixs =["smali"] + self.__get_scanner_file__(file_path,scanner_file_suffixs) def __get_scanner_file__(self,scanner_dir,scanner_file_suffixs=["smali"]): dir_or_files = os.listdir(scanner_dir) @@ -115,13 +117,14 @@ class AndroidTask(object): def __shell_test__(self,output): am_path = os.path.join(output,"AndroidManifest.xml") - with open(am_path,"r") as f: + with open(am_path,"r",encoding='utf-8',errors='ignore') as f: am_str = f.read() am_package= re.compile(r'=1: self.packagename = apackage[0] + self.file_identifier.append(apackage[0]) am_name = re.compile(r'') aname = am_name.findall(am_str) diff --git a/libs/task/base_task.py b/libs/task/base_task.py index 1ef581e..017417e 100644 --- a/libs/task/base_task.py +++ b/libs/task/base_task.py @@ -1,11 +1,8 @@ # -*- coding: utf-8 -*- # Author: kelvinBen # Github: https://github.com/kelvinBen/AppInfoScanner - -# 接收传入的参数信息,根据参数进行平台分发 import os import re -import xlwt import config import threading from queue import Queue @@ -14,12 +11,13 @@ from libs.core.parses import ParsesThreads from libs.task.android_task import AndroidTask from libs.task.ios_task import iOSTask from libs.task.web_task import WebTask +from libs.task.net_task import NetTask class BaseTask(object): thread_list =[] result_dict = {} - value_list = [] - + app_history_list=[] + # 统一初始化入口 def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10): self.types = types @@ -32,20 +30,25 @@ class BaseTask(object): self.all = all_str self.threads = threads self.file_queue = Queue() + # 统一调度平台 def start(self): - workbook = xlwt.Workbook(encoding = 'utf-8') + + print("[*] AI决策系统正在分析规则中...") + + # 获取历史记录 + self.__history_handle__() + + print("[*] 本次的过滤规则为:" , config.filter_no) - # 创建excel头 - worksheet = self.__creating_excel_header__(workbook) - # 任务控制中心 task_info = self.__tast_control__() file_queue = task_info["file_queue"] shell_flag = task_info["shell_flag"] comp_list = 
task_info["comp_list"] packagename = task_info["packagename"] + file_identifier = task_info["file_identifier"] if shell_flag: print('\033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') @@ -57,14 +60,10 @@ class BaseTask(object): # 等待线程结束 for thread in self.thread_list: thread.join() - + # 结果输出中心 - self.__print_control__(packagename,comp_list,workbook,worksheet) + self.__print_control__(packagename,comp_list,file_identifier) - def __creating_excel_header__(self,workbook): - worksheet = workbook.add_sheet("扫描信息",cell_overwrite_ok=True) - worksheet.write(0,0, label = "扫描结果") - return worksheet def __tast_control__(self): task_info = {} @@ -86,10 +85,12 @@ class BaseTask(object): thread.start() self.thread_list.append(thread) - def __print_control__(self,packagename,comp_list,workbook,worksheet): + def __print_control__(self,packagename,comp_list,file_identifier): txt_result_path = cores.txt_result_path xls_result_path = cores.xls_result_path + # 此处需要hash值或者应用名称, apk文件获取pachage, dex文件获取hash, macho-o获取文件名 + if packagename: print("========= The package name of this APP is: ===============") print(packagename) @@ -98,22 +99,63 @@ class BaseTask(object): print("========= Component information is as follows :===============") for json in comp_list: print(json) - print("=========The result set for the static scan is shown below:===============") - with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f: - row = 1 - for key,value in self.result_dict.items(): - f.write(key+"\r") - for result in value: - if result in self.value_list: - continue - self.value_list.append(result) - print(result) - worksheet.write(row,0, label = result) - row = row + 1 - f.write("\t"+result+"\r") - print("For more information about the search, see TXT file result: %s" %(txt_result_path)) - print("For more information about the search, see XLS file result: %s" %(xls_result_path)) - workbook.save(xls_result_path) + + NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start() + # with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f: + # row = 1 + # for key,value in self.result_dict.items(): + # f.write(key+"\r") + # for result in value: + # if result in self.value_list: + # continue + # if not(file_identifier in self.app_history_list) and ("http://" in result or "https://" in result): + # domain = result.replace("https://","").replace("http://","") + # if "/" in domain: + # domain = domain[:domain.index("/")] + + # if not(domain in self.domain_list): + # self.domain_list.append(domain) + # self.__write_content_in_file__(cores.domain_history_path,domain) + # if append_file_flag: + # for identifier in file_identifier: + # self.__write_content_in_file__(cores.app_history_path,identifier) + # append_file_flag = False + + # self.value_list.append(result) + # worksheet.write(row,0, label = result) + # row = row + 1 + # f.write("\t"+result+"\r") + print("For more information about the search, see TXT file result: %s" %(cores.txt_result_path)) + print("For more information about the search, see XLS file result: %s" %(cores.xls_result_path)) + + def __history_handle__(self): + domain_history_path = cores.domain_history_path + app_history_path = cores.app_history_path + if os.path.exists(domain_history_path): + domain_counts = {} + app_size = 0 + with open(app_history_path,"r",encoding='utf-8',errors='ignore') as f: + lines = f.readlines() + app_size = len(lines) + for line in lines: + 
+                    self.app_history_list.append(line.replace("\r","").replace("\n",""))
+
+                f.close()
+
+            with open(domain_history_path,"r",encoding='utf-8',errors='ignore') as f:
+                lines = f.readlines()
+                cout = 3
+                if (app_size>3) and (app_size%3==0):
+                    cout = cout + 1
+                for line in lines:
+                    domain = line.replace("\r","").replace("\n","")
+                    domain_count = lines.count(line)
+
+                    if domain_count >= cout:
+                        config.filter_no.append(domain)
+                f.close()
+
+
diff --git a/libs/task/ios_task.py b/libs/task/ios_task.py
index d6ebe4f..0814db8 100644
--- a/libs/task/ios_task.py
+++ b/libs/task/ios_task.py
@@ -1,18 +1,14 @@
 # -*- coding: utf-8 -*-
 # Author: kelvinBen
 # Github: https://github.com/kelvinBen/AppInfoScanner
-
-
 import os
 import re
 import shutil
 import zipfile
 import binascii
 import platform
-from pathlib import Path
 import libs.core as cores
 from queue import Queue
-from libs.core.parses import ParsesThreads
 
 class iOSTask(object):
 
@@ -22,7 +18,8 @@ class iOSTask(object):
         self.no_resource = no_resource
         self.file_queue = Queue()
 
-        self.shell_flag=False
+        self.shell_flag = False
+        self.file_identifier= []
 
     def start(self):
         file_path = self.path
@@ -33,10 +30,12 @@ class iOSTask(object):
             self.file_queue.put(file_path)
         else:
             raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.")
-        return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None}
+        return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None,"file_identifier":self.file_identifier}
 
     def __get_file_header__(self,file_path):
         hex_hand = 0x0
+        macho_name = os.path.split(file_path)[-1]
+        self.file_identifier.append(macho_name)
         with open(file_path,"rb") as macho_file:
             macho_file.seek(hex_hand,0)
             magic = binascii.hexlify(macho_file.read(4)).decode().upper()
@@ -56,7 +55,6 @@ class iOSTask(object):
                     macho_file.seek(hex_hand,0)
                     encryption_info_command = binascii.hexlify(macho_file.read(24)).decode()
                     cryptid = encryption_info_command[-8:len(encryption_info_command)]
-                    print(cryptid)
                     if cryptid == "01000000":
                         self.shell_flag = True
             break
@@ -78,7 +76,6 @@ class iOSTask(object):
             else:
                 if self.elf_file_name == dir_file:
                     self.__get_file_header__(dir_file_path)
-                    print(self.shell_flag)
                     self.file_queue.put(dir_file_path)
                     continue
             if self.no_resource:
diff --git a/libs/task/net_task.py b/libs/task/net_task.py
new file mode 100644
index 0000000..34972ef
--- /dev/null
+++ b/libs/task/net_task.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+# Author: kelvinBen
+# Github: https://github.com/kelvinBen/AppInfoScanner
+
+import re
+import xlwt
+import socket
+from queue import Queue
+import libs.core as cores
+from libs.core.net import NetThreads
+
+import requests
+class NetTask(object):
+    value_list = []
+    domain_list=[]
+
+    def __init__(self,result_dict,app_history_list,file_identifier,threads):
+        self.result_dict = result_dict
+        self.app_history_list = app_history_list
+        self.file_identifier = file_identifier
+        self.domain_queue = Queue()
+        self.threads = threads
+        self.thread_list = []
+
+    def start(self):
+        xls_result_path = cores.xls_result_path
+        workbook = xlwt.Workbook(encoding = 'utf-8')
+        worksheet = self.__creating_excel_header__(workbook)
+        self.__start_threads__(worksheet)
+        self.__write_result_to_txt__()
+
+        for thread in self.thread_list:
+            thread.join()
+
+
+        workbook.save(xls_result_path)
+
+    def __creating_excel_header__(self,workbook):
+        worksheet = workbook.add_sheet("Result",cell_overwrite_ok=True)
+        worksheet.write(0,0, label = "Number")
+        worksheet.write(0,1, label = "IP/URL")
+        worksheet.write(0,2, label = "Domain")
+        worksheet.write(0,3, label = "Status")
+        worksheet.write(0,4, label = "IP")
+        worksheet.write(0,5, label = "Server")
+        worksheet.write(0,6, label = "Title")
+        worksheet.write(0,7, label = "CDN")
+        worksheet.write(0,8, label = "Finger")
+        return worksheet
+
+    def __write_result_to_txt__(self):
+        txt_result_path = cores.txt_result_path
+        append_file_flag = True
+
+        with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f:
+            for key,value in self.result_dict.items():
+                f.write(key+"\r")
+                for result in value:
+                    if result in self.value_list:
+                        continue
+
+                    # 100 file identifiers
+                    for file in self.file_identifier:
+                        if not(file in self.app_history_list) and ("http://" in result or "https://" in result):
+
+                            # print(self.file_identifier,self.app_history_list,not(self.file_identifier[0] in self.app_history_list))
+                            # if not(self.file_identifier in self.app_history_list) and ("http://" in result or "https://" in result):
+                            domain = result.replace("https://","").replace("http://","")
+                            if "/" in domain:
+                                domain = domain[:domain.index("/")]
+
+                            self.domain_queue.put({"domain":domain,"url_ip":result})
+
+                            print(domain,self.domain_list,not(domain in self.domain_list))
+                            if not(domain in self.domain_list):
+                                self.domain_list.append(domain)
+                                self.__write_content_in_file__(cores.domain_history_path,domain)
+                            if append_file_flag:
+                                for identifier in self.file_identifier:
+                                    if self.file_identifier in self.app_history_list:
+                                        continue
+                                    self.__write_content_in_file__(cores.app_history_path,identifier)
+                                append_file_flag = False
+                    self.value_list.append(result)
+                    f.write("\t"+result+"\r")
+            f.close()
+
+    def __start_threads__(self,worksheet):
+        for threadID in range(0,self.threads) :
+            name = "Thread - " + str(threadID)
+            thread = NetThreads(threadID,name,self.domain_queue,worksheet)
+            thread.start()
+            self.thread_list.append(thread)
+
+    def __write_content_in_file__(self,file_path,content):
+        with open(file_path,"a+",encoding='utf-8',errors='ignore') as f:
+            f.write(content+"\r")
+        f.close()
+
+
+def __get_request_result__(url):
+    result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
+    cdn = ""
+    try:
+        rsp = requests.get(url, timeout=5,stream=True)
+        status_code = rsp.status_code
+        result["status"] = status_code
+        headers = rsp.headers
+        if "Server" in headers:
+            result["server"] = headers['Server']
+        if "Cookie" in headers:
+            result["cookie"] = headers['Cookie']
+        if "X-Via" in headers:
+            cdn = cdn + headers['X-Via']
+        if "Via" in headers:
+            cdn = cdn + headers['Via']
+        result["cdn"] = cdn
+        sock = rsp.raw._connection.sock
+        if sock:
+            des_ip = sock.getpeername()[0]
+            sou_ip = sock.getsockname()[0]
+            if des_ip:
+                result["des_ip"] = des_ip
+            if sou_ip:
+                result["sou_ip"] = sou_ip
+        html = rsp.text
+        title = re.findall('<title>(.+)</title>',html)
+        result["title"] = title
+        return result
+    except requests.exceptions.InvalidURL as e:
+        return "error"
+    except requests.exceptions.ConnectionError as e1:
+        return "timeout"
+
+# print(__get_request_result__("http://download.sxzwfw.gov.cn/getMerchantSign"))
\ No newline at end of file
diff --git a/libs/task/web_task.py b/libs/task/web_task.py
index 39da02a..5759a91 100644
--- a/libs/task/web_task.py
+++ b/libs/task/web_task.py
@@ -1,16 +1,9 @@
 # -*- coding: utf-8 -*-
 # Author: kelvinBen
 # Github: https://github.com/kelvinBen/AppInfoScanner
-
-
 import os
-import re
 import config
-import threading
 from queue import Queue
-import libs.core as cores
-from libs.core.parses import ParsesThreads
-
 class WebTask(object):
     thread_list =[]
 
@@ -44,24 +37,4 @@ class WebTask(object):
             else:
                 if len(dir_file.split("."))>1:
                     if dir_file.split(".")[-1] in file_suffix:
-                        self.file_queue.put(dir_file_path)
-
-    # def __print__(self):
-    #     print("=========The result set for the static scan is shown below:===============")
-    #     with open(cores.result_path,"a+") as f:
-    #         for key,value in self.result_dict.items():
-    #             f.write(key+"\r")
-    #             for result in value:
-    #                 if result in self.value_list:
-    #                     continue
-    #                 self.value_list.append(result)
-    #                 print(result)
-    #                 f.write("\t"+result+"\r")
-    #     print("For more information about the search, see: %s" %(cores.result_path))
-
-    # def __start_threads(self):
-    #     for threadID in range(1,self.threads) :
-    #         name = "Thread - " + str(threadID)
-    #         thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict)
-    #         thread.start()
-    #         self.thread_list.append(thread)
\ No newline at end of file
+                        self.file_queue.put(dir_file_path)
\ No newline at end of file
diff --git a/update.md b/update.md
index a809184..dc2738a 100644
--- a/update.md
+++ b/update.md
@@ -1,3 +1,11 @@
+### V1.0.6
+- Added AI-based analysis to quickly filter out third-party URL addresses
+- Added Domain\Title\CDN\Finger identification
+- Improved the layout of the Excel output
+- Improved the URL and IP filtering rules
+- Improved global log output
+- Fixed inaccurate search results for APKs containing multiple DEX files
+
 ### V1.0.5
 - Added recognition of XML parsing components such as DOM, SAX, DOM4J, and JDOM
 - Added an entry point for submitting issues when decompilation fails