1.优化任务控制中心,将分散的入口整合为一个

2.优化任务处理逻辑,识别到有壳后,停止执行后继逻辑
3.增加ipa壳识别功能,将能够更好的对壳进行识别
4.增加单独的macho文件扫描功能
5.增加将结果url和ip地址单独输出到excel
6.修复ipa包中存在中文路径不能识别macho问题
v1.0.5
kelvinBen 4 years ago committed by Your Name
parent 2b8fa22228
commit e2456032c0
  1. 27
      README.md
  2. 35
      app.py
  3. 2
      config.py
  4. 40
      libs/core/__init__.py
  5. 86
      libs/core/parses.py
  6. 188
      libs/task/android_task.py
  7. 119
      libs/task/base_task.py
  8. 135
      libs/task/ios_task.py
  9. 3
      requirements.txt
  10. 17
      update.md

@ -26,22 +26,25 @@
### 目录说明
```
AppInfoScanner
|-- libs 程序的核心代码
|-- core
|-- parses.py 用于解析文件中的静态信息
|-- task
|-- android_task.py 用于处理Android相关的文件
|-- ios_task.py 用于处理iOS相关的文件
|-- web_task.py 用于处理Web相关的文件,比如网页右键源代码、H5相关的静态信息
|-- tools 程序需要依赖的工具
|-- apktool.jar 用于反编译apk文件
|-- baksmali.jar 用于反编译dex文件
|-- strings.exe 用于windows 32下获取iPA的字符串信息
|-- strings64.exe 用于windows 64的系统获取iPA的字符串信息
|-- app.py 主运行程序
|-- config.py 用于自定义相关规则
|-- readme.md 程序使用说明
|-- base_task.py 统一任务调度
|-- android_task.py 用于处理Android相关的文件
|-- ios_task.py 用于处理iOS相关的文件
|-- web_task.py 用于处理Web相关的文件,比如网页右键源代码、H5相关的静态信息
|-- tools 程序需要依赖的工具
|-- apktool.jar 用于反编译apk文件
|-- baksmali.jar 用于反编译dex文件
|-- strings.exe 用于windows 32下获取iPA的字符串信息
|-- strings64.exe 用于windows 64的系统获取iPA的字符串信息
|-- app.py 主运行程序
|-- config.py 用于自定义相关规则
|-- readme.md 程序使用说明
```
### Android 相关操作说明
@ -91,7 +94,7 @@ python3 app.py android -i <Your apk or dex directory> -a
```
python3 app.py android -i <Your apk or dex directory> -t 10
```
```
### iOS 相关操作说明

@ -7,9 +7,7 @@
import click
from libs.core import Bootstrapper
from libs.task.android_task import AndroidTask
from libs.task.ios_task import iOSTask
from libs.task.web_task import WebTask
from libs.task.base_task import BaseTask
@click.group(help="Python script for automatically retrieving key information in app.")
def cli():
@ -17,60 +15,59 @@ def cli():
# 创建Android任务
@cli.command(help="Get the key information of Android system.")
@click.option("-i", "--input", required=True, type=str, help="Input APK file or DEX directory.")
@click.option("-i", "--inputs", required=True, type=str, help="Input APK file or DEX directory.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.")
@click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.")
@click.option("-p", '--package',required=False,type=str,default="",help="Specifies the retrieval package name.")
@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default")
def android(input: str, rules: str, net_sniffer: bool,no_resource:bool,package:str,all:bool,threads:int) -> None:
def android(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,package:str,all_str:bool,threads:int) -> None:
try:
# 初始化全局对象
bootstrapper = Bootstrapper(__file__)
bootstrapper.init()
task = AndroidTask(input, rules, net_sniffer,no_resource,package,all,threads)
# 让内层代码直接抛出异常,不做rollback。
task.start()
BaseTask("Android", inputs, rules, net_sniffer, no_resource, package, all_str, threads).start()
except Exception as e:
raise e
@cli.command(help="Get the key information of iOS system.")
@click.option("-i", "--input", required=True, type=str, help="Input IPA file or ELF file.")
@click.option("-i", "--inputs", required=True, type=str, help="Input IPA file or ELF file.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-s", "--net-sniffer", is_flag=True, default=False, help="Whether to enable network sniffing.")
@click.option("-n", '--no-resource', is_flag=True, default=False,help="Ignore resource files.")
@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default")
def ios(input: str, rules: str, net_sniffer: bool,no_resource:bool,all:bool,threads:int) -> None:
def ios(inputs: str, rules: str, net_sniffer: bool,no_resource:bool,all_str:bool,threads:int) -> None:
try:
# 初始化全局对象
bootstrapper = Bootstrapper(__file__)
bootstrapper.init()
task = iOSTask(input, rules, net_sniffer,no_resource,all,threads)
# 让内层代码直接抛出异常,不做rollback。
task.start()
BaseTask("iOS", inputs, rules, net_sniffer, no_resource, all_str, threads).start()
except Exception as e:
raise e
@cli.command(help="Get the key information of Web system.")
@click.option("-i", "--input", required=True, type=str, help="Input WebSite dir.")
@click.option("-i", "--inputs", required=True, type=str, help="Input WebSite dir.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Add regular search rule.")
@click.option("-a", '--all',is_flag=True, default=False,help="Output all strings.")
@click.option("-a", '--all-str',is_flag=True, default=False,help="Output all strings.")
@click.option("-t", '--threads',required=False, type=int,default=10,help="Set the number of threads to 10 by default")
def web(input: str, rules: str, all:bool,threads:int) -> None:
def web(inputs: str, rules: str, all_str:bool,threads:int) -> None:
try:
# 初始化全局对象
bootstrapper = Bootstrapper(__file__)
bootstrapper.init()
# BaseTask("Web", inputs, rules,all_str, threads).start()
task = WebTask(input, rules,all,threads)
# 让内层代码直接抛出异常,不做rollback。
task.start()
except Exception as e:
raise e

@ -28,6 +28,8 @@ filter_components = [
# 2. IPv4的ip地址
# 3. URI地址
filter_strs =[
r'^http://.*',
r'^https://.*',
r'.*(http://.*)',
r'.*(https://.*)',
r'.*((?:[0-9]{1,3}\.){3}[0-9]{1,3}).*',

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import platform
import os
import time
import shutil
import platform
# smali 所在路径
smali_path = ""
@ -30,49 +31,42 @@ class Bootstrapper(object):
global os_type
global output_path
global script_root_dir
global result_path
global txt_result_path
global xls_result_path
global strings_path
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
script_root_dir = os.path.dirname(os.path.abspath(path))
tools_dir = os.path.join(script_root_dir,"tools")
if platform.system() == "Windows":
machine2bits = {'AMD64':64, 'x86_64': 64, 'i386': 32, 'x86': 32}
machine2bits.get(platform.machine())
if platform.machine() == 'i386' or platform.machine() == 'x86':
strings_path = os.path.join(script_root_dir,"strings.exe")
strings_path = os.path.join(tools_dir,"strings.exe")
else:
strings_path = os.path.join(script_root_dir,"strings64.exe")
strings_path = os.path.join(tools_dir,"strings64.exe")
else:
strings_path ="strings"
# os_type = "win"
# smali_str = "smali.bat"
# back_smali_str = "backsmali.bat"
# apktool_path_str = "apktool.bat"
# elif platform.system() == "Linux":
# os_type = "lin"
# smali_str = "smali"
# back_smali_str = "backsmali"
# apktool_path_str = "apktool"
# else:
# os_type = "mac"
# smali_str = "smali"
# smali_path = os.path.join(tools_dir,str(os_type) + os.sep + smali_str)
backsmali_path = os.path.join(tools_dir,"baksmali.jar")
apktool_path = os.path.join(tools_dir, "apktool.jar")
output_path = os.path.join(script_root_dir,"out")
result_path = os.path.join(script_root_dir,"result.txt")
txt_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".txt")
xls_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".xls")
def init(self):
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path)
if os.path.exists(result_path):
os.remove(result_path)
if os.path.exists(txt_result_path):
os.remove(txt_result_path)
if os.path.exists(xls_result_path):
os.remove(xls_result_path)

@ -2,16 +2,15 @@
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import threading
import config
import re
import os
import config
import threading
import libs.core as cores
class ParsesThreads(threading.Thread):
def __init__(self,threadID,name,file_queue,all,result_dict):
def __init__(self,threadID,name,file_queue,all,result_dict,types):
threading.Thread.__init__(self)
self.file_queue = file_queue
self.name = name
@ -19,55 +18,54 @@ class ParsesThreads(threading.Thread):
self.result_list = []
self.all = all
self.result_dict=result_dict
self.types = types
def __regular_parse__(self,threadLock):
def __regular_parse__(self):
while True:
try:
file_path = self.file_queue.get(timeout = 5)
scan_str = ("Scan file : %s" % file_path)
print(scan_str)
try:
os.path.basename(file_path).split(".")[1]
except Exception as e:
self.__get_string__(file_path,threadLock)
continue
self.__file_parse__(file_path,threadLock)
result_set = set(self.result_list)
if len(result_set) !=0:
self.result_dict[file_path] = result_set
if self.file_queue.empty():
break
except Exception as e:
if self.file_queue.empty():
break
file_path = self.file_queue.get(timeout = 5)
scan_str = ("Scan file : %s" % file_path)
print(scan_str)
if self.types == "iOS":
self.__get_string_by_iOS__(file_path)
else:
self.__get_string_by_file__(file_path)
result_set = set(self.result_list)
if len(result_set) !=0:
self.result_dict[file_path] = result_set
def __get_string_by_iOS__(self,file_path):
output_path = cores.output_path
strings_path = cores.strings_path
temp = os.path.join(output_path,"temp.txt")
cmd_str = ("%s %s > %s") % (strings_path,file_path,temp)
if os.system(cmd_str) == 0:
with open(temp,"r",encoding='utf-8',errors='ignore') as f:
lines = f.readlines()
for line in lines:
self.__parse_string__(line)
def __file_parse__(self,file_path,threadLock):
with open(file_path,"r",encoding="utf8") as file :
file_content = file.read()
def __get_string_by_file__(self,file_path):
with open(file_path,"r",encoding="utf8",errors='ignore') as f :
file_content = f.read()
# 获取到所有的字符串
pattern = re.compile(r'\"(.*?)\"')
results = pattern.findall(file_content)
# 遍历所有的字符串
for result in set(results):
self.__parse_string__(result,threadLock)
def __get_string__(self,dir_file_path,threadLock):
temp = os.path.join(cores.output_path,"temp.txt")
cmd_str = ("%s %s > %s") % (cores.strings_path,dir_file_path,temp)
if os.system(cmd_str) == 0:
with open(temp,"r") as f:
lines = f.readlines()
for line in lines:
self.__parse_string__(line,threadLock)
def __parse_string__(self,result,threadLock):
self.__parse_string__(result)
def __parse_string__(self,result):
# 通过正则筛选需要过滤的字符串
for filter_str in config.filter_strs:
filter_str_pat = re.compile(filter_str)
filter_resl = filter_str_pat.findall(result)
# print(result,filter_resl)
# 过滤掉未搜索到的内容
if len(filter_resl)!=0:
# 提取第一个结果
@ -76,9 +74,9 @@ class ParsesThreads(threading.Thread):
if self.__filter__(resl_str) == 0:
continue
threadLock.acquire()
self.result_list.append(filter_resl[0])
threadLock.release()
self.threadLock.acquire()
self.result_list.append(resl_str)
self.threadLock.release()
continue
def __filter__(self,resl_str):
@ -105,5 +103,5 @@ class ParsesThreads(threading.Thread):
return return_flag
def run(self):
threadLock = threading.Lock()
self.__regular_parse__(threadLock)
self.threadLock = threading.Lock()
self.__regular_parse__()

@ -12,155 +12,105 @@ import libs.core as cores
from libs.core.parses import ParsesThreads
class AndroidTask(object):
comp_list =[]
thread_list =[]
result_dict = {}
value_list = []
def __init__(self, input, rules, net_sniffer,no_resource,package,all,threads):
self.net_sniffer = net_sniffer
self.path = input
if rules:
config.filter_strs.append(r'.*'+str(rules)+'.*')
def __init__(self,path,no_resource,package):
self.path = path
self.no_resource = no_resource
self.package = package
self.all = all
self.threads = threads
self.file_queue = Queue()
self.shell_falg=False
self.shell_flag=False
self.packagename=""
self.comp_list=[]
def start(self):
# 检查java环境是否存在
if os.system("java -version") !=0 :
raise Exception("Please install the Java environment!")
# 根据不同的文件后缀进行文件解析
if os.path.isfile(self.path):
suffix_name = self.path.split(".")[-1]
if suffix_name == "apk":
self.__decode_apk__(self.path)
elif suffix_name == "dex":
self.__decode_dex__(self.path)
else:
# 抛出异常
raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.")
else:
self.__get_file_type__(self.path)
self.__start_threads()
input_file_path = self.path
for thread in self.thread_list:
thread.join()
self.__print__()
if os.path.isdir(input_file_path):
self.__decode_dir__(input_file_path)
else:
if self.__decode_file__(input_file_path) == "error":
raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.")
# if self.net_sniffer:
# self.__start_net__()
return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename}
def __decode_file__(self,file_path):
apktool_path = str(cores.apktool_path)
output_path = str(cores.output_path)
backsmali_path = str(cores.backsmali_path)
suffix_name = file_path.split(".")[-1]
if suffix_name == "apk":
self.__decode_apk__(file_path,apktool_path,output_path)
elif suffix_name == "dex":
self.__decode_dex__(file_path,backsmali_path,output_path)
else:
return "error"
def __decode_dir__(self,root_dir):
dir_or_files = os.listdir(root_dir)
for dir_or_file in dir_or_files:
dir_or_file_path = os.path.join(root_dir,dir_or_file)
if os.path.isdir(dir_or_file_path):
self.__decode_dir__(dir_or_file_path)
else:
if self.__decode_file__(dir_or_file_path) == "error":
continue
# 分解apk
def __decode_apk__(self,path):
if self.no_resource:
self.__decode_dex__(path)
def __decode_apk__(self,file_path,apktool_path,output_path):
cmd_str = ("java -jar %s d -f %s -o %s --only-main-classe") % (apktool_path,str(file_path),output_path)
if os.system(cmd_str) == 0:
self.__shell_test__(output_path)
self.__scanner_file_by_apktool__(output_path)
else:
cmd_str = ("java -jar %s d -f %s -o %s --only-main-classe") % (cores.apktool_path,path,cores.output_path)
if os.system(cmd_str) == 0:
self.__scanner_file_by_apktool__(cores.output_path)
else:
print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception("Decompilation failed.")
print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception(file_path + ", Decompilation failed.")
# 分解dex
def __decode_dex__(self,path):
cmd_str = ("java -jar %s d %s") % (cores.backsmali_path,path)
def __decode_dex__(self,file_path,backsmali_path,output_path):
cmd_str = ("java -jar %s d %s") % (backsmali_path,str(file_path))
if os.system(cmd_str) == 0:
self.__get_scanner_file__(cores.output_path,"smali")
self.__get_scanner_file__(output_path)
else:
print("Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception("Decompilation failed.")
raise Exception(file_path + ", Decompilation failed.")
# 初始化检测文件信息
def __scanner_file_by_apktool__(self,output):
# shell检测
self.__shell_test__(output)
scanner_dir_list = ["smali","assets"]
scanner_file_suffix = ["smali","js","xml"]
def __scanner_file_by_apktool__(self,output_path):
if self.no_resource:
scanner_dir_lists = ["smali"]
scanner_file_suffixs = ["smali"]
else:
scanner_dir_lists = ["smali","assets"]
scanner_file_suffixs = ["smali","js","xml"]
for dir in scanner_dir_list:
scanner_dir = os.path.join(output,dir)
for scanner_dir_list in scanner_dir_lists:
scanner_dir = os.path.join(output_path,scanner_dir_list)
if os.path.exists(scanner_dir):
self.__get_scanner_file__(scanner_dir,scanner_file_suffix)
self.__get_scanner_file__(scanner_dir,scanner_file_suffixs)
def __get_scanner_file__(self,scanner_dir,file_suffix):
def __get_scanner_file__(self,scanner_dir,scanner_file_suffixs=["smali"]):
dir_or_files = os.listdir(scanner_dir)
for dir_file in dir_or_files:
dir_file_path = os.path.join(scanner_dir,dir_file)
for dir_or_file in dir_or_files:
dir_file_path = os.path.join(scanner_dir,dir_or_file)
if os.path.isdir(dir_file_path):
self.__get_scanner_file__(dir_file_path,file_suffix)
self.__get_scanner_file__(dir_file_path,scanner_file_suffixs)
else:
if "." not in dir_file:
continue
if len(dir_file.split("."))>1:
if dir_file.split(".")[-1] in file_suffix:
self.file_queue.put(dir_file_path)
for component in config.filter_components:
comp = component.replace(".","/")
if( comp in dir_file_path):
if(component not in self.comp_list):
self.comp_list.append(component)
def __get_file_type__(self,root_path):
dir_or_files = os.listdir(root_path)
for dir_file in dir_or_files:
dir_file_path = os.path.join(root_path,dir_file)
if os.path.isdir(dir_file_path):
self.__get_file_type__(dir_file_path)
else:
suffix_name = dir_file.split(".")[-1]
if suffix_name == "apk":
self.__decode_apk__(dir_file)
elif suffix_name == "dex":
self.__decode_dex__(dir_file)
else:
if ("." not in dir_or_file) or (len(dir_or_file.split(".")) < 1) or (dir_or_file.split(".")[-1] not in scanner_file_suffixs):
continue
def __print__(self):
if self.packagename:
print("========= The package name of this APP is: ===============")
print(self.packagename)
if len(self.comp_list) != 0:
print("========= Component information is as follows :===============")
for json in self.comp_list:
print(json)
print("=========The result set for the static scan is shown below:===============")
with open(cores.result_path,"a+") as f:
for key,value in self.result_dict.items():
f.write(key+"\r")
for result in value:
if result in self.value_list:
continue
self.value_list.append(result)
print(result)
f.write("\t"+result+"\r")
print("For more information about the search, see: %s" %(cores.result_path))
if self.shell_falg:
print('\033[1;33mWarning: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
def __start_threads(self):
for threadID in range(1,self.threads) :
name = "Thread - " + str(threadID)
thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict)
thread.start()
self.thread_list.append(thread)
self.file_queue.put(dir_file_path)
for component in config.filter_components:
comp = component.replace(".","/")
if(comp in dir_file_path):
if(component not in self.comp_list):
self.comp_list.append(component)
def __shell_test__(self,output):
am_path = os.path.join(output,"AndroidManifest.xml")
@ -171,10 +121,10 @@ class AndroidTask(object):
am_package= re.compile(r'<manifest.*package=\"(.*?)\".*')
apackage = am_package.findall(am_str)
if len(apackage) >=1:
self.packagename = apackage
self.packagename = apackage[0]
am_name = re.compile(r'<application.*android:name=\"(.*?)\".*>')
aname = am_name.findall(am_str)
if aname and len(aname)>=1:
if aname[0] in config.shell_list:
self.shell_falg = True
self.shell_flag = True

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
# 接收传入的参数信息,根据参数进行平台分发
import os
import re
import xlwt
import config
import threading
from queue import Queue
import libs.core as cores
from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask
from libs.task.ios_task import iOSTask
from libs.task.web_task import WebTask
class BaseTask(object):
thread_list =[]
result_dict = {}
value_list = []
# 统一初始化入口
def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10):
self.types = types
self.net_sniffer = net_sniffer
self.path = inputs
if rules:
config.filter_strs.append(r'.*'+str(rules)+'.*')
self.no_resource = no_resource
self.package = package
self.all = all_str
self.threads = threads
self.file_queue = Queue()
# 统一调度平台
def start(self):
workbook = xlwt.Workbook(encoding = 'utf-8')
# 创建excel头
worksheet = self.__creating_excel_header__(workbook)
# 任务控制中心
task_info = self.__tast_control__()
file_queue = task_info["file_queue"]
shell_flag = task_info["shell_flag"]
comp_list = task_info["comp_list"]
packagename = task_info["packagename"]
if shell_flag:
print('\033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
return
# 线程控制中心
self.__threads_control__(file_queue)
# 等待线程结束
for thread in self.thread_list:
thread.join()
# 结果输出中心
self.__print_control__(packagename,comp_list,workbook,worksheet)
def __creating_excel_header__(self,workbook):
worksheet = workbook.add_sheet("扫描信息",cell_overwrite_ok=True)
worksheet.write(0,0, label = "扫描结果")
return worksheet
def __tast_control__(self):
task_info = {}
# 调用Android 相关处理逻辑
if self.types == "Android":
task_info = AndroidTask(self.path,self.no_resource,self.package).start()
# 调用iOS 相关处理逻辑
elif self.types == "iOS":
task_info = iOSTask(self.path,self.no_resource).start()
# 调用Web 相关处理逻辑
else:
task_info = WebTask.start()
return task_info
def __threads_control__(self,file_queue):
for threadID in range(1,self.threads):
name = "Thread - " + str(threadID)
thread = ParsesThreads(threadID,name,file_queue,self.all,self.result_dict,self.types)
thread.start()
self.thread_list.append(thread)
def __print_control__(self,packagename,comp_list,workbook,worksheet):
txt_result_path = cores.txt_result_path
xls_result_path = cores.xls_result_path
if packagename:
print("========= The package name of this APP is: ===============")
print(packagename)
if len(comp_list) != 0:
print("========= Component information is as follows :===============")
for json in comp_list:
print(json)
print("=========The result set for the static scan is shown below:===============")
with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f:
row = 1
for key,value in self.result_dict.items():
f.write(key+"\r")
for result in value:
if result in self.value_list:
continue
self.value_list.append(result)
print(result)
worksheet.write(row,0, label = result)
row = row + 1
f.write("\t"+result+"\r")
print("For more information about the search, see TXT file result: %s" %(txt_result_path))
print("For more information about the search, see XLS file result: %s" %(xls_result_path))
workbook.save(xls_result_path)

@ -5,45 +5,63 @@
import os
import re
import shutil
import zipfile
import binascii
import platform
from pathlib import Path
import libs.core as cores
from queue import Queue
from libs.core.parses import ParsesThreads
class iOSTask(object):
thread_list =[]
value_list = []
result_dict = {}
def __init__(self,input, rules, net_sniffer,no_resource,all,threads):
self.path = input
self.rules = rules
self.net_sniffer = net_sniffer
elf_file_name = ""
def __init__(self,path,no_resource):
self.path = path
self.no_resource = no_resource
self.all = all
self.threads = threads
if rules:
config.filter_strs.append(r'.*'+str(rules)+'.*')
self.file_queue = Queue()
self.shell_falg=False
self.shell_flag=False
def start(self):
# ipa 文件
if self.path.split(".")[-1] == 'ipa':
# 对ipa进行解包
file_path = self.path
if file_path.split(".")[-1] == 'ipa':
self.__decode_ipa__(cores.output_path)
#文件提取
self.__scanner_file_by_ipa__(cores.output_path)
self.__start_threads()
for thread in self.thread_list:
thread.join()
elif self.__get_file_header__(file_path):
self.file_queue.put(file_path)
else:
raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.")
return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None}
self.__print__()
def __get_file_header__(self,file_path):
print("====================")
hex_hand = 0x0
with open(file_path,"rb") as macho_file:
macho_file.seek(hex_hand,0)
magic = binascii.hexlify(macho_file.read(4)).decode().upper()
macho_magics = ["CFFAEDFE","CEFAEDFE","BEBAFECA","CAFEBABE"]
if magic in macho_magics:
self.__shell_test__(macho_file,hex_hand)
macho_file.close()
return True
macho_file.close()
return False
def __shell_test__(self,macho_file,hex_hand):
while True:
magic = binascii.hexlify(macho_file.read(4)).decode().upper()
if magic == "2C000000":
macho_file.seek(hex_hand,0)
encryption_info_command = binascii.hexlify(macho_file.read(24)).decode()
cryptid = encryption_info_command[-8:len(encryption_info_command)]
print(cryptid)
if cryptid == "01000000":
self.shell_flag = True
break
hex_hand = hex_hand + 4
def __scanner_file_by_ipa__(self,output):
scanner_file_suffix = ["plist","js","xml","html"]
@ -60,39 +78,58 @@ class iOSTask(object):
self.__get_scanner_file__(dir_file_path,file_suffix)
else:
if self.elf_file_name == dir_file:
self.__get_file_header__(dir_file_path)
print(self.shell_flag)
self.file_queue.put(dir_file_path)
continue
if self.no_resource:
dir_file_suffix = dir_file.split(".")
if len(dir_file_suffix) > 1:
if dir_file_suffix[-1] in file_suffix:
self.__get_file_header__(dir_file_path)
self.file_queue.put(dir_file_path)
def __decode_ipa__(self,output_path):
zip_files = zipfile.ZipFile(self.path)
for zip_file in zip_files.filelist:
zip_files.extract(zip_file.filename,output_path)
with zipfile.ZipFile(self.path,"r") as zip_files:
zip_file_names = zip_files.namelist()
zip_files.extract(zip_file_names[0],output_path)
try:
new_zip_file = zip_file_names[0].encode('cp437').decode('utf-8')
except UnicodeEncodeError:
new_zip_file = zip_file_names[0].encode('utf-8').decode('utf-8')
old_zip_dir = self.__get_parse_dir__(output_path,zip_file_names[0])
new_zip_dir = self.__get_parse_dir__(output_path,new_zip_file)
os.rename(old_zip_dir,new_zip_dir)
for zip_file in zip_file_names:
old_ext_path = zip_files.extract(zip_file,output_path)
start = str(old_ext_path).index("Payload")
dir_path = old_ext_path[start:len(old_ext_path)]
old_ext_path = os.path.join(output_path,dir_path)
try:
new_zip_file = zip_file.encode('cp437').decode('utf-8')
except UnicodeEncodeError:
new_zip_file = zip_file.encode('utf-8').decode('utf-8')
new_ext_path = os.path.join(output_path,new_zip_file)
def __start_threads(self):
for threadID in range(1,self.threads) :
name = "Thread - " + str(threadID)
thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict)
thread.start()
self.thread_list.append(thread)
if platform.system() == "Windows":
new_ext_path = new_ext_path.replace("/","\\")
if not os.path.exists(new_ext_path):
dir_path = os.path.dirname(new_ext_path)
if not os.path.exists(dir_path):
os.makedirs(dir_path)
shutil.move(old_ext_path, new_ext_path)
def __print__(self):
print("=========The result set for the static scan is shown below:===============")
with open(cores.result_path,"a+") as f:
for key,value in self.result_dict.items():
f.write(key+"\r")
for result in value:
if result in self.value_list:
continue
self.value_list.append(result)
print(result)
f.write("\t"+result+"\r")
print("For more information about the search, see: %s" %(cores.result_path))
if self.shell_falg:
print('\033[1;33mWarning: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
def __get_parse_dir__(self,output_path,file_path):
start = file_path.index("Payload/")
end = file_path.index(".app")
root_dir = file_path[start:end]
if platform.system() == "Windows":
root_dir = root_dir.replace("/","\\")
old_root_dir = os.path.join(output_path,root_dir+".app")
return old_root_dir

@ -1 +1,2 @@
click
click
xlwt

@ -1,9 +1,26 @@
### V1.0.5
- 新增对DOM、SAX、DOM4J、JDOM等XML解析组件的识别
- 新增反编译失败后提交issues入口
- 新增ipa壳识别功能,将能够更好的对壳进行识别
- 新增macho文件扫描功能
- 新增结果url和ip地址单独输出到excel
- 优化后缀名获取方式
- 优化任务控制中心,将分散的入口整合为一个(web的除外)
- 化任务处理逻辑,识别到有壳后,停止执行后继逻辑
- 修复部分DEX格式原因导致无法进行反编译问题
- 修复ipa包中存在中文路径不能识别macho问题
### V1.0.4
- 新增对Flutter框架检测支持

Loading…
Cancel
Save