- 新增AI智能分析快速过滤第三方URL地址(目前存在一些问题等待后期修复)

- 新增Domain\Title\CDN\Finger识别功能
- 优化Excel文件内容输出格式
- 优化URL和IP筛选规则
- 优化全局日志输出
- 修复APK下多DEX文件搜索结果不准确的问题
v1.0.6
kelvin_ben 4 years ago committed by Your Name
parent fa37dda0ce
commit cd7bb3a364
  1. 43
      config.py
  2. 12
      libs/core/__init__.py
  3. 86
      libs/core/net.py
  4. 13
      libs/core/parses.py
  5. 33
      libs/task/android_task.py
  6. 100
      libs/task/base_task.py
  7. 11
      libs/task/ios_task.py
  8. 135
      libs/task/net_task.py
  9. 27
      libs/task/web_task.py
  10. 8
      update.md

@ -26,26 +26,38 @@ filter_components = [
# 此处目前支持过滤 # 此处目前支持过滤
# 1. https://以及http://开头的 # 1. https://以及http://开头的
# 2. IPv4的ip地址 # 2. IPv4的ip地址
# 3. URI地址 # 3. URI地址,URI不能很好的拼接所以此处忽略
filter_strs =[ filter_strs =[
r'^http://.*', r'https://.*|http://.*',
r'^https://.*', r'.*://([[0-9]{1,3}\.]{3}[0-9]{1,3}).*',
r'.*(http://.*)',
r'.*(https://.*)',
r'.*((?:[0-9]{1,3}\.){3}[0-9]{1,3}).*',
# r'/[a-z0-9A-Z]+/.*' # r'/[a-z0-9A-Z]+/.*'
] ]
# 过滤无用的内容, # 此处忽略常见的域名等信息
filter_no = [ filter_no = [
u'127.0.0.1', # r'.*127.0.0.1',
u'0.0.0.0', # r'.*0.0.0.0',
u'localhost', # r'.*localhost',
r"^http://www.w3.org" # r'.*w3.org',
r"L.*/", # r'.*apache.org',
r"/.*;", # r'.*android.com',
r"/.*<", # r'.*weixin.qq.com',
r'^http://schemas.android.com', # r'.*jpush.cn',
# r'.*umengcloud.com',
# r'.*umeng.com',
# r'.*baidu.com',
# r'.*apple.com',
# r'.*alibaba.com',
# r'.*qq.com',
# r'.*sohu.com',
# r'.*openssl.org',
# r'.*weibo.com',
# r'.*wechat.com',
# r'.*.amap.com',
# r'.*openxmlformats.org',
# r'.*github.com',
# r'.*w3school.com.cn',
# r'.*google.com'
] ]
# 此处配置壳信息 # 此处配置壳信息
@ -79,3 +91,4 @@ web_file_suffix =[
"aspx", "aspx",
"py" "py"
] ]

@ -34,6 +34,10 @@ class Bootstrapper(object):
global txt_result_path global txt_result_path
global xls_result_path global xls_result_path
global strings_path global strings_path
global history_path
global app_history_path
global domain_history_path
global excel_row
create_time = time.strftime("%Y%m%d%H%M%S", time.localtime()) create_time = time.strftime("%Y%m%d%H%M%S", time.localtime())
@ -51,17 +55,25 @@ class Bootstrapper(object):
else: else:
strings_path ="strings" strings_path ="strings"
excel_row = 0
backsmali_path = os.path.join(tools_dir,"baksmali.jar") backsmali_path = os.path.join(tools_dir,"baksmali.jar")
apktool_path = os.path.join(tools_dir, "apktool.jar") apktool_path = os.path.join(tools_dir, "apktool.jar")
output_path = os.path.join(script_root_dir,"out") output_path = os.path.join(script_root_dir,"out")
history_path = os.path.join(script_root_dir,"history")
txt_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".txt") txt_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".txt")
xls_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".xls") xls_result_path = os.path.join(script_root_dir,"result_"+str(create_time)+".xls")
app_history_path = os.path.join(history_path,"app_history.txt")
domain_history_path = os.path.join(history_path,"domain_history.txt")
# 包名信息一致的情况不记录URL信息 ,不一致的时候记录URL信息
def init(self): def init(self):
if os.path.exists(output_path): if os.path.exists(output_path):
shutil.rmtree(output_path) shutil.rmtree(output_path)
os.makedirs(output_path) os.makedirs(output_path)
if not os.path.exists(history_path):
os.makedirs(history_path)
if os.path.exists(txt_result_path): if os.path.exists(txt_result_path):
os.remove(txt_result_path) os.remove(txt_result_path)

