- 更新apktool为最新版本

- 优化部分环节流程
- 修复excle文件导出时超时行数限制
- 修复脚本执行时卡顿的问题
- 修复Mac下Playload文件权限不足的问题
v1.0.09 V1.0.9_Releases
kelvinBen 2 years ago
parent 51ca5a0c5b
commit 827ef29e65
  1. 8
      app.py
  2. 8
      config.py
  3. 5
      libs/core/__init__.py
  4. 15
      libs/core/download.py
  5. 30
      libs/core/net.py
  6. 14
      libs/core/parses.py
  7. 13
      libs/task/android_task.py
  8. 42
      libs/task/base_task.py
  9. 4
      libs/task/download_task.py
  10. 26
      libs/task/ios_task.py
  11. 46
      libs/task/net_task.py
  12. 1
      libs/task/web_task.py
  13. 2
      requirements.txt
  14. BIN
      tools/apktool.jar
  15. 7
      update.md

@ -8,11 +8,14 @@ import click
from libs.core import Bootstrapper
from libs.task.base_task import BaseTask
@click.group(help="Python script for automatically retrieving key information in app.")
def cli():
pass
# 创建Android任务
@cli.command(help="Get the key information of Android system.")
@click.option("-i", "--inputs", required=True, type=str, help="Please enter the APK file or DEX file to be scanned or the corresponding APK download address.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Please enter a rule for temporary scanning of file contents.")
@ -31,6 +34,7 @@ def android(inputs: str, rules: str, sniffer: bool, no_resource:bool, all:bool,
except Exception as e:
raise e
@cli.command(help="Get the key information of iOS system.")
@click.option("-i", "--inputs", required=True, type=str, help="Please enter IPA file or ELF file to scan or corresponding IPA download address. App store is not supported at present.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Please enter a rule for temporary scanning of file contents.")
@ -48,6 +52,7 @@ def ios(inputs: str, rules: str, sniffer: bool, no_resource:bool, all:bool, thre
except Exception as e:
raise e
@cli.command(help="Get the key information of Web system.")
@click.option("-i", "--inputs", required=True, type=str, help="Please enter the site directory or site file to scan or the corresponding site download address.")
@click.option("-r", "--rules", required=False, type=str, default="", help="Please enter a rule for temporary scanning of file contents.")
@ -65,9 +70,10 @@ def web(inputs: str, rules: str, sniffer: bool, no_resource:bool, all:bool, thre
except Exception as e:
raise e
def main():
cli()
if __name__ == "__main__":
main()

@ -50,14 +50,15 @@ filter_no = [
r'.*w3school.com.cn',
r'.*apple.com',
r'.*.amap.com',
r'.*slf4j.org',
]
# AK集合
filter_ak_map = {
"Aliyun_OSS": [
r'.*accessKeyId.*".*"',
r'.*accessKeySecret.*".*"',
r'.*secret.*".*"'
r'.*accessKeyId.*".*?"',
r'.*accessKeySecret.*".*?"',
r'.*secret.*".*?"'
],
# "Amazon_AWS_Access_Key_ID": r"([^A-Z0-9]|^)(AKIA|A3T|AGPA|AIDA|AROA|AIPA|ANPA|ANVA|ASIA)[A-Z0-9]{12,}",
# "Amazon_AWS_S3_Bucket": [
@ -183,4 +184,3 @@ data = {
# 配置自动下载Apk文件或者缓存HTML的请求方法信息,目前仅支持GET和POST
method = "GET"

@ -26,7 +26,7 @@ output_path = ""
download_flag = False
# excel 起始行号
excel_row = 0
excel_row = 1
class Bootstrapper(object):
@ -78,7 +78,7 @@ class Bootstrapper(object):
apktool_path = os.path.join(tools_dir, "apktool.jar")
download_path = os.path.join(out_dir,"download")
txt_result_path = os.path.join(out_dir,"result_"+str(create_time)+".txt")
xls_result_path = os.path.join(out_dir,"result_"+str(create_time)+".xls")
xls_result_path = os.path.join(out_dir,"result_"+str(create_time)+".xlsx")
app_history_path = os.path.join(history_path,"app_history.txt")
domain_history_path = os.path.join(history_path,"domain_history.txt")
@ -100,7 +100,6 @@ class Bootstrapper(object):
print("[*] Create directory {}".format(output_path))
if not os.path.exists(download_path):
# shutil.rmtree(download_path)
os.makedirs(download_path)
print("[*] Create directory {}".format(download_path))

@ -2,10 +2,7 @@
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import re
import os
import sys
import time
import config
import requests
import threading
@ -13,6 +10,7 @@ import libs.core as cores
from requests.packages import urllib3
from requests.adapters import HTTPAdapter
class DownloadThreads(threading.Thread):
def __init__(self, input_path, file_name, cache_path, types):
@ -32,15 +30,16 @@ class DownloadThreads(threading.Thread):
urllib3.disable_warnings()
if config.method.upper() == "POST":
resp = session.post(url=self.url,params=config.data ,headers=config.headers,timeout=30)
resp = session.post(
url=self.url, params=config.data, headers=config.headers, timeout=30)
else:
resp = session.get(url=self.url,data=config.data ,headers=config.headers,timeout=30)
resp = session.get(url=self.url, data=config.data,
headers=config.headers, timeout=30)
if resp.status_code == requests.codes.ok:
if self.types == "Android" or self.types == "iOS":
count = 0
progress_tmp = 0
time1 = time.time()
length = float(resp.headers['content-length'])
with open(self.cache_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=512):
@ -51,7 +50,8 @@ class DownloadThreads(threading.Thread):
if progress != progress_tmp:
progress_tmp = progress
print("\r", end="")
print("[*] Download progress: {}%: ".format(progress), "" * (progress // 2), end="")
print(
"[*] Download progress: {}%: ".format(progress), "" * (progress // 2), end="")
sys.stdout.flush()
f.close()
else:
@ -62,7 +62,6 @@ class DownloadThreads(threading.Thread):
cores.download_flag = True
except Exception as e:
raise Exception(e)
return
def run(self):
threadLock = threading.Lock()

@ -31,20 +31,30 @@ class NetThreads(threading.Thread):
if result != "error":
if self.lock.acquire(True):
cores.excel_row = cores.excel_row + 1
self.worksheet.write(cores.excel_row, 0, label = cores.excel_row)
self.worksheet.write(cores.excel_row, 1, label = url_ip)
self.worksheet.write(cores.excel_row, 2, label = domain)
self.worksheet.cell(row=cores.excel_row,
column=1, value=cores.excel_row-1)
self.worksheet.cell(row=cores.excel_row,
column=2, value=url_ip)
self.worksheet.cell(row=cores.excel_row,
column=3, value=domain)
if result != "timeout":
self.worksheet.write(cores.excel_row, 3, label = result["status"])
self.worksheet.write(cores.excel_row, 4, label = result["des_ip"])
self.worksheet.write(cores.excel_row, 5, label = result["server"])
self.worksheet.write(cores.excel_row, 6, label = result["title"])
self.worksheet.write(cores.excel_row, 7, label = result["cdn"])
# self.worksheet.write(cores.excel_row, 8, label = "")
self.worksheet.cell(
row=cores.excel_row, column=4, value=result["status"])
self.worksheet.cell(
row=cores.excel_row, column=5, value=result["des_ip"])
self.worksheet.cell(
row=cores.excel_row, column=6, value=result["server"])
self.worksheet.cell(
row=cores.excel_row, column=7, value=result["title"])
self.worksheet.cell(
row=cores.excel_row, column=8, value=result["cdn"])
self.lock.release()
def __get_request_result__(self, url):
result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
result = {"status": "", "server": "", "cookie": "",
"cdn": "", "des_ip": "", "sou_ip": "", "title": ""}
cdn = ""
try:
with requests.get(url, timeout=5, stream=True) as rsp:

@ -40,7 +40,8 @@ class ParsesThreads(threading.Thread):
output_path = cores.output_path
strings_path = cores.strings_path
temp = os.path.join(output_path, "temp.txt")
cmd_str = ('"%s" "%s" > "%s"') % (str(strings_path),str(file_path),str(temp))
cmd_str = ('"%s" "%s" > "%s"') % (
str(strings_path), str(file_path), str(temp))
if os.system(cmd_str) == 0:
with open(temp, "r", encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
@ -56,6 +57,8 @@ class ParsesThreads(threading.Thread):
# 搜素AK和SK信息,由于iOS的逻辑处理效率过慢暂时忽略对iOS的AK检测
if not (".js" == file_path[-3:] and self.types == "iOS"):
# 未包含相关字段不进行ak或者sk信息采集
if "access" in file_content or "secret" in file_content:
for key, values in config.filter_ak_map.items():
if isinstance(values, list):
for value in values:
@ -65,6 +68,8 @@ class ParsesThreads(threading.Thread):
# 遍历所有的字符串
for result in set(results):
if ("http://" == result) or ("https://" == result) or result.startswith("https://.") or result.startswith("http://.") :
continue
self.__parse_string__(result)
def __ak_and_sk__(self, name, ak_rule, content):
@ -89,14 +94,16 @@ class ParsesThreads(threading.Thread):
self.threadLock.acquire()
if cores.all_flag:
print(("[+] The string searched for matching rule is: %s") % (resl_str))
print(
("[+] The string searched for matching rule is: %s") % (resl_str))
self.result_list.append(resl_str)
self.threadLock.release()
continue
def __filter__(self, resl_str):
return_flag = 1
resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","")
resl_str = resl_str.replace("\r", "").replace(
"\n", "").replace(" ", "")
if len(resl_str) == 0:
return 0
@ -110,6 +117,7 @@ class ParsesThreads(threading.Thread):
if re.match(filte, resl_str):
return_flag = 0
continue
return return_flag
def run(self):

@ -33,7 +33,8 @@ class AndroidTask(object):
self.__decode_dir__(input_file_path)
else:
if self.__decode_file__(input_file_path) == "error":
raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.")
raise Exception(
"Retrieval of this file type is not supported. Select APK file or DEX file.")
return {"comp_list": self.comp_list, "shell_flag": self.shell_flag, "file_queue": self.file_queue, "packagename": self.packagename, "file_identifier": self.file_identifier, "permissions": self.permissions}
@ -44,7 +45,7 @@ class AndroidTask(object):
filename = os.path.basename(file_path)
suffix_name = filename.split(".")[-1]
if suffix_name == "apk":
if suffix_name == "apk" or suffix_name == "hpk":
name = filename.split(".")[0]
output_path = os.path.join(base_out_path, name)
self.__decode_apk__(file_path, apktool_path, output_path)
@ -77,7 +78,8 @@ class AndroidTask(object):
# 分解apk
def __decode_apk__(self, file_path, apktool_path, output_path):
cmd_str = ('java -jar "%s" d -f "%s" -o "%s" --only-main-classe') % (str(apktool_path),str(file_path),str(output_path))
cmd_str = ('java -jar "%s" d -f "%s" -o "%s" --only-main-classe') % (
str(apktool_path), str(file_path), str(output_path))
if os.system(cmd_str) == 0:
self.__shell_test__(output_path)
self.__scanner_file_by_apktool__(output_path)
@ -85,17 +87,16 @@ class AndroidTask(object):
print("[-] Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception(file_path + ", Decompilation failed.")
# 分解dex
def __decode_dex__(self, file_path, backsmali_path, output_path):
cmd_str = ('java -jar "%s" d "%s"') % (str(backsmali_path),str(file_path))
cmd_str = ('java -jar "%s" d "%s"') % (str(backsmali_path),
str(file_path))
if os.system(cmd_str) == 0:
self.__get_scanner_file__(output_path)
else:
print("[-] Decompilation failed, please submit error information at https://github.com/kelvinBen/AppInfoScanner/issues")
raise Exception(file_path + ", Decompilation failed.")
# 初始化检测文件信息
def __scanner_file_by_apktool__(self, output_path):
file_names = os.listdir(output_path)

@ -3,9 +3,7 @@
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import os
import re
import config
import threading
from queue import Queue
import libs.core as cores
from libs.task.ios_task import iOSTask
@ -15,12 +13,14 @@ from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask
from libs.task.download_task import DownloadTask
class BaseTask(object):
thread_list = []
result_dict = {}
app_history_list = []
domain_history_list = []
# 统一初始化入口
def __init__(self, types="Android", inputs="", rules="", sniffer=True, threads=10, package=""):
self.types = types
self.path = inputs
@ -31,8 +31,8 @@ class BaseTask(object):
self.package = package
self.file_queue = Queue()
# 统一调度平台
def start(self):
print("[*] AI is analyzing filtering rules......")
@ -40,7 +40,8 @@ class BaseTask(object):
# 获取历史记录
self.__history_handle__()
print("[*] The filtering rules obtained by AI are as follows: %s" % (set(config.filter_no)) )
print("[*] The filtering rules obtained by AI are as follows: %s" %
(set(config.filter_no)))
# 任务控制中心
task_info = self.__tast_control__()
@ -59,7 +60,8 @@ class BaseTask(object):
return
# 线程控制中心
print("[*] ========= Searching for strings that match the rules ===============")
print(
"[*] ========= Searching for strings that match the rules ===============")
self.__threads_control__(file_queue)
# 等待线程结束
@ -67,8 +69,8 @@ class BaseTask(object):
thread.join()
# 结果输出中心
self.__print_control__(packagename,comp_list,file_identifier,permissions)
self.__print_control__(packagename, comp_list,
file_identifier, permissions)
def __tast_control__(self):
task_info = {}
@ -78,7 +80,8 @@ class BaseTask(object):
types = cache_info["type"]
if (not os.path.exists(cacar_path) and cores.download_flag):
print("[-] File download failed! Please download the file manually and try again.")
print(
"[-] File download failed! Please download the file manually and try again.")
return task_info
# 调用Android 相关处理逻辑
@ -95,7 +98,8 @@ class BaseTask(object):
def __threads_control__(self, file_queue):
for threadID in range(1, self.threads):
name = "Thread - " + str(int(threadID))
thread = ParsesThreads(threadID,name,file_queue,self.result_dict,self.types)
thread = ParsesThreads(
threadID, name, file_queue, self.result_dict, self.types)
thread.start()
self.thread_list.append(thread)
@ -105,8 +109,10 @@ class BaseTask(object):
all_flag = cores.all_flag
if self.sniffer:
print("[*] ========= Sniffing the URL address of the search ===============")
NetTask(self.result_dict,self.app_history_list,self.domain_history_list,file_identifier,self.threads).start()
print(
"[*] ========= Sniffing the URL address of the search ===============")
NetTask(self.result_dict, self.app_history_list,
self.domain_history_list, file_identifier, self.threads).start()
if packagename:
print("[*] ========= The package name of this APP is: ===============")
@ -118,7 +124,8 @@ class BaseTask(object):
print(json)
if len(permissions) != 0:
print("[*] ========= Sensitive permission information is as follows: ===============")
print(
"[*] ========= Sensitive permission information is as follows: ===============")
for permission in permissions:
print(permission)
@ -133,10 +140,12 @@ class BaseTask(object):
value_list.append(result)
f.write("\t"+result+"\r")
f.close()
print("[*] For more information about the search, see TXT file result: %s" %(txt_result_path))
print("[*] For more information about the search, see TXT file result: %s" %
(txt_result_path))
if self.sniffer:
print("[*] For more information about the search, see XLS file result: %s" %(xls_result_path))
print("[*] For more information about the search, see XLSX file result: %s" %
(xls_result_path))
def __history_handle__(self):
domain_history_path = cores.domain_history_path
@ -148,7 +157,8 @@ class BaseTask(object):
lines = f.readlines()
app_size = len(lines)
for line in lines:
self.app_history_list.append(line.replace("\r","").replace("\n",""))
self.app_history_list.append(
line.replace("\r", "").replace("\n", ""))
f.close()
@ -164,5 +174,3 @@ class BaseTask(object):
if domain_count >= cout:
config.filter_no.append(".*" + domain)
f.close()

@ -11,6 +11,7 @@ from queue import Queue
import libs.core as cores
from libs.core.download import DownloadThreads
class DownloadTask(object):
def start(self, path, types):
@ -37,7 +38,8 @@ class DownloadTask(object):
else: # 目录处理
return {"path": path, "type": types}
else:
print("[*] Detected that the task is not local, preparing to download file......")
print(
"[*] Detected that the task is not local, preparing to download file......")
cache_path = os.path.join(cores.download_path, file_name)
thread = DownloadThreads(path, file_name, cache_path, types)
thread.start()

@ -11,8 +11,10 @@ import platform
import libs.core as cores
from queue import Queue
class iOSTask(object):
elf_file_name = ""
def __init__(self, path):
self.path = path
self.file_queue = Queue()
@ -28,7 +30,8 @@ class iOSTask(object):
elif self.__get_file_header__(file_path):
self.file_queue.put(file_path)
else:
raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.")
raise Exception(
"Retrieval of this file type is not supported. Select IPA file or Mach-o file.")
return {"shell_flag": self.shell_flag, "file_queue": self.file_queue, "comp_list": [], "packagename": None, "file_identifier": self.file_identifier, "permissions": self.permissions}
def __get_file_header__(self, file_path):
@ -46,14 +49,15 @@ class iOSTask(object):
macho_file.close()
return False
def __shell_test__(self, macho_file, hex_hand):
while True:
magic = binascii.hexlify(macho_file.read(4)).decode().upper()
if magic == "2C000000":
macho_file.seek(hex_hand, 0)
encryption_info_command = binascii.hexlify(macho_file.read(24)).decode()
cryptid = encryption_info_command[-8:len(encryption_info_command)]
encryption_info_command = binascii.hexlify(
macho_file.read(24)).decode()
cryptid = encryption_info_command[-8:len(
encryption_info_command)]
if cryptid == "01000000":
self.shell_flag = True
break
@ -89,16 +93,21 @@ class iOSTask(object):
zip_file_names = zip_files.namelist()
zip_files.extract(zip_file_names[0], output_path)
try:
new_zip_file = zip_file_names[0].encode('cp437').decode('utf-8')
new_zip_file = zip_file_names[0].encode(
'cp437').decode('utf-8')
except UnicodeEncodeError:
new_zip_file = zip_file_names[0].encode('utf-8').decode('utf-8')
new_zip_file = zip_file_names[0].encode(
'utf-8').decode('utf-8')
old_zip_dir = self.__get_parse_dir__(output_path,zip_file_names[0])
old_zip_dir = self.__get_parse_dir__(
output_path, zip_file_names[0])
new_zip_dir = self.__get_parse_dir__(output_path, new_zip_file)
os.rename(old_zip_dir, new_zip_dir)
for zip_file in zip_file_names:
old_ext_path = zip_files.extract(zip_file, output_path)
if not "Payload" in old_ext_path:
continue
start = str(old_ext_path).index("Payload")
dir_path = old_ext_path[start:len(old_ext_path)]
old_ext_path = os.path.join(output_path, dir_path)
@ -117,14 +126,13 @@ class iOSTask(object):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
shutil.move(old_ext_path, new_ext_path)
if os.path.exists(old_ext_path):
if os.path.exists(old_ext_path) and (".app" in old_ext_path):
try:
# mac发生权限问题的时候做处理
os.remove(old_ext_path)
except Exception:
shutil.rmtree(old_ext_path)
def __get_parse_dir__(self, output_path, file_path):
start = file_path.index("Payload/")
end = file_path.index(".app")

@ -3,15 +3,13 @@
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import re
import xlwt
import socket
import openpyxl
import config
from queue import Queue
import libs.core as cores
from libs.core.net import NetThreads
import requests
class NetTask(object):
value_list = []
domain_list = []
@ -27,7 +25,7 @@ class NetTask(object):
def start(self):
xls_result_path = cores.xls_result_path
workbook = xlwt.Workbook(encoding = 'utf-8')
workbook = openpyxl.Workbook()
worksheet = self.__creating_excel_header__(workbook)
self.__write_result_to_txt__()
@ -40,20 +38,19 @@ class NetTask(object):
workbook.save(xls_result_path)
def __creating_excel_header__(self, workbook):
worksheet = workbook.add_sheet("Result",cell_overwrite_ok=True)
worksheet.write(0,0, label = "Number")
worksheet.write(0,1, label = "IP/URL")
worksheet.write(0,2, label = "Domain")
worksheet.write(0,3, label = "Status")
worksheet.write(0,4, label = "IP")
worksheet.write(0,5, label = "Server")
worksheet.write(0,6, label = "Title")
worksheet.write(0,7, label = "CDN")
# worksheet.write(0,8, label = "Finger")
worksheet = workbook.create_sheet("Result", 0)
worksheet.cell(row=1, column=1, value="Number")
worksheet.cell(row=1, column=2, value="IP/URL")
worksheet.cell(row=1, column=3, value="Domain")
worksheet.cell(row=1, column=4, value="Status")
worksheet.cell(row=1, column=5, value="IP")
worksheet.cell(row=1, column=6, value="Server")
worksheet.cell(row=1, column=7, value="Title")
worksheet.cell(row=1, column=8, value="CDN")
worksheet.cell(row=1, column=9, value="Finger")
return worksheet
def __write_result_to_txt__(self):
txt_result_path = cores.txt_result_path
append_file_flag = True
for key, value in self.result_dict.items():
@ -63,11 +60,11 @@ class NetTask(object):
self.value_list.append(result)
if (("http://" in result) or ("https://" in result)) and ("." in result):
domain = result.replace("https://","").replace("http://","")
if "{" in result or "}" in result or "[" in result or "]" in result or "\\" in result or "!" in result or "," in result:
continue
domain = result.replace(
"https://", "").replace("http://", "")
if "/" in domain:
domain = domain[:domain.index("/")]
@ -79,21 +76,25 @@ class NetTask(object):
url_suffix = result[result.rindex(".")+1:].lower()
if not(cores.resource_flag and url_suffix in config.sniffer_filter):
self.domain_queue.put({"domain":domain,"url_ip":result})
self.domain_queue.put(
{"domain": domain, "url_ip": result})
for identifier in self.file_identifier:
if identifier in self.app_history_list:
if not(domain in self.domain_history_list):
self.domain_list.append(domain)
self.__write_content_in_file__(cores.domain_history_path,domain)
self.__write_content_in_file__(
cores.domain_history_path, domain)
continue
if not(domain in self.domain_list):
self.domain_list.append(domain)
self.__write_content_in_file__(cores.domain_history_path,domain)
self.__write_content_in_file__(
cores.domain_history_path, domain)
if append_file_flag:
self.__write_content_in_file__(cores.app_history_path,identifier)
self.__write_content_in_file__(
cores.app_history_path, identifier)
append_file_flag = False
def __start_threads__(self, worksheet):
@ -107,4 +108,3 @@ class NetTask(object):
with open(file_path, "a+", encoding='utf-8', errors='ignore') as f:
f.write(content+"\r")
f.close()

@ -7,6 +7,7 @@ import config
import hashlib
from queue import Queue
class WebTask(object):
thread_list = []
value_list = []

@ -1,3 +1,3 @@
requests
click
xlwt
openpyxl

Binary file not shown.

@ -1,3 +1,10 @@
### V1.0.9
- 更新apktool为最新版本
- 优化部分环节流程
- 修复excle文件导出时超时行数限制
- 修复脚本执行时卡顿的问题
- 修复Mac下Playload文件权限不足的问题
### V1.0.8
- 添加AK和SK的检测
- 添加检测规则提交入口

Loading…
Cancel
Save