From e2456032c0724428f0ca33eeb121a1a5c7a712b4 Mon Sep 17 00:00:00 2001 From: kelvinBen <754147491@qq.com> Date: Tue, 3 Nov 2020 09:11:51 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E4=B8=AD=E5=BF=83=EF=BC=8C=E5=B0=86=E5=88=86=E6=95=A3?= =?UTF-8?q?=E7=9A=84=E5=85=A5=E5=8F=A3=E6=95=B4=E5=90=88=E4=B8=BA=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=202.=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E8=AF=86=E5=88=AB=E5=88=B0?= =?UTF-8?q?=E6=9C=89=E5=A3=B3=E5=90=8E=EF=BC=8C=E5=81=9C=E6=AD=A2=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=90=8E=E7=BB=A7=E9=80=BB=E8=BE=91=203.=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0ipa=E5=A3=B3=E8=AF=86=E5=88=AB=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=EF=BC=8C=E5=B0=86=E8=83=BD=E5=A4=9F=E6=9B=B4=E5=A5=BD=E7=9A=84?= =?UTF-8?q?=E5=AF=B9=E5=A3=B3=E8=BF=9B=E8=A1=8C=E8=AF=86=E5=88=AB=204.?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8D=95=E7=8B=AC=E7=9A=84macho=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E6=89=AB=E6=8F=8F=E5=8A=9F=E8=83=BD=205.=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=B0=86=E7=BB=93=E6=9E=9Curl=E5=92=8Cip=E5=9C=B0?= =?UTF-8?q?=E5=9D=80=E5=8D=95=E7=8B=AC=E8=BE=93=E5=87=BA=E5=88=B0excel=206?= =?UTF-8?q?.=E4=BF=AE=E5=A4=8Dipa=E5=8C=85=E4=B8=AD=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E4=B8=AD=E6=96=87=E8=B7=AF=E5=BE=84=E4=B8=8D=E8=83=BD=E8=AF=86?= =?UTF-8?q?=E5=88=ABmacho=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 27 +++--- app.py | 35 ++++--- config.py | 2 + libs/core/__init__.py | 40 ++++---- libs/core/parses.py | 86 +++++++++-------- libs/task/android_task.py | 188 ++++++++++++++------------------------ libs/task/base_task.py | 119 ++++++++++++++++++++++++ libs/task/ios_task.py | 135 +++++++++++++++++---------- requirements.txt | 3 +- update.md | 17 ++++ 10 files changed, 385 insertions(+), 267 deletions(-) create mode 100644 libs/task/base_task.py diff --git a/README.md b/README.md index 7f269dc..54e0b19 100644 --- a/README.md +++ b/README.md @@ -26,22 +26,25 @@ ### 目录说明 +``` AppInfoScanner |-- libs 程序的核心代码 |-- core |-- parses.py 用于解析文件中的静态信息 |-- task - |-- android_task.py 用于处理Android相关的文件 - |-- ios_task.py 用于处理iOS相关的文件 - |-- web_task.py 用于处理Web相关的文件,比如网页右键源代码、H5相关的静态信息 - |-- tools 程序需要依赖的工具 - |-- apktool.jar 用于反编译apk文件 - |-- baksmali.jar 用于反编译dex文件 - |-- strings.exe 用于windows 32下获取iPA的字符串信息 - |-- strings64.exe 用于windows 64的系统获取iPA的字符串信息 - |-- app.py 主运行程序 - |-- config.py 用于自定义相关规则 - |-- readme.md 程序使用说明 + |-- base_task.py 统一任务调度 + |-- android_task.py 用于处理Android相关的文件 +​ |-- ios_task.py 用于处理iOS相关的文件 +​ |-- web_task.py 用于处理Web相关的文件,比如网页右键源代码、H5相关的静态信息 +​ |-- tools 程序需要依赖的工具 +​ |-- apktool.jar 用于反编译apk文件 +​ |-- baksmali.jar 用于反编译dex文件 +​ |-- strings.exe 用于windows 32下获取iPA的字符串信息 +​ |-- strings64.exe 用于windows 64的系统获取iPA的字符串信息 +​ |-- app.py 主运行程序 +​ |-- config.py 用于自定义相关规则 +​ |-- readme.md 程序使用说明 +``` ### Android 相关操作说明 @@ -91,7 +94,7 @@ python3 app.py android -i -a ``` python3 app.py android -i -t 10 -``` +``` ### iOS 相关操作说明 diff --git a/app.py b/app.py index 5f89be4..1047453 100644 --- a/app.py +++ b/app.py @@ -7,9 +7,7 @@ import click from libs.core import Bootstrapper -from libs.task.android_task import AndroidTask -from libs.task.ios_task import iOSTask -from libs.task.web_task import WebTask +from libs.task.base_task import BaseTask @click.group(help="Python script for automatically retrieving key information in app.") def cli(): @@ -17,60 +15,59 @@ def cli(): # 创建Android任务 @cli.command(help="Get the key information of Android system.") -@click.option("-i", "--input", required=True, type=str, help="Input APK file or DEX directory.") +@click.option("-i", "--inputs", required=True, type=str, help="Input APK file or DEX directory.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") @click.option("-p", '--package',required=False,type=str,default="",help="Specifies the retrieval package name.") -@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.") +@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") -def android(input: str, rules: str, net_sniffer: bool,no_resource:bool,package:str,all:bool,threads:int) -> None: +def android(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,package:str,all_str:bool,threads:int) -> None: try: # 初始化全局对象 bootstrapper = Bootstrapper(__file__) bootstrapper.init() - task = AndroidTask(input, rules, net_sniffer,no_resource,package,all,threads) - # 让内层代码直接抛出异常,不做rollback。 - task.start() + BaseTask("Android", inputs, rules, net_sniffer, no_resource, package, all_str, threads).start() except Exception as e: raise e @cli.command(help="Get the key information of iOS system.") -@click.option("-i", "--input", required=True, type=str, help="Input IPA file or ELF file.") +@click.option("-i", "--inputs", required=True, type=str, help="Input IPA file or ELF file.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") @click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.") @click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.") -@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.") +@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") -def ios(input: str, rules: str, net_sniffer: bool,no_resource:bool,all:bool,threads:int) -> None: +def ios(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,all_str:bool,threads:int) -> None: try: # 初始化全局对象 bootstrapper = Bootstrapper(__file__) bootstrapper.init() - task = iOSTask(input, rules, net_sniffer,no_resource,all,threads) - # 让内层代码直接抛出异常,不做rollback。 - task.start() + BaseTask("iOS", inputs, rules, net_sniffer, no_resource, all_str, threads).start() + except Exception as e: raise e @cli.command(help="Get the key information of Web system.") -@click.option("-i", "--input", required=True, type=str, help="Input WebSite dir.") +@click.option("-i", "--inputs", required=True, type=str, help="Input WebSite dir.") @click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.") -@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.") +@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.") @click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default") -def web(input: str, rules: str, all:bool,threads:int) -> None: +def web(inputs: str, rules: str, all_str:bool,threads:int) -> None: try: # 初始化全局对象 bootstrapper = Bootstrapper(__file__) bootstrapper.init() + # BaseTask("Web", inputs, rules,all_str, threads).start() + task = WebTask(input, rules,all,threads) - # 让内层代码直接抛出异常,不做rollback。 task.start() + except Exception as e: raise e diff --git a/config.py b/config.py index 8a97223..f298edb 100644 --- a/config.py +++ b/config.py @@ -28,6 +28,8 @@ filter_components = [ # 2. IPv4的ip地址 # 3. URI地址 filter_strs =[ + r'^http://.*', + r'^https://.*', r'.*(http://.*)', r'.*(https://.*)', r'.*((?:[0-9]{1,3}\.){3}[0-9]{1,3}).*', diff --git a/libs/core/__init__.py b/libs/core/__init__.py index 37d84eb..fe119ec 100644 --- a/libs/core/__init__.py +++ b/libs/core/__init__.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- # Author: kelvinBen # Github: https://github.com/kelvinBen/AppInfoScanner - -import platform import os +import time import shutil +import platform + # smali 所在路径 smali_path = "" @@ -30,49 +31,42 @@ class Bootstrapper(object): global os_type global output_path global script_root_dir - global result_path + global txt_result_path + global xls_result_path global strings_path + create_time = time.strftime("%Y%m%d%H%M%S", time.localtime()) + script_root_dir = os.path.dirname(os.path.abspath(path)) tools_dir = os.path.join(script_root_dir,"tools") - + if platform.system() == "Windows": machine2bits = {'AMD64':64, 'x86_64': 64, 'i386': 32, 'x86': 32} machine2bits.get(platform.machine()) if platform.machine() == 'i386' or platform.machine() == 'x86': - strings_path = os.path.join(script_root_dir,"strings.exe") + strings_path = os.path.join(tools_dir,"strings.exe") else: - strings_path = os.path.join(script_root_dir,"strings64.exe") + strings_path = os.path.join(tools_dir,"strings64.exe") else: strings_path ="strings" - # os_type = "win" - # smali_str = "smali.bat" - # back_smali_str = "backsmali.bat" - # apktool_path_str = "apktool.bat" - # elif platform.system() == "Linux": - # os_type = "lin" - # smali_str = "smali" - # back_smali_str = "backsmali" - # apktool_path_str = "apktool" - # else: - # os_type = "mac" - # smali_str = "smali" - - # smali_path = os.path.join(tools_dir,str(os_type) + os.sep + smali_str) backsmali_path = os.path.join(tools_dir,"baksmali.jar") apktool_path = os.path.join(tools_dir, "apktool.jar") output_path = os.path.join(script_root_dir,"out") - result_path = os.path.join(script_root_dir,"result.txt") + txt_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".txt") + xls_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".xls") def init(self): if os.path.exists(output_path): shutil.rmtree(output_path) os.makedirs(output_path) - if os.path.exists(result_path): - os.remove(result_path) + if os.path.exists(txt_result_path): + os.remove(txt_result_path) + + if os.path.exists(xls_result_path): + os.remove(xls_result_path) diff --git a/libs/core/parses.py b/libs/core/parses.py index 9e86cb1..d2c0326 100644 --- a/libs/core/parses.py +++ b/libs/core/parses.py @@ -2,16 +2,15 @@ # Author: kelvinBen # Github: https://github.com/kelvinBen/AppInfoScanner - -import threading -import config import re import os +import config +import threading import libs.core as cores class ParsesThreads(threading.Thread): - def __init__(self,threadID,name,file_queue,all,result_dict): + def __init__(self,threadID,name,file_queue,all,result_dict,types): threading.Thread.__init__(self) self.file_queue = file_queue self.name = name @@ -19,55 +18,54 @@ class ParsesThreads(threading.Thread): self.result_list = [] self.all = all self.result_dict=result_dict + self.types = types - def __regular_parse__(self,threadLock): + def __regular_parse__(self): while True: - try: - file_path = self.file_queue.get(timeout = 5) - scan_str = ("Scan file : %s" % file_path) - print(scan_str) - - try: - os.path.basename(file_path).split(".")[1] - except Exception as e: - self.__get_string__(file_path,threadLock) - continue - self.__file_parse__(file_path,threadLock) - - result_set = set(self.result_list) - if len(result_set) !=0: - self.result_dict[file_path] = result_set - - if self.file_queue.empty(): - break - except Exception as e: + if self.file_queue.empty(): break + + file_path = self.file_queue.get(timeout = 5) + scan_str = ("Scan file : %s" % file_path) + print(scan_str) + + if self.types == "iOS": + self.__get_string_by_iOS__(file_path) + else: + self.__get_string_by_file__(file_path) + + result_set = set(self.result_list) + if len(result_set) !=0: + self.result_dict[file_path] = result_set + + def __get_string_by_iOS__(self,file_path): + output_path = cores.output_path + strings_path = cores.strings_path + temp = os.path.join(output_path,"temp.txt") + cmd_str = ("%s %s > %s") % (strings_path,file_path,temp) + if os.system(cmd_str) == 0: + with open(temp,"r",encoding='utf-8',errors='ignore') as f: + lines = f.readlines() + for line in lines: + self.__parse_string__(line) - def __file_parse__(self,file_path,threadLock): - with open(file_path,"r",encoding="utf8") as file : - file_content = file.read() + def __get_string_by_file__(self,file_path): + with open(file_path,"r",encoding="utf8",errors='ignore') as f : + file_content = f.read() # 获取到所有的字符串 pattern = re.compile(r'\"(.*?)\"') results = pattern.findall(file_content) # 遍历所有的字符串 for result in set(results): - self.__parse_string__(result,threadLock) - - def __get_string__(self,dir_file_path,threadLock): - temp = os.path.join(cores.output_path,"temp.txt") - cmd_str = ("%s %s > %s") % (cores.strings_path,dir_file_path,temp) - if os.system(cmd_str) == 0: - with open(temp,"r") as f: - lines = f.readlines() - for line in lines: - self.__parse_string__(line,threadLock) - - def __parse_string__(self,result,threadLock): + self.__parse_string__(result) + + def __parse_string__(self,result): # 通过正则筛选需要过滤的字符串 for filter_str in config.filter_strs: filter_str_pat = re.compile(filter_str) filter_resl = filter_str_pat.findall(result) + # print(result,filter_resl) # 过滤掉未搜索到的内容 if len(filter_resl)!=0: # 提取第一个结果 @@ -76,9 +74,9 @@ class ParsesThreads(threading.Thread): if self.__filter__(resl_str) == 0: continue - threadLock.acquire() - self.result_list.append(filter_resl[0]) - threadLock.release() + self.threadLock.acquire() + self.result_list.append(resl_str) + self.threadLock.release() continue def __filter__(self,resl_str): @@ -105,5 +103,5 @@ class ParsesThreads(threading.Thread): return return_flag def run(self): - threadLock = threading.Lock() - self.__regular_parse__(threadLock) + self.threadLock = threading.Lock() + self.__regular_parse__() diff --git a/libs/task/android_task.py b/libs/task/android_task.py index ad6015a..5bb536e 100644 --- a/libs/task/android_task.py +++ b/libs/task/android_task.py @@ -12,155 +12,105 @@ import libs.core as cores from libs.core.parses import ParsesThreads class AndroidTask(object): - comp_list =[] - thread_list =[] - result_dict = {} - value_list = [] - def __init__(self, input, rules, net_sniffer,no_resource,package,all,threads): - self.net_sniffer = net_sniffer - self.path = input - if rules: - config.filter_strs.append(r'.*'+str(rules)+'.*') + def __init__(self,path,no_resource,package): + self.path = path self.no_resource = no_resource self.package = package - self.all = all - self.threads = threads + self.file_queue = Queue() - self.shell_falg=False + self.shell_flag=False self.packagename="" + self.comp_list=[] - def start(self): # 检查java环境是否存在 if os.system("java -version") !=0 : raise Exception("Please install the Java environment!") - # 根据不同的文件后缀进行文件解析 - if os.path.isfile(self.path): - suffix_name = self.path.split(".")[-1] - if suffix_name == "apk": - self.__decode_apk__(self.path) - elif suffix_name == "dex": - self.__decode_dex__(self.path) - else: - # 抛出异常 - raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.") - else: - self.__get_file_type__(self.path) - self.__start_threads() + input_file_path = self.path - for thread in self.thread_list: - thread.join() - - self.__print__() + if os.path.isdir(input_file_path): + self.__decode_dir__(input_file_path) + else: + if self.__decode_file__(input_file_path) == "error": + raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.") - # if self.net_sniffer: - # self.__start_net__() + return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename} + + def __decode_file__(self,file_path): + apktool_path = str(cores.apktool_path) + output_path = str(cores.output_path) + backsmali_path = str(cores.backsmali_path) + suffix_name = file_path.split(".")[-1] + if suffix_name == "apk": + self.__decode_apk__(file_path,apktool_path,output_path) + elif suffix_name == "dex": + self.__decode_dex__(file_path,backsmali_path,output_path) + else: + return "error" + + def __decode_dir__(self,root_dir): + dir_or_files = os.listdir(root_dir) + for dir_or_file in dir_or_files: + dir_or_file_path = os.path.join(root_dir,dir_or_file) + if os.path.isdir(dir_or_file_path): + self.__decode_dir__(dir_or_file_path) + else: + if self.__decode_file__(dir_or_file_path) == "error": + continue # 分解apk - def __decode_apk__(self,path): - if self.no_resource: - self.__decode_dex__(path) + def __decode_apk__(self,file_path,apktool_path,output_path): + cmd_str = ("java -jar %s d -f %s -o %s --only-main-classe") % (apktool_path,str(file_path),output_path) + if os.system(cmd_str) == 0: + self.__shell_test__(output_path) + self.__scanner_file_by_apktool__(output_path) else: - cmd_str = ("java -jar %s d -f %s -o %s --only-main-classe") % (cores.apktool_path,path,cores.output_path) - if os.system(cmd_str) == 0: - self.__scanner_file_by_apktool__(cores.output_path) - else: - print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues") - raise Exception("Decompilation failed.") + print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues") + raise Exception(file_path + ", Decompilation failed.") # 分解dex - def __decode_dex__(self,path): - cmd_str = ("java -jar %s d %s") % (cores.backsmali_path,path) + def __decode_dex__(self,file_path,backsmali_path,output_path): + cmd_str = ("java -jar %s d %s") % (backsmali_path,str(file_path)) if os.system(cmd_str) == 0: - self.__get_scanner_file__(cores.output_path,"smali") + self.__get_scanner_file__(output_path) else: print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues") - raise Exception("Decompilation failed.") + raise Exception(file_path + ", Decompilation failed.") # 初始化检测文件信息 - def __scanner_file_by_apktool__(self,output): - - # shell检测 - self.__shell_test__(output) - - scanner_dir_list = ["smali","assets"] - scanner_file_suffix = ["smali","js","xml"] + def __scanner_file_by_apktool__(self,output_path): + if self.no_resource: + scanner_dir_lists = ["smali"] + scanner_file_suffixs = ["smali"] + else: + scanner_dir_lists = ["smali","assets"] + scanner_file_suffixs = ["smali","js","xml"] - for dir in scanner_dir_list: - scanner_dir = os.path.join(output,dir) + for scanner_dir_list in scanner_dir_lists: + scanner_dir = os.path.join(output_path,scanner_dir_list) if os.path.exists(scanner_dir): - self.__get_scanner_file__(scanner_dir,scanner_file_suffix) + self.__get_scanner_file__(scanner_dir,scanner_file_suffixs) - def __get_scanner_file__(self,scanner_dir,file_suffix): + def __get_scanner_file__(self,scanner_dir,scanner_file_suffixs=["smali"]): dir_or_files = os.listdir(scanner_dir) - for dir_file in dir_or_files: - dir_file_path = os.path.join(scanner_dir,dir_file) + for dir_or_file in dir_or_files: + dir_file_path = os.path.join(scanner_dir,dir_or_file) + if os.path.isdir(dir_file_path): - self.__get_scanner_file__(dir_file_path,file_suffix) + self.__get_scanner_file__(dir_file_path,scanner_file_suffixs) else: - if "." not in dir_file: - continue - if len(dir_file.split("."))>1: - if dir_file.split(".")[-1] in file_suffix: - self.file_queue.put(dir_file_path) - for component in config.filter_components: - comp = component.replace(".","/") - if( comp in dir_file_path): - if(component not in self.comp_list): - self.comp_list.append(component) - - - def __get_file_type__(self,root_path): - dir_or_files = os.listdir(root_path) - for dir_file in dir_or_files: - dir_file_path = os.path.join(root_path,dir_file) - if os.path.isdir(dir_file_path): - self.__get_file_type__(dir_file_path) - else: - suffix_name = dir_file.split(".")[-1] - - if suffix_name == "apk": - self.__decode_apk__(dir_file) - elif suffix_name == "dex": - self.__decode_dex__(dir_file) - else: + if ("." not in dir_or_file) or (len(dir_or_file.split(".")) < 1) or (dir_or_file.split(".")[-1] not in scanner_file_suffixs): continue - - def __print__(self): - if self.packagename: - print("========= The package name of this APP is: ===============") - print(self.packagename) - - if len(self.comp_list) != 0: - print("========= Component information is as follows :===============") - for json in self.comp_list: - print(json) - - print("=========The result set for the static scan is shown below:===============") - with open(cores.result_path,"a+") as f: - for key,value in self.result_dict.items(): - f.write(key+"\r") - for result in value: - if result in self.value_list: - continue - self.value_list.append(result) - print(result) - f.write("\t"+result+"\r") - print("For more information about the search, see: %s" %(cores.result_path)) - - if self.shell_falg: - print('\033[1;33mWarning: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') - - def __start_threads(self): - for threadID in range(1,self.threads) : - name = "Thread - " + str(threadID) - thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict) - thread.start() - self.thread_list.append(thread) + self.file_queue.put(dir_file_path) + for component in config.filter_components: + comp = component.replace(".","/") + if(comp in dir_file_path): + if(component not in self.comp_list): + self.comp_list.append(component) def __shell_test__(self,output): am_path = os.path.join(output,"AndroidManifest.xml") @@ -171,10 +121,10 @@ class AndroidTask(object): am_package= re.compile(r'=1: - self.packagename = apackage + self.packagename = apackage[0] am_name = re.compile(r'') aname = am_name.findall(am_str) if aname and len(aname)>=1: if aname[0] in config.shell_list: - self.shell_falg = True \ No newline at end of file + self.shell_flag = True \ No newline at end of file diff --git a/libs/task/base_task.py b/libs/task/base_task.py new file mode 100644 index 0000000..32fad56 --- /dev/null +++ b/libs/task/base_task.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Author: kelvinBen +# Github: https://github.com/kelvinBen/AppInfoScanner + +# 接收传入的参数信息,根据参数进行平台分发 +import os +import re +import xlwt +import config +import threading +from queue import Queue +import libs.core as cores +from libs.core.parses import ParsesThreads +from libs.task.android_task import AndroidTask +from libs.task.ios_task import iOSTask +from libs.task.web_task import WebTask + +class BaseTask(object): + thread_list =[] + result_dict = {} + value_list = [] + + # 统一初始化入口 + def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10): + self.types = types + self.net_sniffer = net_sniffer + self.path = inputs + if rules: + config.filter_strs.append(r'.*'+str(rules)+'.*') + self.no_resource = no_resource + self.package = package + self.all = all_str + self.threads = threads + self.file_queue = Queue() + + # 统一调度平台 + def start(self): + workbook = xlwt.Workbook(encoding = 'utf-8') + + # 创建excel头 + worksheet = self.__creating_excel_header__(workbook) + + # 任务控制中心 + task_info = self.__tast_control__() + file_queue = task_info["file_queue"] + shell_flag = task_info["shell_flag"] + comp_list = task_info["comp_list"] + packagename = task_info["packagename"] + + if shell_flag: + print('\033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') + return + + # 线程控制中心 + self.__threads_control__(file_queue) + + # 等待线程结束 + for thread in self.thread_list: + thread.join() + + # 结果输出中心 + self.__print_control__(packagename,comp_list,workbook,worksheet) + + def __creating_excel_header__(self,workbook): + worksheet = workbook.add_sheet("扫描信息",cell_overwrite_ok=True) + worksheet.write(0,0, label = "扫描结果") + return worksheet + + def __tast_control__(self): + task_info = {} + # 调用Android 相关处理逻辑 + if self.types == "Android": + task_info = AndroidTask(self.path,self.no_resource,self.package).start() + # 调用iOS 相关处理逻辑 + elif self.types == "iOS": + task_info = iOSTask(self.path,self.no_resource).start() + # 调用Web 相关处理逻辑 + else: + task_info = WebTask.start() + return task_info + + def __threads_control__(self,file_queue): + for threadID in range(1,self.threads): + name = "Thread - " + str(threadID) + thread = ParsesThreads(threadID,name,file_queue,self.all,self.result_dict,self.types) + thread.start() + self.thread_list.append(thread) + + def __print_control__(self,packagename,comp_list,workbook,worksheet): + txt_result_path = cores.txt_result_path + xls_result_path = cores.xls_result_path + + if packagename: + print("========= The package name of this APP is: ===============") + print(packagename) + + if len(comp_list) != 0: + print("========= Component information is as follows :===============") + for json in comp_list: + print(json) + + print("=========The result set for the static scan is shown below:===============") + with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f: + row = 1 + for key,value in self.result_dict.items(): + f.write(key+"\r") + for result in value: + if result in self.value_list: + continue + self.value_list.append(result) + print(result) + worksheet.write(row,0, label = result) + row = row + 1 + f.write("\t"+result+"\r") + print("For more information about the search, see TXT file result: %s" %(txt_result_path)) + print("For more information about the search, see XLS file result: %s" %(xls_result_path)) + workbook.save(xls_result_path) + + diff --git a/libs/task/ios_task.py b/libs/task/ios_task.py index 047aaaf..7b92c27 100644 --- a/libs/task/ios_task.py +++ b/libs/task/ios_task.py @@ -5,45 +5,63 @@ import os import re +import shutil import zipfile +import binascii +import platform +from pathlib import Path import libs.core as cores from queue import Queue from libs.core.parses import ParsesThreads class iOSTask(object): - thread_list =[] - value_list = [] - result_dict = {} - - def __init__(self,input, rules, net_sniffer,no_resource,all,threads): - self.path = input - self.rules = rules - self.net_sniffer = net_sniffer + elf_file_name = "" + def __init__(self,path,no_resource): + self.path = path self.no_resource = no_resource - self.all = all - self.threads = threads - if rules: - config.filter_strs.append(r'.*'+str(rules)+'.*') + self.file_queue = Queue() - self.shell_falg=False + self.shell_flag=False def start(self): - # ipa 文件 - if self.path.split(".")[-1] == 'ipa': - # 对ipa进行解包 + file_path = self.path + if file_path.split(".")[-1] == 'ipa': self.__decode_ipa__(cores.output_path) - - #文件提取 self.__scanner_file_by_ipa__(cores.output_path) - - self.__start_threads() - - for thread in self.thread_list: - thread.join() + elif self.__get_file_header__(file_path): + self.file_queue.put(file_path) + else: + raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.") + return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None} - self.__print__() - + def __get_file_header__(self,file_path): + print("====================") + hex_hand = 0x0 + with open(file_path,"rb") as macho_file: + macho_file.seek(hex_hand,0) + magic = binascii.hexlify(macho_file.read(4)).decode().upper() + macho_magics = ["CFFAEDFE","CEFAEDFE","BEBAFECA","CAFEBABE"] + if magic in macho_magics: + self.__shell_test__(macho_file,hex_hand) + macho_file.close() + return True + macho_file.close() + return False + + + def __shell_test__(self,macho_file,hex_hand): + while True: + magic = binascii.hexlify(macho_file.read(4)).decode().upper() + if magic == "2C000000": + macho_file.seek(hex_hand,0) + encryption_info_command = binascii.hexlify(macho_file.read(24)).decode() + cryptid = encryption_info_command[-8:len(encryption_info_command)] + print(cryptid) + if cryptid == "01000000": + self.shell_flag = True + break + hex_hand = hex_hand + 4 def __scanner_file_by_ipa__(self,output): scanner_file_suffix = ["plist","js","xml","html"] @@ -60,39 +78,58 @@ class iOSTask(object): self.__get_scanner_file__(dir_file_path,file_suffix) else: if self.elf_file_name == dir_file: + self.__get_file_header__(dir_file_path) + print(self.shell_flag) self.file_queue.put(dir_file_path) continue if self.no_resource: dir_file_suffix = dir_file.split(".") if len(dir_file_suffix) > 1: if dir_file_suffix[-1] in file_suffix: + self.__get_file_header__(dir_file_path) self.file_queue.put(dir_file_path) def __decode_ipa__(self,output_path): - zip_files = zipfile.ZipFile(self.path) - for zip_file in zip_files.filelist: - zip_files.extract(zip_file.filename,output_path) + with zipfile.ZipFile(self.path,"r") as zip_files: + zip_file_names = zip_files.namelist() + zip_files.extract(zip_file_names[0],output_path) + + try: + new_zip_file = zip_file_names[0].encode('cp437').decode('utf-8') + except UnicodeEncodeError: + new_zip_file = zip_file_names[0].encode('utf-8').decode('utf-8') + + old_zip_dir = self.__get_parse_dir__(output_path,zip_file_names[0]) + new_zip_dir = self.__get_parse_dir__(output_path,new_zip_file) + os.rename(old_zip_dir,new_zip_dir) + for zip_file in zip_file_names: + + old_ext_path = zip_files.extract(zip_file,output_path) + start = str(old_ext_path).index("Payload") + dir_path = old_ext_path[start:len(old_ext_path)] + old_ext_path = os.path.join(output_path,dir_path) + try: + new_zip_file = zip_file.encode('cp437').decode('utf-8') + except UnicodeEncodeError: + new_zip_file = zip_file.encode('utf-8').decode('utf-8') + + new_ext_path = os.path.join(output_path,new_zip_file) - def __start_threads(self): - for threadID in range(1,self.threads) : - name = "Thread - " + str(threadID) - thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict) - thread.start() - self.thread_list.append(thread) - + if platform.system() == "Windows": + new_ext_path = new_ext_path.replace("/","\\") + + if not os.path.exists(new_ext_path): + dir_path = os.path.dirname(new_ext_path) + if not os.path.exists(dir_path): + os.makedirs(dir_path) + shutil.move(old_ext_path, new_ext_path) - def __print__(self): - print("=========The result set for the static scan is shown below:===============") - with open(cores.result_path,"a+") as f: - for key,value in self.result_dict.items(): - f.write(key+"\r") - for result in value: - if result in self.value_list: - continue - self.value_list.append(result) - print(result) - f.write("\t"+result+"\r") - print("For more information about the search, see: %s" %(cores.result_path)) - if self.shell_falg: - print('\033[1;33mWarning: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') \ No newline at end of file + def __get_parse_dir__(self,output_path,file_path): + start = file_path.index("Payload/") + end = file_path.index(".app") + root_dir = file_path[start:end] + if platform.system() == "Windows": + root_dir = root_dir.replace("/","\\") + old_root_dir = os.path.join(output_path,root_dir+".app") + return old_root_dir \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index b98f660..b645203 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -click \ No newline at end of file +click +xlwt \ No newline at end of file diff --git a/update.md b/update.md index 3784ee8..dabdedf 100644 --- a/update.md +++ b/update.md @@ -1,9 +1,26 @@ ### V1.0.5 - 新增对DOM、SAX、DOM4J、JDOM等XML解析组件的识别 + - 新增反编译失败后提交issues入口 + +- 新增ipa壳识别功能,将能够更好的对壳进行识别 + +- 新增macho文件扫描功能 + +- 新增结果url和ip地址单独输出到excel + - 优化后缀名获取方式 + +- 优化任务控制中心,将分散的入口整合为一个(web的除外) + +- 化任务处理逻辑,识别到有壳后,停止执行后继逻辑 + - 修复部分DEX格式原因导致无法进行反编译问题 +- 修复ipa包中存在中文路径不能识别macho问题 + + + ### V1.0.4 - 新增对Flutter框架检测支持