@ -0,0 +1,86 @@
import re
import time
import threading
import requests
import libs.core as cores
class NetThreads(threading.Thread):
def __init__(self,threadID,name,domain_queue,worksheet):
threading.Thread.__init__(self)
self.name = name
self.threadID = threadID
self.lock = threading.Lock()
self.domain_queue = domain_queue
self.worksheet = worksheet
def __get_Http_info__(self,threadLock):
while True:
domains = self.domain_queue.get(timeout=5)
domain = domains["domain"]
url_ip = domains["url_ip"]
time.sleep(2)
result = self.__get_request_result__(url_ip)
if result != "error":
if self.lock.acquire(True):
cores.excel_row = cores.excel_row + 1
self.worksheet.write(cores.excel_row, 0, label = cores.excel_row)
self.worksheet.write(cores.excel_row, 1, label = url_ip)
self.worksheet.write(cores.excel_row, 2, label = domain)
if result != "timeout":
self.worksheet.write(cores.excel_row, 3, label = result["status"])
self.worksheet.write(cores.excel_row, 4, label = result["des_ip"])
self.worksheet.write(cores.excel_row, 5, label = result["server"])
self.worksheet.write(cores.excel_row, 6, label = result["title"])
self.worksheet.write(cores.excel_row, 7, label = result["cdn"])
self.worksheet.write(cores.excel_row, 8, label = "")
self.lock.release()
if self.domain_queue.empty():
break
def __get_request_result__(self,url):
result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
cdn = ""
try:
# python3 app.py ios -i C:\Users\Administrator\Desktop\frida-ipa-dump\三晋通\Payload\三晋通.app\三晋通
#
with requests.get(url, timeout=5,stream=True) as rsp:
status_code = rsp.status_code
result["status"] = status_code
headers = rsp.headers
if "Server" in headers:
result["server"] = headers['Server']
if "Cookie" in headers:
result["cookie"] = headers['Cookie']
if "X-Via" in headers:
cdn = cdn + headers['X-Via']
if "Via" in headers:
cdn = cdn + headers['Via']
result["cdn"] = cdn
sock = rsp.raw._connection.sock
if sock:
des_ip = sock.getpeername()[0]
sou_ip = sock.getsockname()[0]
if des_ip:
result["des_ip"] = des_ip
if sou_ip:
result["sou_ip"] = sou_ip
sock.close()
html = rsp.text
title = re.findall('<title>(.+)</title>',html)
if title:
result["title"] = title[0]
rsp.close()
return result
except requests.exceptions.InvalidURL as e:
return "error"
except requests.exceptions.ConnectionError as e1:
return "timeout"
except requests.exceptions.ReadTimeout as e2:
return "timeout"
def run(self):
threadLock = threading.Lock()
self.__get_Http_info__(threadLock)

