You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
flyapps/fir_ser/api/views/supersign.py

302 lines
14 KiB

5 years ago
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project: 3月
# author: liuyu
# date: 2020/3/4
import logging
from django.http.response import FileResponse
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
5 years ago
from rest_framework.views import APIView
5 years ago
from api.models import AppIOSDeveloperInfo, APPSuperSignUsedInfo, AppUDID
3 years ago
from api.utils.app.supersignutils import IosUtils
from api.utils.auth import ExpiringTokenAuthentication, SuperSignPermission
from api.utils.response import BaseResponse
from api.utils.serializer import DeveloperSerializer, SuperSignUsedSerializer, DeviceUDIDSerializer
from api.utils.utils import get_developer_devices, get_choices_dict
3 years ago
logger = logging.getLogger(__name__)
5 years ago
class PageNumber(PageNumberPagination):
5 years ago
page_size = 10 # 每页显示多少条
page_size_query_param = 'size' # URL中每页显示条数的参数
page_query_param = 'page' # URL中页码的参数
max_page_size = None # 最大页码数限制
class DeveloperView(APIView):
authentication_classes = [ExpiringTokenAuthentication, ]
permission_classes = [SuperSignPermission, ]
5 years ago
def get(self, request):
res = BaseResponse()
3 years ago
app_id = request.query_params.get("appid", None)
5 years ago
developer_obj = AppIOSDeveloperInfo.objects.filter(user_id=request.user)
5 years ago
res.use_num = get_developer_devices(developer_obj)
3 years ago
if app_id:
developer_obj = developer_obj.filter(issuer_id=app_id)
5 years ago
page_obj = PageNumber()
5 years ago
app_page_serializer = page_obj.paginate_queryset(queryset=developer_obj.order_by("-updated_time"),
request=request,
5 years ago
view=self)
developer_serializer = DeveloperSerializer(app_page_serializer, many=True, )
res.data = developer_serializer.data
5 years ago
res.count = developer_obj.count()
res.apple_auth_list = get_choices_dict(AppIOSDeveloperInfo.auth_type_choices)
5 years ago
return Response(res.dict)
5 years ago
def put(self, request):
data = request.data
issuer_id = data.get("issuer_id", None)
if issuer_id:
developer_obj = AppIOSDeveloperInfo.objects.filter(user_id=request.user, issuer_id=issuer_id).first()
else:
return self.get(request)
if developer_obj:
act = data.get("act", None)
if act:
res = BaseResponse()
3 years ago
logger.info(f"user {request.user} ios developer {developer_obj} act {act}")
if act == "checkauth":
status, result = IosUtils.active_developer(developer_obj)
if status:
5 years ago
if not developer_obj.certid:
IosUtils.get_device_from_developer(developer_obj)
return self.get(request)
else:
res.code = 1008
4 years ago
res.msg = result.get("return_info", "未知错误")
return Response(res.dict)
elif act == "ioscert":
if not developer_obj.certid:
status, result = IosUtils.create_developer_cert(developer_obj, request.user)
5 years ago
if status:
IosUtils.get_device_from_developer(developer_obj)
5 years ago
else:
res.code = 1008
res.msg = result.get("err_info")
return Response(res.dict)
elif act == "renewcert":
if developer_obj.certid:
# clean developer somethings. remove profile and revoke cert
IosUtils.clean_developer(developer_obj, request.user)
status, result = IosUtils.revoke_developer_cert(developer_obj, request.user)
if status:
pass
# status, result = IosUtils.create_developer_cert(developer_obj, request.user)
# if status:
# IosUtils.get_device_from_developer(developer_obj, request.user)
# else:
# res.code = 1008
# res.msg = result.get("err_info")
# return Response(res.dict)
else:
res.code = 1008
res.msg = result.get("err_info", '')
return Response(res.dict)
elif act == "syncdevice":
status, result = IosUtils.get_device_from_developer(developer_obj)
if not status:
res.code = 1008
res.msg = result.get("err_info")
return Response(res.dict)
else:
update_fields = []
3 years ago
logger.info(f"user {request.user} ios developer {developer_obj} update input data {data}")
logger.info(
f"user {request.user} ios developer {developer_obj} update old data {developer_obj.__dict__}")
try:
usable_number = int(data.get("usable_number", developer_obj.usable_number))
if 0 <= usable_number <= 100:
developer_obj.usable_number = usable_number
update_fields.append("usable_number")
except Exception as e:
3 years ago
logger.error(
f"developer {developer_obj} usable_number {data.get('usable_number', developer_obj.usable_number)} get failed Exception:{e}")
developer_obj.description = data.get("description", developer_obj.description)
update_fields.append("description")
private_key_id = data.get("private_key_id", developer_obj.private_key_id)
p8key = data.get("p8key", developer_obj.p8key)
if private_key_id != "" and private_key_id != developer_obj.private_key_id:
developer_obj.private_key_id = private_key_id
developer_obj.is_actived = False
update_fields.append("private_key_id")
if p8key != "" and p8key != developer_obj.p8key:
developer_obj.p8key = p8key
developer_obj.is_actived = False
update_fields.append("p8key")
try:
update_fields.append("is_actived")
developer_obj.save(update_fields=update_fields)
3 years ago
logger.info(
f"user {request.user} ios developer {developer_obj} update now data {developer_obj.__dict__}")
except Exception as e:
3 years ago
logger.error(
f"user {request.user} ios developer {developer_obj} update error data {data} Exception {e}")
return self.get(request)
5 years ago
def post(self, request):
data = request.data
3 years ago
data_info = {}
if data.get("auth_type") == 0:
3 years ago
data_info = {
"usable_number": data.get("usable_number", ""),
"description": data.get("description", ""),
"issuer_id": data.get("issuer_id", ""),
"private_key_id": data.get("private_key_id", ""),
"p8key": data.get("p8key", ""),
"auth_type": 0
}
try:
3 years ago
logger.error(f"user {request.user} add new developer {data.get('issuer_id', '')} data {data_info}")
developer_obj = AppIOSDeveloperInfo.objects.create(user_id=request.user, **data_info)
IosUtils.create_developer_space(developer_obj, request.user)
except Exception as e:
3 years ago
logger.error(f"user {request.user} create developer {data_info} failed Exception:{e}")
res = BaseResponse()
5 years ago
res.code = 1005
res.msg = "添加失败"
return Response(res.dict)
return self.get(request)
5 years ago
def delete(self, request):
issuer_id = request.query_params.get("issuer_id", None)
if issuer_id:
developer_obj = AppIOSDeveloperInfo.objects.filter(user_id=request.user, issuer_id=issuer_id).first()
else:
return self.get(request)
if developer_obj:
3 years ago
logger.error(f"user {request.user} delete developer {developer_obj}")
IosUtils.clean_developer(developer_obj, request.user)
IosUtils.revoke_developer_cert(developer_obj, request.user)
developer_obj.delete()
return self.get(request)
5 years ago
3 years ago
def base_super_sign_used_info(request):
udid = request.query_params.get("udid", None)
bundle_id = request.query_params.get("bundleid", None)
developer_id = request.query_params.get("appid", None)
3 years ago
mine = True
super_sign_used_objs = APPSuperSignUsedInfo.objects.filter(developerid__user_id=request.user, )
3 years ago
if not super_sign_used_objs:
3 years ago
super_sign_used_objs = APPSuperSignUsedInfo.objects.filter(user_id=request.user, )
mine = False
3 years ago
if developer_id:
super_sign_used_objs = super_sign_used_objs.filter(developerid__issuer_id=developer_id)
if udid:
3 years ago
super_sign_used_objs = super_sign_used_objs.filter(udid__udid__udid=udid)
3 years ago
if bundle_id:
super_sign_used_objs = super_sign_used_objs.filter(app_id__bundle_id=bundle_id)
return super_sign_used_objs, mine
5 years ago
class SuperSignUsedView(APIView):
authentication_classes = [ExpiringTokenAuthentication, ]
permission_classes = [SuperSignPermission, ]
5 years ago
def get(self, request):
res = BaseResponse()
3 years ago
super_sign_used_objs, mine = base_super_sign_used_info(request)
page_obj = PageNumber()
3 years ago
app_page_serializer = page_obj.paginate_queryset(queryset=super_sign_used_objs.order_by("-created_time"),
5 years ago
request=request,
5 years ago
view=self)
3 years ago
app_serializer = SuperSignUsedSerializer(app_page_serializer, many=True,
context={'mine': mine, 'user_obj': request.user})
5 years ago
res.data = app_serializer.data
3 years ago
res.count = super_sign_used_objs.count()
5 years ago
return Response(res.dict)
5 years ago
5 years ago
class AppUDIDUsedView(APIView):
authentication_classes = [ExpiringTokenAuthentication, ]
permission_classes = [SuperSignPermission, ]
5 years ago
def get(self, request):
res = BaseResponse()
3 years ago
super_sign_used_objs, mine = base_super_sign_used_info(request)
3 years ago
app_udid_objs = AppUDID.objects.filter(app_id__appsupersignusedinfo__in=super_sign_used_objs).distinct()
page_obj = PageNumber()
3 years ago
app_page_serializer = page_obj.paginate_queryset(queryset=app_udid_objs.order_by("-created_time"),
5 years ago
request=request,
5 years ago
view=self)
3 years ago
app_serializer = DeviceUDIDSerializer(app_page_serializer, many=True,
context={'mine': mine, 'user_obj': request.user})
5 years ago
res.data = app_serializer.data
3 years ago
res.count = app_udid_objs.count()
5 years ago
return Response(res.dict)
5 years ago
def delete(self, request):
res = BaseResponse()
3 years ago
pk = request.query_params.get("id", None)
5 years ago
app_id = request.query_params.get("aid", None)
3 years ago
app_udid_obj = AppUDID.objects.filter(pk=pk)
if app_udid_obj:
3 years ago
super_sign_used_obj = APPSuperSignUsedInfo.objects.filter(udid=app_udid_obj.first()).first()
if super_sign_used_obj and super_sign_used_obj.developerid.user_id.pk == request.user.pk:
logger.error(f"user {request.user} delete devices {app_udid_obj}")
IosUtils.disable_udid(app_udid_obj.first(), app_id)
app_udid_obj.delete()
else:
res.code = 10002
res.msg = '公共账号池不允许删除'
return Response(res.dict)
class SuperSignCertView(APIView):
authentication_classes = [ExpiringTokenAuthentication, ]
permission_classes = [SuperSignPermission, ]
def get(self, request):
issuer_id = request.query_params.get('issuer_id', None)
if issuer_id:
developer_obj = AppIOSDeveloperInfo.objects.filter(user_id=request.user, issuer_id=issuer_id).first()
# resign_app_obj = IosUtils.get_resign_obj(request.user, developer_obj)
# resign_app_obj.make_p12_from_cert(developer_obj.certid)
if developer_obj:
zip_file_path = IosUtils.zip_cert(request.user, developer_obj)
response = FileResponse(open(zip_file_path, 'rb'))
response['Content-Type'] = "application/octet-stream"
response["Access-Control-Expose-Headers"] = "Content-Disposition"
response['Content-Disposition'] = 'attachment; filename=' + developer_obj.certid + '.zip'
return response
res = BaseResponse()
return Response(res.dict)
def post(self, request):
res = BaseResponse()
issuer_id = request.data.get('issuer_id', None)
if issuer_id:
developer_obj = AppIOSDeveloperInfo.objects.filter(user_id=request.user, issuer_id=issuer_id).first()
if developer_obj:
resign_app_obj = IosUtils.get_resign_obj(request.user, developer_obj)
status, result = resign_app_obj.make_cert_from_p12(request.data.get('cert_pwd', ''),
request.data.get('cert_content', None))
if status:
3 years ago
status, result = IosUtils.auto_get_cert_id_by_p12(developer_obj, request.user)
if status:
resign_app_obj.write_cert()
else:
res.code = 1003
3 years ago
res.msg = '证书未在开发者账户找到,请检查推送证书是否属于该开发者'
else:
res.code = 1002
res.msg = str(result['err_info'])
return Response(res.dict)
return Response(res.dict)