You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
flyapps/fir_ser/api/utils/modelutils.py

170 lines
6.5 KiB

3 years ago
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project: 4月
# author: NinEveN
# date: 2021/4/16
import logging
import random
3 years ago
from urllib.parse import urljoin
from django.db.models import Count, Sum
from api.models import AppReleaseInfo, UserDomainInfo, DomainCnameInfo, UserAdDisplayInfo, RemoteClientInfo, \
AppIOSDeveloperInfo, IosDeveloperPublicPoolBill, APPToDeveloper
3 years ago
from api.utils.baseutils import get_server_domain_from_request, get_user_default_domain_name, get_real_ip_address, \
get_origin_domain_name
logger = logging.getLogger(__name__)
def get_app_d_count_by_app_id(app_id):
d_count = 1
binary_size = AppReleaseInfo.objects.filter(is_master=True, app_id__app_id=app_id).values('binary_size').first()
if binary_size and binary_size.get('binary_size', 0) > 0:
d_count += binary_size.get('binary_size') // 1024 // 1024 // 100
return d_count
def get_user_domain_name(obj, domain_type=1):
domain_name = UserDomainInfo.objects.filter(user_id=obj, is_enable=True, app_id=None,
domain_type=domain_type).values_list('domain_name').first()
if domain_name:
return domain_name[0]
return ''
def get_app_domain_name(obj):
domain_name = UserDomainInfo.objects.filter(app_id=obj, is_enable=True, domain_type=2).values_list(
'domain_name').first()
if domain_name:
return domain_name[0]
return ''
def get_min_default_domain_cname_obj(is_system=True):
return min(DomainCnameInfo.objects.annotate(Count('userinfo')).filter(is_enable=True, is_system=is_system),
key=lambda x: x.userinfo__count)
def get_filename_form_file(filename):
file_id_list = filename.split('.')
if file_id_list[-1] in ['ipa', 'apk']:
app_release_obj = AppReleaseInfo.objects.filter(release_id='.'.join(file_id_list[0:-1])).first()
if app_release_obj:
app_obj = app_release_obj.app_id
if app_obj.type == 0:
f_type = '.apk'
else:
f_type = '.ipa'
return f"{app_obj.name}-{app_release_obj.app_version}-{app_obj.short}{f_type}"
return filename
def ad_random_weight(user_obj):
ad_info_list = UserAdDisplayInfo.objects.filter(user_id=user_obj, is_enable=True).order_by('-created_time')
total = sum([ad_info.weight for ad_info in ad_info_list]) # 权重求和
ra = random.uniform(0, total) # 在0与权重和之前获取一个随机数
curr_sum = 0
ret = ad_info_list.first()
for ad_info in ad_info_list:
curr_sum += ad_info.weight # 在遍历中,累加当前权重值
if ra <= curr_sum: # 当随机数<=当前权重和时,返回权重key
ret = ad_info
break
return ret
def add_remote_info_from_request(request, description):
meta_info = request.META
3 years ago
if request.user and request.user.id is not None:
description = f"{request.user.uid}_{description}"
3 years ago
remote_info = {
'user_agent': meta_info.get('HTTP_USER_AGENT')[0:510],
3 years ago
'remote_addr': get_real_ip_address(request),
'method': meta_info.get('REQUEST_METHOD'),
'uri_info': urljoin(meta_info.get('PATH_INFO'), meta_info.get('QUERY_STRING'))[0:255],
'a_domain': get_origin_domain_name(request)[0:127],
3 years ago
'description': description[0:255]
3 years ago
}
RemoteClientInfo.objects.create(**remote_info)
def get_redirect_server_domain(request, user_obj=None, app_domain_name=None):
is_https = False
if user_obj:
if app_domain_name and len(app_domain_name) > 3:
domain_name = app_domain_name
else:
domain_name = get_user_domain_name(user_obj)
if not domain_name:
is_https, domain_name = get_user_default_domain_name(user_obj.default_domain_name)
elif app_domain_name and len(app_domain_name) > 3:
domain_name = app_domain_name
else:
is_https, domain_name = get_user_default_domain_name(get_min_default_domain_cname_obj(True))
protocol = 'http'
if is_https:
protocol = 'https'
server_domain = "%s://%s" % (protocol, domain_name)
return get_server_domain_from_request(request, server_domain)
def check_app_domain_name_access(app_obj, access_domain_name, user_obj, extra_domain=None):
if app_obj and access_domain_name:
domain_list = []
if extra_domain:
domain_list.append(extra_domain)
app_domain_name = get_app_domain_name(app_obj)
if app_domain_name:
domain_list.append(app_domain_name)
3 years ago
user_domain_name = get_user_domain_name(user_obj)
if user_domain_name:
domain_list.append(user_domain_name)
3 years ago
user_qr_domain_name = get_user_domain_name(user_obj, 0)
if user_qr_domain_name:
domain_list.append(user_qr_domain_name)
3 years ago
if access_domain_name in domain_list:
return True
def get_ios_developer_public_num(user_obj):
add_number = IosDeveloperPublicPoolBill.objects.filter(to_user_id=user_obj, action__in=[1, 2]).aggregate(
number=Sum('number'))
used_number = IosDeveloperPublicPoolBill.objects.filter(user_id=user_obj, action=0,
udid_sync_info__isnull=False).values('number',
'udid_sync_info_id').annotate(
counts=Count('udid_sync_info_id')).aggregate(number=Sum('number'))
add_number = add_number.get("number", 0)
if not add_number:
add_number = -99
used_number = used_number.get("number", 0)
if not used_number:
used_number = 0
return add_number - used_number
def check_super_sign_permission(user_obj):
if not user_obj.supersign_active:
return False
developer_count = AppIOSDeveloperInfo.objects.filter(user_id=user_obj).count()
if developer_count == 0 and get_ios_developer_public_num(user_obj) < 0:
return False
return True
def check_ipa_is_latest_sign(app_obj, developer_obj=None):
release_obj = AppReleaseInfo.objects.filter(app_id=app_obj, is_master=True).first()
all_app_to_dev = APPToDeveloper.objects.filter(app_id=app_obj)
if developer_obj:
all_app_to_dev = all_app_to_dev.filter(developerid=developer_obj)
all_app_to_dev = all_app_to_dev.all()
t_count = 0
for apptodev_obj in all_app_to_dev:
if release_obj.release_id == apptodev_obj.release_file:
t_count += 1
if t_count == all_app_to_dev.count():
return True