@ -81,16 +81,21 @@ class ParsesThreads(threading.Thread):
def __filter__(self,resl_str): def __filter__(self,resl_str):
return_flag = 1 return_flag = 1
print(resl_str)
resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","") resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","")
if len(resl_str) == 0: if len(resl_str) == 0:
return 0 return 0
# 单独处理https或者http开头的字符串 # 目前流通的域名中加上协议头最短长度为11位
http_list =["https","https://","https:","http","http://","https:",] if len(resl_str) <= 10:
for filte in http_list:
if filte == resl_str:
return 0 return 0
# 单独处理https或者http开头的字符串
# http_list =["https","https://","https:","http","http://","https:",]
# for filte in http_list:
# if filte == resl_str:
# return 0
for filte in config.filter_no: for filte in config.filter_no:
resl_str = resl_str.replace(filte,"") resl_str = resl_str.replace(filte,"")
if len(resl_str) == 0: if len(resl_str) == 0:

@ -1,15 +1,13 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Author: kelvinBen # Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner # Github: https://github.com/kelvinBen/AppInfoScanner
import os import os
import re import re
import config import config
import threading import hashlib
from queue import Queue from queue import Queue
import libs.core as cores import libs.core as cores
from libs.core.parses import ParsesThreads
class AndroidTask(object): class AndroidTask(object):
@ -22,6 +20,7 @@ class AndroidTask(object):
self.shell_flag=False self.shell_flag=False
self.packagename="" self.packagename=""
self.comp_list=[] self.comp_list=[]
self.file_identifier=[]
def start(self): def start(self):
# 检查java环境是否存在 # 检查java环境是否存在
@ -36,7 +35,7 @@ class AndroidTask(object):
if self.__decode_file__(input_file_path) == "error": if self.__decode_file__(input_file_path) == "error":
raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.") raise Exception("Retrieval of this file type is not supported. Select APK file or DEX file.")
return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename} return {"comp_list":self.comp_list,"shell_flag":self.shell_flag,"file_queue":self.file_queue,"packagename":self.packagename,"file_identifier":self.file_identifier}
def __decode_file__(self,file_path): def __decode_file__(self,file_path):
apktool_path = str(cores.apktool_path) apktool_path = str(cores.apktool_path)
@ -46,6 +45,9 @@ class AndroidTask(object):
if suffix_name == "apk": if suffix_name == "apk":
self.__decode_apk__(file_path,apktool_path,output_path) self.__decode_apk__(file_path,apktool_path,output_path)
elif suffix_name == "dex": elif suffix_name == "dex":
with open(file_path,'rb') as f:
dex_md5 = str(hashlib.md5().update(f.read()).hexdigest()).upper()
self.file_identifier.append(dex_md5)
self.__decode_dex__(file_path,backsmali_path,output_path) self.__decode_dex__(file_path,backsmali_path,output_path)
else: else:
return "error" return "error"
@ -83,17 +85,17 @@ class AndroidTask(object):
# 初始化检测文件信息 # 初始化检测文件信息
def __scanner_file_by_apktool__(self,output_path): def __scanner_file_by_apktool__(self,output_path):
file_names = os.listdir(output_path)
for file_name in file_names:
file_path = os.path.join(output_path,file_name)
if not os.path.isdir(file_path):
continue
if "smali" in file_name or "assets" in file_name:
scanner_file_suffixs = ["smali","js","xml"]
if self.no_resource: if self.no_resource:
scanner_dir_lists = ["smali"]
scanner_file_suffixs =["smali"] scanner_file_suffixs =["smali"]
else: self.__get_scanner_file__(file_path,scanner_file_suffixs)
scanner_dir_lists = ["smali","assets"]
scanner_file_suffixs = ["smali","js","xml"]
for scanner_dir_list in scanner_dir_lists:
scanner_dir = os.path.join(output_path,scanner_dir_list)
if os.path.exists(scanner_dir):
self.__get_scanner_file__(scanner_dir,scanner_file_suffixs)
def __get_scanner_file__(self,scanner_dir,scanner_file_suffixs=["smali"]): def __get_scanner_file__(self,scanner_dir,scanner_file_suffixs=["smali"]):
dir_or_files = os.listdir(scanner_dir) dir_or_files = os.listdir(scanner_dir)
@ -115,13 +117,14 @@ class AndroidTask(object):
def __shell_test__(self,output): def __shell_test__(self,output):
am_path = os.path.join(output,"AndroidManifest.xml") am_path = os.path.join(output,"AndroidManifest.xml")
with open(am_path,"r") as f: with open(am_path,"r",encoding='utf-8',errors='ignore') as f:
am_str = f.read() am_str = f.read()
am_package= re.compile(r'<manifest.*package=\"(.*?)\".*') am_package= re.compile(r'<manifest.*package=\"(.*?)\".*')
apackage = am_package.findall(am_str) apackage = am_package.findall(am_str)
if len(apackage) >=1: if len(apackage) >=1:
self.packagename = apackage[0] self.packagename = apackage[0]
self.file_identifier.append(apackage[0])
am_name = re.compile(r'<application.*android:name=\"(.*?)\".*>') am_name = re.compile(r'<application.*android:name=\"(.*?)\".*>')
aname = am_name.findall(am_str) aname = am_name.findall(am_str)

