diff --git a/app.py b/app.py index 32d5405..97156e1 100644 --- a/app.py +++ b/app.py @@ -17,7 +17,7 @@ def cli(): @cli.command(help="Get the key information of Android system.") @click.option("-i", "--inputs", required=True, type=str, help="Input APK file or DEX directory.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") -@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") +@click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") @click.option("-p", '--package',required=False,type=str,default="",help="Specifies the retrieval package name.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @@ -36,7 +36,7 @@ def android(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,package: @cli.command(help="Get the key information of iOS system.") @click.option("-i", "--inputs", required=True, type=str, help="Input IPA file or ELF file.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") -@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") +@click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") @@ -57,17 +57,15 @@ def ios(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,all_str:bool @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") -def web(inputs: str, rules: str, all_str:bool,threads:int) -> None: +@click.option("-s", "--net-sniffer", is_flag=True, default=True, help="Whether to enable network sniffing.") +def web(inputs: str, rules: str, all_str:bool,threads:int,net_sniffer) -> None: try: # 初始化全局对象 bootstrapper = Bootstrapper(__file__) bootstrapper.init() - BaseTask("Web", inputs, rules,all_str, threads).start() - - # task = WebTask(input, rules,all,threads) - # task.start() - + BaseTask("Web", inputs, rules,all_str, net_sniffer,threads).start() + except Exception as e: raise e diff --git a/config.py b/config.py index 1fc5bfe..ea39aaa 100644 --- a/config.py +++ b/config.py @@ -29,35 +29,26 @@ filter_components = [ # 3. URI地址,URI不能很好的拼接所以此处忽略 filter_strs =[ r'https://.*|http://.*', - r'.*://([[0-9]{1,3}\.]{3}[0-9]{1,3}).*', + # r'.*://([[0-9]{1,3}\.]{3}[0-9]{1,3}).*', + r'.*://([\d{1,3}\.]{3}\d{1,3}).*' # r'/[a-z0-9A-Z]+/.*' ] # 此处忽略常见的域名等信息 filter_no = [ - # r'.*127.0.0.1', - # r'.*0.0.0.0', - # r'.*localhost', - # r'.*w3.org', - # r'.*apache.org', - # r'.*android.com', - # r'.*weixin.qq.com', - # r'.*jpush.cn', - # r'.*umengcloud.com', - # r'.*umeng.com', - # r'.*baidu.com', - # r'.*apple.com', - # r'.*alibaba.com', - # r'.*qq.com', - # r'.*sohu.com', - # r'.*openssl.org', - # r'.*weibo.com', - # r'.*wechat.com', - # r'.*.amap.com', - # r'.*openxmlformats.org', - # r'.*github.com', - # r'.*w3school.com.cn', - # r'.*google.com' + r'.*127.0.0.1', + r'.*0.0.0.0', + r'.*localhost', + r'.*w3.org', + r'.*apache.org', + r'.*android.com', + r'.*jpush.cn', + r'.*umengcloud.com', + r'.*umeng.com', + r'.*github.com', + r'.*w3school.com.cn', + r'.*apple.com', + r'.*.amap.com', ] # 此处配置壳信息 diff --git a/libs/core/net.py b/libs/core/net.py index 4e903b5..5988797 100644 --- a/libs/core/net.py +++ b/libs/core/net.py @@ -16,11 +16,14 @@ class NetThreads(threading.Thread): def __get_Http_info__(self,threadLock): while True: + if self.domain_queue.empty(): + break domains = self.domain_queue.get(timeout=5) domain = domains["domain"] url_ip = domains["url_ip"] time.sleep(2) result = self.__get_request_result__(url_ip) + print("[+] Processing URL address:"+url_ip) if result != "error": if self.lock.acquire(True): cores.excel_row = cores.excel_row + 1 @@ -33,17 +36,15 @@ class NetThreads(threading.Thread): self.worksheet.write(cores.excel_row, 5, label = result["server"]) self.worksheet.write(cores.excel_row, 6, label = result["title"]) self.worksheet.write(cores.excel_row, 7, label = result["cdn"]) - self.worksheet.write(cores.excel_row, 8, label = "") + # self.worksheet.write(cores.excel_row, 8, label = "") self.lock.release() - if self.domain_queue.empty(): - break + + def __get_request_result__(self,url): result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""} cdn = "" try: - # python3 app.py ios -i C:\Users\Administrator\Desktop\frida-ipa-dump\三晋通\Payload\三晋通.app\三晋通 - # with requests.get(url, timeout=5,stream=True) as rsp: status_code = rsp.status_code result["status"] = status_code @@ -80,7 +81,6 @@ class NetThreads(threading.Thread): except requests.exceptions.ReadTimeout as e2: return "timeout" - def run(self): threadLock = threading.Lock() self.__get_Http_info__(threadLock) diff --git a/libs/core/parses.py b/libs/core/parses.py index 621d044..7bcc37d 100644 --- a/libs/core/parses.py +++ b/libs/core/parses.py @@ -26,7 +26,7 @@ class ParsesThreads(threading.Thread): break file_path = self.file_queue.get(timeout = 5) - scan_str = ("Scan file : %s" % file_path) + scan_str = ("[+] Scan file : %s" % file_path) print(scan_str) if self.types == "iOS": @@ -65,7 +65,7 @@ class ParsesThreads(threading.Thread): for filter_str in config.filter_strs: filter_str_pat = re.compile(filter_str) filter_resl = filter_str_pat.findall(result) - # print(result,filter_resl) + # 过滤掉未搜索到的内容 if len(filter_resl)!=0: # 提取第一个结果 @@ -75,27 +75,18 @@ class ParsesThreads(threading.Thread): continue self.threadLock.acquire() + print("[+] The string searched for matching rule is: %s" % (resl_str)) self.result_list.append(resl_str) self.threadLock.release() continue def __filter__(self,resl_str): return_flag = 1 - print(resl_str) resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","") + if len(resl_str) == 0: return 0 - - # 目前流通的域名中加上协议头最短长度为11位 - if len(resl_str) <= 10: - return 0 - - # 单独处理https或者http开头的字符串 - # http_list =["https","https://","https:","http","http://","https:",] - # for filte in http_list: - # if filte == resl_str: - # return 0 - + for filte in config.filter_no: resl_str = resl_str.replace(filte,"") if len(resl_str) == 0: diff --git a/libs/task/base_task.py b/libs/task/base_task.py index 017417e..03bc0a3 100644 --- a/libs/task/base_task.py +++ b/libs/task/base_task.py @@ -19,7 +19,7 @@ class BaseTask(object): app_history_list=[] # 统一初始化入口 - def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10): + def __init__(self, types="Android", inputs="", rules="", net_sniffer=True, no_resource=False, package="", all_str=False, threads=10): self.types = types self.net_sniffer = net_sniffer self.path = inputs @@ -35,12 +35,12 @@ class BaseTask(object): # 统一调度平台 def start(self): - print("[*] AI决策系统正在分析规则中...") + print("[*] AI is analyzing filtering rules......") # 获取历史记录 self.__history_handle__() - print("[*] 本次的过滤规则为:" , config.filter_no) + print("[*] The filtering rules obtained by AI are as follows: %s" % (config.filter_no) ) # 任务控制中心 task_info = self.__tast_control__() @@ -55,6 +55,7 @@ class BaseTask(object): return # 线程控制中心 + print("[*] ========= Searching for strings that match the rules ===============") self.__threads_control__(file_queue) # 等待线程结束 @@ -92,44 +93,19 @@ class BaseTask(object): # 此处需要hash值或者应用名称, apk文件获取pachage, dex文件获取hash, macho-o获取文件名 if packagename: - print("========= The package name of this APP is: ===============") + print("[*] ========= The package name of this APP is: ===============") print(packagename) if len(comp_list) != 0: - print("========= Component information is as follows :===============") + print("[*] ========= Component information is as follows :===============") for json in comp_list: print(json) - print("=========The result set for the static scan is shown below:===============") - - NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start() - # with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f: - # row = 1 - # for key,value in self.result_dict.items(): - # f.write(key+"\r") - - # for result in value: - # if result in self.value_list: - # continue - # if not(file_identifier in self.app_history_list) and ("http://" in result or "https://" in result): - # domain = result.replace("https://","").replace("http://","") - # if "/" in domain: - # domain = domain[:domain.index("/")] - - # if not(domain in self.domain_list): - # self.domain_list.append(domain) - # self.__write_content_in_file__(cores.domain_history_path,domain) - # if append_file_flag: - # for identifier in file_identifier: - # self.__write_content_in_file__(cores.app_history_path,identifier) - # append_file_flag = False - - # self.value_list.append(result) - # worksheet.write(row,0, label = result) - # row = row + 1 - # f.write("\t"+result+"\r") - print("For more information about the search, see TXT file result: %s" %(cores.txt_result_path)) - print("For more information about the search, see XLS file result: %s" %(cores.xls_result_path)) + if self.net_sniffer: + print("[*] ========= Sniffing the URL address of the search ===============") + NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start() + print("[*] For more information about the search, see XLS file result: %s" %(cores.xls_result_path)) + print("[*] For more information about the search, see TXT file result: %s" %(cores.txt_result_path)) def __history_handle__(self): domain_history_path = cores.domain_history_path diff --git a/libs/task/net_task.py b/libs/task/net_task.py index 34972ef..55605e3 100644 --- a/libs/task/net_task.py +++ b/libs/task/net_task.py @@ -26,13 +26,14 @@ class NetTask(object): xls_result_path = cores.xls_result_path workbook = xlwt.Workbook(encoding = 'utf-8') worksheet = self.__creating_excel_header__(workbook) - self.__start_threads__(worksheet) + self.__write_result_to_txt__() + self.__start_threads__(worksheet) + for thread in self.thread_list: thread.join() - workbook.save(xls_result_path) def __creating_excel_header__(self,workbook): @@ -45,7 +46,7 @@ class NetTask(object): worksheet.write(0,5, label = "Server") worksheet.write(0,6, label = "Title") worksheet.write(0,7, label = "CDN") - worksheet.write(0,8, label = "Finger") + # worksheet.write(0,8, label = "Finger") return worksheet def __write_result_to_txt__(self): @@ -58,29 +59,33 @@ class NetTask(object): for result in value: if result in self.value_list: continue - - # 100个文件标识 - for file in self.file_identifier: - if not(file in self.app_history_list) and ("http://" in result or "https://" in result): - # print(self.file_identifier,self.app_history_list,not(self.file_identifier[0] in self.app_history_list)) - # if not(self.file_identifier in self.app_history_list) and ("http://" in result or "https://" in result): - domain = result.replace("https://","").replace("http://","") - if "/" in domain: - domain = domain[:domain.index("/")] - - self.domain_queue.put({"domain":domain,"url_ip":result}) + if (("http://" in result) or ("https://" in result)) and ("." in result): + domain = result.replace("https://","").replace("http://","") + if "/" in domain: + domain = domain[:domain.index("/")] + + if "|" in result: + result = result[:result.index("|")] + # 目前流通的域名中加上协议头最短长度为11位 + if len(result) <= 10: + continue + self.domain_queue.put({"domain":domain,"url_ip":result}) + + for identifier in self.file_identifier: + if identifier in self.app_history_list: + if not(domain in self.domain_list): + self.domain_list.append(domain) + self.__write_content_in_file__(cores.domain_history_path,domain) + continue - print(domain,self.domain_list,not(domain in self.domain_list)) if not(domain in self.domain_list): self.domain_list.append(domain) self.__write_content_in_file__(cores.domain_history_path,domain) + if append_file_flag: - for identifier in self.file_identifier: - if self.file_identifier in self.app_history_list: - continue - self.__write_content_in_file__(cores.app_history_path,identifier) - append_file_flag = False + self.__write_content_in_file__(cores.app_history_path,identifier) + append_file_flag = False self.value_list.append(result) f.write("\t"+result+"\r") f.close() diff --git a/libs/task/web_task.py b/libs/task/web_task.py index 5759a91..755171d 100644 --- a/libs/task/web_task.py +++ b/libs/task/web_task.py @@ -37,4 +37,7 @@ class WebTask(object): else: if len(dir_file.split("."))>1: if dir_file.split(".")[-1] in file_suffix: + with open(file_path,'rb') as f: + dex_md5 = str(hashlib.md5().update(f.read()).hexdigest()).upper() + self.file_identifier.append(dex_md5) self.file_queue.put(dir_file_path) \ No newline at end of file diff --git a/update.md b/update.md index dc2738a..bb431ed 100644 --- a/update.md +++ b/update.md @@ -1,6 +1,6 @@ ### V1.0.6 - 新增AI智能分析快速过滤第三方URL地址 -- 新增Domain\Title\CDN\Finger识别功能 +- 新增Domain\Title\CDN识别功能 - 优化Excel文件内容输出格式 - 优化URL和IP筛选规则 - 优化全局日志输出