You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AppInfoScanner/libs/task/base_task.py

176 lines
6.3 KiB

#! /usr/bin/python3
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import os
import config
from queue import Queue
import libs.core as cores
from libs.task.ios_task import iOSTask
from libs.task.web_task import WebTask
from libs.task.net_task import NetTask
from libs.core.parses import ParsesThreads
from libs.task.android_task import AndroidTask
from libs.task.download_task import DownloadTask
class BaseTask(object):
thread_list = []
result_dict = {}
app_history_list = []
domain_history_list = []
# 统一初始化入口
def __init__(self, types="Android", inputs="", rules="", sniffer=True, threads=10, package=""):
self.types = types
self.path = inputs
if rules:
config.filter_strs.append(r'.*'+str(rules)+'.*')
self.sniffer = not sniffer
self.threads = threads
self.package = package
self.file_queue = Queue()
# 统一调度平台
def start(self):
print("[*] AI is analyzing filtering rules......")
# 获取历史记录
self.__history_handle__()
print("[*] The filtering rules obtained by AI are as follows: %s" %
(set(config.filter_no)))
# 任务控制中心
task_info = self.__tast_control__()
if len(task_info) < 1:
return
file_queue = task_info["file_queue"]
shell_flag = task_info["shell_flag"]
comp_list = task_info["comp_list"]
packagename = task_info["packagename"]
file_identifier = task_info["file_identifier"]
permissions = task_info["permissions"]
if shell_flag:
print('[-] \033[3;31m Error: This application has shell, the retrieval results may not be accurate, Please remove the shell and try again!')
return
# 线程控制中心
print(
"[*] ========= Searching for strings that match the rules ===============")
self.__threads_control__(file_queue)
# 等待线程结束
for thread in self.thread_list:
thread.join()
# 结果输出中心
self.__print_control__(packagename, comp_list,
file_identifier, permissions)
def __tast_control__(self):
task_info = {}
# 自动根据文件后缀名称进行修正
cache_info = DownloadTask().start(self.path, self.types)
cacar_path = cache_info["path"]
types = cache_info["type"]
if (not os.path.exists(cacar_path) and cores.download_flag):
print(
"[-] File download failed! Please download the file manually and try again.")
return task_info
# 调用Android 相关处理逻辑
if types == "Android":
task_info = AndroidTask(cacar_path, self.package).start()
# 调用iOS 相关处理逻辑
elif types == "iOS":
task_info = iOSTask(cacar_path).start()
# 调用Web 相关处理逻辑
else:
task_info = WebTask(cacar_path).start()
return task_info
def __threads_control__(self, file_queue):
for threadID in range(1, self.threads):
name = "Thread - " + str(int(threadID))
thread = ParsesThreads(
threadID, name, file_queue, self.result_dict, self.types)
thread.start()
self.thread_list.append(thread)
def __print_control__(self, packagename, comp_list, file_identifier, permissions):
txt_result_path = cores.txt_result_path
xls_result_path = cores.xls_result_path
all_flag = cores.all_flag
if self.sniffer:
print(
"[*] ========= Sniffing the URL address of the search ===============")
NetTask(self.result_dict, self.app_history_list,
self.domain_history_list, file_identifier, self.threads).start()
if packagename:
print("[*] ========= The package name of this APP is: ===============")
print(packagename)
if len(comp_list) != 0:
print("[*] ========= Component information is as follows: ===============")
for json in comp_list:
print(json)
if len(permissions) != 0:
print(
"[*] ========= Sensitive permission information is as follows: ===============")
for permission in permissions:
print(permission)
if all_flag:
value_list = []
with open(txt_result_path, "a+", encoding='utf-8', errors='ignore') as f:
for key, value in self.result_dict.items():
f.write(key+"\r")
for result in value:
if result in value_list:
continue
value_list.append(result)
f.write("\t"+result+"\r")
f.close()
print("[*] For more information about the search, see TXT file result: %s" %
(txt_result_path))
if self.sniffer:
print("[*] For more information about the search, see XLSX file result: %s" %
(xls_result_path))
def __history_handle__(self):
domain_history_path = cores.domain_history_path
app_history_path = cores.app_history_path
if os.path.exists(domain_history_path):
domain_counts = {}
app_size = 0
with open(app_history_path, "r", encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
app_size = len(lines)
for line in lines:
self.app_history_list.append(
line.replace("\r", "").replace("\n", ""))
f.close()
with open(domain_history_path, "r", encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
cout = 3
if (app_size > 3) and (app_size % 3 == 0):
cout = cout + 1
for line in lines:
domain = line.replace("\r", "").replace("\n", "")
self.domain_history_list.append(domain)
domain_count = lines.count(line)
if domain_count >= cout:
config.filter_no.append(".*" + domain)
f.close()