@ -1,11 +1,8 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Author: kelvinBen # Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner # Github: https://github.com/kelvinBen/AppInfoScanner
# 接收传入的参数信息,根据参数进行平台分发
import os import os
import re import re
import xlwt
import config import config
import threading import threading
from queue import Queue from queue import Queue
@ -14,11 +11,12 @@ from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask from libs.task.android_task import AndroidTask
from libs.task.ios_task import iOSTask from libs.task.ios_task import iOSTask
from libs.task.web_task import WebTask from libs.task.web_task import WebTask
from libs.task.net_task import NetTask
class BaseTask(object): class BaseTask(object):
thread_list =[] thread_list =[]
result_dict = {} result_dict = {}
value_list = [] app_history_list=[]
# 统一初始化入口 # 统一初始化入口
def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10): def __init__(self, types="Android", inputs="", rules="", net_sniffer=False, no_resource=False, package="", all_str=False, threads=10):
@ -33,12 +31,16 @@ class BaseTask(object):
self.threads = threads self.threads = threads
self.file_queue = Queue() self.file_queue = Queue()
# 统一调度平台 # 统一调度平台
def start(self): def start(self):
workbook = xlwt.Workbook(encoding = 'utf-8')
# 创建excel头 print("[*] AI决策系统正在分析规则中...")
worksheet = self.__creating_excel_header__(workbook)
# 获取历史记录
self.__history_handle__()
print("[*] 本次的过滤规则为:" , config.filter_no)
# 任务控制中心 # 任务控制中心
task_info = self.__tast_control__() task_info = self.__tast_control__()
@ -46,6 +48,7 @@ class BaseTask(object):
shell_flag = task_info["shell_flag"] shell_flag = task_info["shell_flag"]
comp_list = task_info["comp_list"] comp_list = task_info["comp_list"]
packagename = task_info["packagename"] packagename = task_info["packagename"]
file_identifier = task_info["file_identifier"]
if shell_flag: if shell_flag:
print('\033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!') print('\033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
@ -59,12 +62,8 @@ class BaseTask(object):
thread.join() thread.join()
# 结果输出中心 # 结果输出中心
self.__print_control__(packagename,comp_list,workbook,worksheet) self.__print_control__(packagename,comp_list,file_identifier)
def __creating_excel_header__(self,workbook):
worksheet = workbook.add_sheet("扫描信息",cell_overwrite_ok=True)
worksheet.write(0,0, label = "扫描结果")
return worksheet
def __tast_control__(self): def __tast_control__(self):
task_info = {} task_info = {}
@ -86,10 +85,12 @@ class BaseTask(object):
thread.start() thread.start()
self.thread_list.append(thread) self.thread_list.append(thread)
def __print_control__(self,packagename,comp_list,workbook,worksheet): def __print_control__(self,packagename,comp_list,file_identifier):
txt_result_path = cores.txt_result_path txt_result_path = cores.txt_result_path
xls_result_path = cores.xls_result_path xls_result_path = cores.xls_result_path
# 此处需要hash值或者应用名称, apk文件获取pachage, dex文件获取hash, macho-o获取文件名
if packagename: if packagename:
print("========= The package name of this APP is: ===============") print("========= The package name of this APP is: ===============")
print(packagename) print(packagename)
@ -98,22 +99,63 @@ class BaseTask(object):
print("========= Component information is as follows :===============") print("========= Component information is as follows :===============")
for json in comp_list: for json in comp_list:
print(json) print(json)
print("=========The result set for the static scan is shown below:===============") print("=========The result set for the static scan is shown below:===============")
with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f:
row = 1 NetTask(self.result_dict,self.app_history_list,file_identifier,self.threads).start()
for key,value in self.result_dict.items():
f.write(key+"\r") # with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f:
for result in value: # row = 1
if result in self.value_list: # for key,value in self.result_dict.items():
continue # f.write(key+"\r")
self.value_list.append(result)
print(result) # for result in value:
worksheet.write(row,0, label = result) # if result in self.value_list:
row = row + 1 # continue
f.write("\t"+result+"\r") # if not(file_identifier in self.app_history_list) and ("http://" in result or "https://" in result):
print("For more information about the search, see TXT file result: %s" %(txt_result_path)) # domain = result.replace("https://","").replace("http://","")
print("For more information about the search, see XLS file result: %s" %(xls_result_path)) # if "/" in domain:
workbook.save(xls_result_path) # domain = domain[:domain.index("/")]
# if not(domain in self.domain_list):
# self.domain_list.append(domain)
# self.__write_content_in_file__(cores.domain_history_path,domain)
# if append_file_flag:
# for identifier in file_identifier:
# self.__write_content_in_file__(cores.app_history_path,identifier)
# append_file_flag = False
# self.value_list.append(result)
# worksheet.write(row,0, label = result)
# row = row + 1
# f.write("\t"+result+"\r")
print("For more information about the search, see TXT file result: %s" %(cores.txt_result_path))
print("For more information about the search, see XLS file result: %s" %(cores.xls_result_path))
def __history_handle__(self):
domain_history_path = cores.domain_history_path
app_history_path = cores.app_history_path
if os.path.exists(domain_history_path):
domain_counts = {}
app_size = 0
with open(app_history_path,"r",encoding='utf-8',errors='ignore') as f:
lines = f.readlines()
app_size = len(lines)
for line in lines:
self.app_history_list.append(line.replace("\r","").replace("\n",""))
f.close()
with open(domain_history_path,"r",encoding='utf-8',errors='ignore') as f:
lines = f.readlines()
cout = 3
if (app_size>3) and (app_size%3==0):
cout = cout + 1
for line in lines:
domain = line.replace("\r","").replace("\n","")
domain_count = lines.count(line)
if domain_count >= cout:
config.filter_no.append(domain)
f.close()

@ -1,18 +1,14 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Author: kelvinBen # Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner # Github: https://github.com/kelvinBen/AppInfoScanner
import os import os
import re import re
import shutil import shutil
import zipfile import zipfile
import binascii import binascii
import platform import platform
from pathlib import Path
import libs.core as cores import libs.core as cores
from queue import Queue from queue import Queue
from libs.core.parses import ParsesThreads
class iOSTask(object): class iOSTask(object):
@ -23,6 +19,7 @@ class iOSTask(object):
self.file_queue = Queue() self.file_queue = Queue()
self.shell_flag = False self.shell_flag = False
self.file_identifier= []
def start(self): def start(self):
file_path = self.path file_path = self.path
@ -33,10 +30,12 @@ class iOSTask(object):
self.file_queue.put(file_path) self.file_queue.put(file_path)
else: else:
raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.") raise Exception("Retrieval of this file type is not supported. Select IPA file or Mach-o file.")
return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None} return {"shell_flag":self.shell_flag,"file_queue":self.file_queue,"comp_list":[],"packagename":None,"file_identifier":self.file_identifier}
def __get_file_header__(self,file_path): def __get_file_header__(self,file_path):
hex_hand = 0x0 hex_hand = 0x0
macho_name = os.path.split(file_path)[-1]
self.file_identifier.append(macho_name)
with open(file_path,"rb") as macho_file: with open(file_path,"rb") as macho_file:
macho_file.seek(hex_hand,0) macho_file.seek(hex_hand,0)
magic = binascii.hexlify(macho_file.read(4)).decode().upper() magic = binascii.hexlify(macho_file.read(4)).decode().upper()
@ -56,7 +55,6 @@ class iOSTask(object):
macho_file.seek(hex_hand,0) macho_file.seek(hex_hand,0)
encryption_info_command = binascii.hexlify(macho_file.read(24)).decode() encryption_info_command = binascii.hexlify(macho_file.read(24)).decode()
cryptid = encryption_info_command[-8:len(encryption_info_command)] cryptid = encryption_info_command[-8:len(encryption_info_command)]
print(cryptid)
if cryptid == "01000000": if cryptid == "01000000":
self.shell_flag = True self.shell_flag = True
break break
@ -78,7 +76,6 @@ class iOSTask(object):
else: else:
if self.elf_file_name == dir_file: if self.elf_file_name == dir_file:
self.__get_file_header__(dir_file_path) self.__get_file_header__(dir_file_path)
print(self.shell_flag)
self.file_queue.put(dir_file_path) self.file_queue.put(dir_file_path)
continue continue
if self.no_resource: if self.no_resource:

