You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
AppInfoScanner/libs/core/parses.py

102 lines
3.3 KiB

4 years ago
# -*- coding: utf-8 -*-
# Author: kelvinBen
# Github: https://github.com/kelvinBen/AppInfoScanner
import os
import queue
import re
import subprocess
import threading

import config
import libs.core as cores
class ParsesThreads(threading.Thread):
    """Worker thread that scans queued files for strings matching ``config.filter_strs``.

    Each worker pulls file paths from ``file_queue``.  Files whose base name
    contains a dot are read as text and every double-quoted substring is
    checked; files without a dot are passed to the external program at
    ``cores.strings_path`` and every output line is checked.  Accepted matches
    are collected in ``self.result_list`` and published in ``result_dict``
    under the path of the file that was just scanned.
    """

    # Captures the (non-greedy) content between two double quotes.
    _QUOTED_RE = re.compile(r'\"(.*?)\"')

    def __init__(self,threadID,name,file_queue,all,result_dict):
        """Store the worker's configuration.

        Args:
            threadID: Identifier stored on the instance as ``self.threadID``.
            name: Thread name (assigned to ``self.name``).
            file_queue: Queue of file paths to scan; must provide
                ``get(timeout=...)`` and ``empty()``.
            all: Stored as ``self.all``; not read anywhere in this class.
            result_dict: Mapping that receives ``file_path -> set(matches)``.
        """
        threading.Thread.__init__(self)
        self.file_queue = file_queue
        self.name = name
        self.threadID = threadID
        self.result_list = []
        self.all = all
        self.result_dict = result_dict

    def __regular_parse__(self,threadLock):
        """Drain ``self.file_queue``, scanning each file and recording matches.

        The loop ends when the queue is empty after a file has been handled,
        or when no item arrives within 5 seconds.  A failure while scanning
        one file is reported and the worker moves on to the next file instead
        of stopping.

        Args:
            threadLock: Lock handed down to ``__parse_string__``.
        """
        while True:
            try:
                file_path = self.file_queue.get(timeout=5)
            except queue.Empty:
                # Nothing arrived within the timeout: no more work for us.
                break

            print("Scan file : %s" % file_path)

            try:
                # A dot anywhere in the base name counts as "has an extension"
                # (same rule as the former ``split(".")[1]`` probe).
                if "." in os.path.basename(file_path):
                    self.__file_parse__(file_path, threadLock)
                else:
                    self.__get_string__(file_path, threadLock)
            except Exception as err:
                # Worker boundary: one bad file must not end the whole thread.
                print("Scan file failed : %s (%s)" % (file_path, err))

            # NOTE(review): result_list is never cleared, so the set stored
            # for a file also contains matches from files this worker handled
            # earlier -- confirm whether that accumulation is intended.
            result_set = set(self.result_list)
            if result_set:
                self.result_dict[file_path] = result_set

            if self.file_queue.empty():
                break

    def __file_parse__(self,file_path,threadLock):
        """Read ``file_path`` as text and check every distinct quoted string.

        Bytes that are not valid UTF-8 are dropped rather than raising, so
        files that are not clean UTF-8 text can still be scanned.

        Args:
            file_path: Path of the file to read.
            threadLock: Lock handed down to ``__parse_string__``.
        """
        with open(file_path, "r", encoding="utf8", errors="ignore") as file:
            file_content = file.read()

        # Collect all quoted strings, de-duplicate, then check each one.
        for result in set(self._QUOTED_RE.findall(file_content)):
            self.__parse_string__(result, threadLock)

    def __get_string__(self,dir_file_path,threadLock):
        """Run the external strings tool on a file and check each output line.

        The tool is invoked with an argument list (no shell) and its standard
        output is captured directly, so paths containing spaces or shell
        metacharacters are safe and concurrent workers do not share a scratch
        file.  Nothing is checked when the tool cannot be started or exits
        with a non-zero status.

        Args:
            dir_file_path: Path of the file to hand to ``cores.strings_path``.
            threadLock: Lock handed down to ``__parse_string__``.
        """
        try:
            proc = subprocess.run(
                [cores.strings_path, dir_file_path],
                stdout=subprocess.PIPE,
                encoding="utf-8",
                errors="ignore",
            )
        except OSError:
            # Tool missing / not executable: treated like a failed run.
            return

        if proc.returncode != 0:
            return

        # keepends=True so each line keeps its trailing newline, as lines
        # read back from a file would.
        for line in proc.stdout.splitlines(True):
            self.__parse_string__(line, threadLock)

    def __parse_string__(self,result,threadLock):
        """Check one candidate string against every pattern in ``config.filter_strs``.

        For each pattern, the first ``findall`` hit is kept if ``__filter__``
        accepts it; kept hits are appended to ``self.result_list``.

        Args:
            result: Candidate text to search.
            threadLock: Lock held while ``self.result_list`` is appended to.
        """
        for filter_str in config.filter_strs:
            filter_resl = re.findall(filter_str, result)
            # Skip patterns that found nothing.
            if not filter_resl:
                continue

            # Only the first hit of each pattern is considered.
            resl_str = filter_resl[0]
            if self.__filter__(resl_str) == 0:
                continue

            with threadLock:
                self.result_list.append(resl_str)

    def __filter__(self,resl_str):
        """Decide whether a matched string should be kept.

        The string is first stripped of ``\\r``, ``\\n`` and spaces.  Then, for
        each entry of ``config.filter_no`` in order, literal occurrences of
        the entry are removed and the remainder is tested with ``re.match``
        against the same entry; removals carry over to later entries.

        Args:
            resl_str: The matched string to examine.

        Returns:
            0 if the string is empty (initially or after a removal) or the
            remainder matches an entry of ``config.filter_no``; 1 otherwise.
        """
        resl_str = resl_str.replace("\r","").replace("\n","").replace(" ","")
        if not resl_str:
            return 0

        for filte in config.filter_no:
            # Each entry is used both as a literal substring and as a regex.
            resl_str = resl_str.replace(filte,"")
            if not resl_str:
                return 0
            if re.match(filte,resl_str):
                return 0
        return 1

    def run(self):
        """Thread entry point: create a lock and process the queue."""
        threadLock = threading.Lock()
        self.__regular_parse__(threadLock)