@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import re
import xlwt
import socket
from queue import Queue
import libs.core as cores
from libs.core.net import NetThreads
import requests
class NetTask(object):
value_list = []
domain_list=[]
def __init__(self,result_dict,app_history_list,file_identifier,threads):
self.result_dict = result_dict
self.app_history_list = app_history_list
self.file_identifier = file_identifier
self.domain_queue = Queue()
self.threads = threads
self.thread_list = []
def start(self):
xls_result_path = cores.xls_result_path
workbook = xlwt.Workbook(encoding = 'utf-8')
worksheet = self.__creating_excel_header__(workbook)
self.__start_threads__(worksheet)
self.__write_result_to_txt__()
for thread in self.thread_list:
thread.join()
workbook.save(xls_result_path)
def __creating_excel_header__(self,workbook):
worksheet = workbook.add_sheet("Result",cell_overwrite_ok=True)
worksheet.write(0,0, label = "Number")
worksheet.write(0,1, label = "IP/URL")
worksheet.write(0,2, label = "Domain")
worksheet.write(0,3, label = "Status")
worksheet.write(0,4, label = "IP")
worksheet.write(0,5, label = "Server")
worksheet.write(0,6, label = "Title")
worksheet.write(0,7, label = "CDN")
worksheet.write(0,8, label = "Finger")
return worksheet
def __write_result_to_txt__(self):
txt_result_path = cores.txt_result_path
append_file_flag = True
with open(txt_result_path,"a+",encoding='utf-8',errors='ignore') as f:
for key,value in self.result_dict.items():
f.write(key+"\r")
for result in value:
if result in self.value_list:
continue
# 100个文件标识
for file in self.file_identifier:
if not(file in self.app_history_list) and ("http://" in result or "https://" in result):
# print(self.file_identifier,self.app_history_list,not(self.file_identifier[0] in self.app_history_list))
# if not(self.file_identifier in self.app_history_list) and ("http://" in result or "https://" in result):
domain = result.replace("https://","").replace("http://","")
if "/" in domain:
domain = domain[:domain.index("/")]
self.domain_queue.put({"domain":domain,"url_ip":result})
print(domain,self.domain_list,not(domain in self.domain_list))
if not(domain in self.domain_list):
self.domain_list.append(domain)
self.__write_content_in_file__(cores.domain_history_path,domain)
if append_file_flag:
for identifier in self.file_identifier:
if self.file_identifier in self.app_history_list:
continue
self.__write_content_in_file__(cores.app_history_path,identifier)
append_file_flag = False
self.value_list.append(result)
f.write("\t"+result+"\r")
f.close()
def __start_threads__(self,worksheet):
for threadID in range(0,self.threads) :
name = "Thread - " + str(threadID)
thread = NetThreads(threadID,name,self.domain_queue,worksheet)
thread.start()
self.thread_list.append(thread)
def __write_content_in_file__(self,file_path,content):
with open(file_path,"a+",encoding='utf-8',errors='ignore') as f:
f.write(content+"\r")
f.close()
def __get_request_result__(url):
result={"status":"","server":"","cookie":"","cdn":"","des_ip":"","sou_ip":"","title":""}
cdn = ""
try:
rsp = requests.get(url, timeout=5,stream=True)
status_code = rsp.status_code
result["status"] = status_code
headers = rsp.headers
if "Server" in headers:
result["server"] = headers['Server']
if "Cookie" in headers:
result["cookie"] = headers['Cookie']
if "X-Via" in headers:
cdn = cdn + headers['X-Via']
if "Via" in headers:
cdn = cdn + headers['Via']
result["cdn"] = cdn
sock = rsp.raw._connection.sock
if sock:
des_ip = sock.getpeername()[0]
sou_ip = sock.getsockname()[0]
if des_ip:
result["des_ip"] = des_ip
if sou_ip:
result["sou_ip"] = sou_ip
html = rsp.text
title = re.findall('<title>(.+)</title>',html)
result["title"] = title
return result
except requests.exceptions.InvalidURL as e:
return "error"
except requests.exceptions.ConnectionError as e1:
return "timeout"
# print(__get_request_result__("http://download.sxzwfw.gov.cn/getMerchantSign"))

@ -1,16 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Author: kelvinBen # Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner # Github: https://github.com/kelvinBen/AppInfoScanner
import os import os
import re
import config import config
import threading
from queue import Queue from queue import Queue
import libs.core as cores
from libs.core.parses import ParsesThreads
class WebTask(object): class WebTask(object):
thread_list =[] thread_list =[]
@ -45,23 +38,3 @@ class WebTask(object):
if len(dir_file.split("."))>1: if len(dir_file.split("."))>1:
if dir_file.split(".")[-1] in file_suffix: if dir_file.split(".")[-1] in file_suffix:
self.file_queue.put(dir_file_path) self.file_queue.put(dir_file_path)
# def __print__(self):
# print("=========The result set for the static scan is shown below:===============")
# with open(cores.result_path,"a+") as f:
# for key,value in self.result_dict.items():
# f.write(key+"\r")
# for result in value:
# if result in self.value_list:
# continue
# self.value_list.append(result)
# print(result)
# f.write("\t"+result+"\r")
# print("For more information about the search, see: %s" %(cores.result_path))
# def __start_threads(self):
# for threadID in range(1,self.threads) :
# name = "Thread - " + str(threadID)
# thread = ParsesThreads(threadID,name,self.file_queue,self.all,self.result_dict)
# thread.start()
# self.thread_list.append(thread)

@ -1,3 +1,11 @@
### V1.0.6
- 新增AI智能分析快速过滤第三方URL地址
- 新增Domain\Title\CDN\Finger识别功能
- 优化Excel文件内容输出格式
- 优化URL和IP筛选规则
- 优化全局日志输出
- 修复APK下多DEX文件搜索结果不准确的问题
### V1.0.5 ### V1.0.5
- 新增对DOM、SAX、DOM4J、JDOM等XML解析组件的识别 - 新增对DOM、SAX、DOM4J、JDOM等XML解析组件的识别
- 新增反编译失败后提交issues入口 - 新增反编译失败后提交issues入口

Loading…
Cancel
Save