You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
567 lines
26 KiB
567 lines
26 KiB
#!/usr/bin/env python
|
|
# -*- coding:utf-8 -*-
|
|
# project: 4月
|
|
# author: liuyu
|
|
# date: 2020/4/24
|
|
import base64
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import re
|
|
import signal
|
|
import time
|
|
from subprocess import Popen, PIPE
|
|
|
|
from OpenSSL.crypto import (load_pkcs12, dump_certificate_request, dump_privatekey, PKey, TYPE_RSA, X509Req,
|
|
dump_certificate, load_privatekey, load_certificate, PKCS12, FILETYPE_PEM, FILETYPE_ASN1)
|
|
|
|
from common.base.baseutils import get_format_time, format_apple_date, make_app_uuid
|
|
from common.cache.state import CleanErrorBundleIdSignDataState
|
|
from common.core.sysconfig import Config
|
|
from common.libs.apple.appleapiv3 import AppStoreConnectApi, Certificates, Devices, BundleIds, Profiles
|
|
from fir_ser.settings import SUPER_SIGN_ROOT
|
|
from xsign.models import AppIOSDeveloperInfo
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def shell_command(command, timeout):
    """Run ``command`` through the shell and collect its output.

    :param command: shell command line; executed with ``shell=True``.
    :param timeout: time limit in seconds; a falsy value (``None``/``0``)
        waits for the command without any limit.
    :return: dict with ``exit_code`` and ``return_info`` (stdout as bytes with
        leading/trailing newlines stripped).  ``shell_run_time`` is added when
        the command finishes and ``err_info`` (decoded stderr) when it wrote
        to stderr.  On timeout the command is killed and the dict is
        ``{'exit_code': 126, 'return_info': ''}``.
    """
    from subprocess import TimeoutExpired

    result = {'exit_code': 99, 'return_info': ''}
    shell_start_time = time.time()
    # The shell gets its own session so that a timeout can kill the shell
    # together with whatever it spawned, not only the shell process itself.
    child = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, start_new_session=True)
    try:
        # communicate() keeps reading both pipes while it waits, so a command
        # with a lot of output cannot stall on a full pipe buffer.
        out, err = child.communicate(timeout=timeout or None)
    except TimeoutExpired:
        try:
            # start_new_session=True makes the child its own group leader.
            os.killpg(child.pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process finished between the timeout and the kill.
            pass
        # Reap exactly this child; its output is discarded as before.
        child.wait()
        for stream in (child.stdout, child.stderr):
            if stream:
                stream.close()
        result['exit_code'] = 126
        return result

    if err:
        # stderr is diagnostic only, so undecodable bytes must not raise here.
        result['err_info'] = err.decode("utf-8", errors="replace")
    shell_end_time = time.time()
    result['shell_run_time'] = shell_end_time - shell_start_time
    out = out.strip(b'\n')
    result['return_info'] = out
    result['exit_code'] = child.returncode
    logger.info(f'shell: {command} - return_info:{out} - exit_code:{child.returncode}')
    return result
|
|
|
|
|
|
def exec_shell(cmd, timeout=None):
    """Execute ``cmd`` with ``shell_command`` and report whether it succeeded.

    :param cmd: shell command line to run.
    :param timeout: optional time limit in seconds, forwarded unchanged.
    :return: ``(True, result)`` when the exit code is 0, otherwise
        ``(False, result)``.  On failure any captured stderr text is logged
        and then replaced by ``"Unknown Error"`` in ``result['err_info']``.
    """
    logger.info(f"exec_shell cmd:{cmd}")
    outcome = shell_command(cmd, timeout)
    logger.info(f"exec_shell cmd:{cmd} result:{outcome}")

    if outcome.get("exit_code") == 0:
        return True, outcome

    stderr_text = outcome.get("err_info", None)
    if stderr_text:
        # The real stderr goes to the log only; the returned dict carries a
        # generic message instead.
        logger.error(f"exec_shell cmd:{cmd} failed: {stderr_text}")
        outcome["err_info"] = "Unknown Error"
    return False, outcome
|
|
|
|
|
|
class ResignApp(object):
    """Re-sign IPA files with ``zsign`` and manage the signing files on disk.

    An instance holds three file paths (private key, PEM certificate and
    PKCS#12 bundle) plus ``cmd``, the ``zsign`` command prefix built from the
    certificate and key paths.
    """

    def __init__(self, my_local_key, app_dev_pem, app_dev_p12):
        """
        :param my_local_key: path of the PEM private key
        :param app_dev_pem: path of the PEM certificate
        :param app_dev_p12: path of the PKCS#12 bundle
        """
        import shlex

        self.my_local_key = my_local_key
        self.app_dev_pem = app_dev_pem
        self.app_dev_p12 = app_dev_p12
        # shlex.quote keeps paths containing quotes or spaces from breaking
        # (or injecting into) the shell command line.
        self.cmd = "zsign -c %s -k %s " % (shlex.quote(str(self.app_dev_pem)),
                                          shlex.quote(str(self.my_local_key)))

    @staticmethod
    def sign_mobile_config(sign_data, ssl_pem_path, ssl_key_path):
        """Sign ``sign_data`` as a DER-encoded PKCS#7 message.

        In-process replacement for the former
        ``openssl smime -sign ... -outform der -nodetach`` shell call.

        :param sign_data: text to sign (encoded as UTF-8 before signing)
        :param ssl_pem_path: absolute path of the PEM certificate (chain) file
        :param ssl_key_path: absolute path of the PEM private key file
        :return: ``(True, {'data': <signed bytes>})`` on success, otherwise
            ``(False, {'err_info': <message>})``
        """
        result = {}
        from cryptography.hazmat.primitives import hashes, serialization
        from cryptography.hazmat.primitives.serialization import pkcs7
        from cryptography import x509
        try:
            with open(ssl_pem_path, 'r') as pem_file:
                pem_text = pem_file.read()
            cert_list = re.findall('-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
                                   pem_text, re.S)
            if not cert_list:
                raise Exception('load cert failed')
            # The first certificate in the file is the signer; every
            # certificate (signer included) is attached to the message.
            certs = [x509.load_pem_x509_certificate(x.encode('utf-8')) for x in cert_list]
            with open(ssl_key_path, 'rb') as key_file:
                key = serialization.load_pem_private_key(key_file.read(), None)
            result['data'] = pkcs7.PKCS7SignatureBuilder(
                data=sign_data.encode('utf-8'),
                signers=[
                    (certs[0], key, hashes.SHA512()),
                ],
                additional_certs=certs,
            ).sign(
                serialization.Encoding.DER, options=[],
            )
        except Exception as e:
            result['err_info'] = str(e)
            return False, result
        return True, result

    def make_p12_from_cert(self, password):
        """Build the PKCS#12 bundle from the PEM certificate and private key.

        The bundle is written to ``app_dev_p12``; a non-empty ``password`` is
        also stored in ``app_dev_p12 + '.pwd'``.

        :param password: export password handed to ``PKCS12.export``
        :return: ``(True, <friendly name>)`` on success, otherwise
            ``(False, {'err_info': <message>})``
        """
        result = {}
        try:
            with open(self.app_dev_pem, 'rb') as pem_file:
                certificate = load_certificate(FILETYPE_PEM, pem_file.read())
            with open(self.my_local_key, 'rb') as key_file:
                private_key = load_privatekey(FILETYPE_PEM, key_file.read())
            p12 = PKCS12()
            p12.set_certificate(certificate)
            p12.set_privatekey(private_key)
            with open(self.app_dev_p12, 'wb+') as f:
                f.write(p12.export(password))
            if password:
                with open(self.app_dev_p12 + '.pwd', 'w') as f:
                    f.write(password)
            return True, p12.get_friendlyname()
        except Exception as e:
            # Stored as text, like the other methods of this class do.
            result["err_info"] = str(e)
            return False, result

    def check_p12_exists(self):
        """Return whether the PKCS#12 bundle file exists."""
        return os.path.exists(self.app_dev_p12)

    def _remove_staged_files(self):
        """Delete every staged ``*.bak`` file that is present."""
        for file in [self.app_dev_p12, self.app_dev_p12 + '.pwd', self.my_local_key, self.app_dev_pem]:
            if os.path.exists(file + '.bak'):
                os.remove(file + '.bak')

    def write_cert(self):
        """Promote the staged ``*.bak`` files to their final paths.

        An existing target is first renamed to ``<path>.<time>.bak``.  The
        password file is optional: when no ``.pwd.bak`` was staged, an old
        password file is still moved aside but nothing replaces it.  A missing
        staged file for the bundle, key or certificate still raises.
        """
        password_file = self.app_dev_p12 + '.pwd'
        for file in [self.app_dev_p12, password_file, self.my_local_key, self.app_dev_pem]:
            if os.path.exists(file):
                os.rename(file, file + '.' + get_format_time() + '.bak')
            if file == password_file and not os.path.exists(file + '.bak'):
                # make_cert_from_p12 only stages a password file when a
                # password was given, so there may be nothing to promote.
                continue
            os.rename(file + '.bak', file)

    def make_cert_from_p12(self, password, p12_content=None):
        """Stage key and certificate extracted from an uploaded PKCS#12 file.

        All output goes to ``*.bak`` files; ``write_cert`` moves them into
        place.  Staged files are removed again when the import fails or the
        certificate has expired.

        :param password: import password of the PKCS#12 file
        :param p12_content: data URL of the form
            ``data:application/x-pkcs12;base64,<payload>``
        :return: ``(True, <certificate version>)`` on success, otherwise
            ``(False, {'err_info': <message>})``
        """
        result = {}
        try:
            if not p12_content:
                result["err_info"] = '证书内容有误,请检查'
                return False, result
            p12_content_list = p12_content.split('data:application/x-pkcs12;base64,')
            if len(p12_content_list) != 2:
                result["err_info"] = '非法p12证书文件,请检查'
                return False, result

            p12_bytes = base64.b64decode(p12_content_list[1])
            with open(self.app_dev_p12 + '.bak', 'wb+') as f:
                f.write(p12_bytes)
            if password:
                with open(self.app_dev_p12 + '.pwd.bak', 'w') as f:
                    f.write(password)

            p12 = load_pkcs12(p12_bytes, password)
            cert = p12.get_certificate()
            if cert.has_expired():
                # Do not leave a staged bundle of an unusable certificate
                # behind for a later write_cert call.
                self._remove_staged_files()
                result["err_info"] = '证书已经过期'
                return False, result
            with open(self.my_local_key + '.bak', 'wb+') as f:
                f.write(dump_privatekey(FILETYPE_PEM, p12.get_privatekey()))
            with open(self.app_dev_pem + '.bak', 'wb+') as f:
                f.write(dump_certificate(FILETYPE_PEM, cert))
            return True, cert.get_version()
        except Exception as e:
            self._remove_staged_files()
            result["err_info"] = str(e)
            if 'mac verify failure' in str(e):
                result["err_info"] = 'p12 导入密码错误,请检查'
            return False, result

    def _build_sign_cmd(self, new_profile, org_ipa, new_ipa, info_plist_properties=None):
        """Return the full ``zsign`` command line without touching ``self.cmd``."""
        import shlex

        # Keys are emitted as option names and values as their arguments;
        # both are quoted because the values may come from user-supplied data.
        properties = "".join(
            " %s %s " % (shlex.quote(str(k)), shlex.quote(str(v)))
            for k, v in (info_plist_properties or {}).items()
        )
        return self.cmd + " %s -m %s -o %s -z 9 %s" % (
            properties, shlex.quote(str(new_profile)), shlex.quote(str(new_ipa)), shlex.quote(str(org_ipa)))

    def sign(self, new_profile, org_ipa, new_ipa, info_plist_properties=None):
        """Re-sign ``org_ipa`` into ``new_ipa`` with ``zsign``.

        The command is built from scratch on every call, so one instance can
        sign several packages.

        :param new_profile: provisioning profile path (``-m``)
        :param org_ipa: input IPA path
        :param new_ipa: output IPA path (``-o``)
        :param info_plist_properties: optional mapping of extra ``zsign``
            options to their values
        :return: the ``(ok, result)`` tuple of ``exec_shell``
        """
        return exec_shell(self._build_sign_cmd(new_profile, org_ipa, new_ipa, info_plist_properties))
|
|
|
|
|
|
def make_csr_content(csr_file_path, private_key_path):
    """Generate a 2048-bit RSA key and a matching certificate signing request.

    :param csr_file_path: file that receives the PEM-encoded CSR
    :param private_key_path: file that receives the PEM-encoded private key
    :return: the PEM-encoded CSR as bytes
    """
    # Fresh key pair for this request.
    rsa_key = PKey()
    rsa_key.generate_key(TYPE_RSA, 2048)

    # Fill in the fixed subject, then sign the request with the new key.
    request = X509Req()
    subject = request.get_subject()
    subject_fields = (
        ('CN', 'FLY APP'),
        ('O', 'FLY APP Inc'),
        ('OU', 'IT'),
        ('L', 'BJ'),
        ('ST', 'BJ'),
        ('C', 'CN'),
        ('emailAddress', 'flyapps@126.com'),
    )
    for field_name, field_value in subject_fields:
        setattr(subject, field_name, field_value)
    request.set_pubkey(rsa_key)
    request.sign(rsa_key, 'sha256')

    csr_content = dump_certificate_request(FILETYPE_PEM, request)
    key_content = dump_privatekey(FILETYPE_PEM, rsa_key)
    with open(csr_file_path, 'wb+') as csr_file:
        csr_file.write(csr_content)
    with open(private_key_path, 'wb+') as key_file:
        key_file.write(key_content)

    return csr_content
|
|
|
|
|
|
def make_pem(cer_content, pem_path):
    """Convert a DER (ASN.1) certificate to PEM and write it to ``pem_path``.

    :param cer_content: DER-encoded certificate bytes
    :param pem_path: destination file for the PEM-encoded certificate
    """
    pem_bytes = dump_certificate(FILETYPE_PEM, load_certificate(FILETYPE_ASN1, cer_content))
    with open(pem_path, 'wb+') as pem_file:
        pem_file.write(pem_bytes)
|
|
|
|
|
|
def check_error_call_back(error, developer_pk):
    """Translate a raw Apple API error text and update the developer status.

    :param error: error text to inspect (substring matching)
    :param developer_pk: primary key of the ``AppIOSDeveloperInfo`` row
    :return: the user-facing message of the last matching rule, or ``error``
        itself when no rule matched
    """
    msg = ''
    status = None

    network_markers = ('Cannot connect to proxy', 'Read timed out', 'Max retries exceeded with')
    if any(marker in error for marker in network_markers):
        logger.error('access apple api failed . change proxy ip again')
        msg = "代理网络错误,请稍后重试或联系管理员处理"

    # (marker, message, new status); every rule is checked and a later match
    # overrides an earlier one.
    status_rules = (
        ('it may be encrypted with an unsupported algorithm', "数据校验失败,请检查p8key内容是否正常", 5),
        ('Authentication credentials are missing or invalid', '认证失败,请检查开发者信息填写是否正确', 5),
        ('FORBIDDEN.REQUIRED_AGREEMENTS_MISSING_OR_EXPIRED',
         '请登录 https://developer.apple.com/account/ 并同意最新协议', 2),
    )
    for marker, text, new_status in status_rules:
        if marker in error:
            msg, status = text, new_status

    if status is not None:
        developer_obj = AppIOSDeveloperInfo.objects.filter(pk=developer_pk).first()
        if developer_obj:
            # A developer already at status -1 is not moved to status 5.
            if developer_obj.status == -1 and status == 5:
                status = -1
            AppIOSDeveloperInfo.objects.filter(pk=developer_pk).update(status=status)

    logger.error(f"{msg} {error}")
    return msg or error
|
|
|
|
|
|
class AppDeveloperApiV2(object):
    """Operations on one Apple developer account through ``AppStoreConnectApi``.

    Public methods return an ``(ok, payload)`` tuple.  On failure the payload
    is a dict whose ``return_info`` holds the message produced by
    ``check_error_call_back``.  Every public method except ``active`` and
    ``get_device`` is refused while the developer row's status is not in
    ``Config.DEVELOPER_WRITE_STATUS``.
    """

    def __init__(self, issuer_id, private_key_id, p8key, cert_id, developer_pk):
        """
        :param issuer_id: API issuer id
        :param private_key_id: id of the API private key
        :param p8key: content of the p8 private key
        :param cert_id: id of the certificate used by the cert methods
        :param developer_pk: primary key of the ``AppIOSDeveloperInfo`` row
        """
        self.issuer_id = issuer_id
        self.private_key_id = private_key_id
        self.p8key = p8key
        self.cert_id = cert_id
        self.developer_pk = developer_pk

    def __getattribute__(self, name):
        """Wrap public methods with the developer-status check and timing logs."""
        attr = object.__getattribute__(self, name)
        # Data attributes and private/dunder members (internal helpers,
        # __class__, ...) are returned untouched.  Otherwise a helper called
        # by the status-exempt ``active``/``get_device`` would be refused by
        # the status check and the exemption would have no effect.
        if name.startswith('_') or not callable(attr):
            return attr

        def func(*args, **kwargs):
            if attr.__name__ in ['active', 'get_device']:
                return attr(*args, **kwargs)

            if AppIOSDeveloperInfo.objects.filter(pk=self.developer_pk,
                                                  status__in=Config.DEVELOPER_WRITE_STATUS).first():
                start_time = time.time()
                logger.info(f'issuer_id:{self.issuer_id} calling {attr.__name__} time:{start_time}')
                result = attr(*args, **kwargs)
                logger.info(
                    f'issuer_id:{self.issuer_id} done {attr.__name__} used time:{time.time() - start_time}')
                return result

            result = False, {'return_info': '开发者状态异常'}
            logger.warning(f'issuer_id:{self.issuer_id} can not calling {attr.__name__} {result}')
            return result

        return func

    def __file_format_path_name(self, user_obj):
        """Return ``<SUPER_SIGN_ROOT>/<uuid>/<uuid>``, creating the directory."""
        cert_dir_name = make_app_uuid(user_obj, self.issuer_id)
        cert_dir_path = os.path.join(SUPER_SIGN_ROOT, cert_dir_name)
        os.makedirs(cert_dir_path, exist_ok=True)
        return os.path.join(cert_dir_path, cert_dir_name)

    def __result_format(self, result, is_instance):
        """Normalise an API result to ``(True, list)``.

        Accepts an empty list, a list whose first item is an ``is_instance``
        object, or a single ``is_instance`` object (wrapped into a list).

        :raises Exception: for any other value
        """
        return_flag = False
        if isinstance(result, list):
            if len(result) == 0:
                return_flag = True
            else:
                if isinstance(result[0], is_instance):
                    return_flag = True
        else:
            if isinstance(result, is_instance):
                result = [result]
                return_flag = True
        if return_flag:
            logger.info(f"issuer_id:{self.issuer_id} {is_instance} result:{result}")
            return True, result
        raise Exception(f'{result} is not {is_instance}')

    def __register_bundle(self, apple_obj, bundle_id, app_id, s_type):
        """Register ``<bundle_id>.<issuer_id>.<app_id>``; capabilities are enabled unless ``s_type`` is 0."""
        identifier = f"{bundle_id}.{self.issuer_id}.{app_id}"
        if s_type == 0:
            return apple_obj.register_bundle_id(app_id, identifier)
        return apple_obj.register_bundle_id_enable_capability(app_id, identifier, s_type)

    def active(self):
        """Fetch all certificates of the account (not subject to the status check).

        :return: ``(True, [Certificates, ...])`` (the list may be empty), or
            ``(False, {'return_info': <message>})`` on failure
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            certificates = apple_obj.get_all_certificates()
            return self.__result_format(certificates, Certificates)
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer active Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def create_cert(self, user_obj):
        """Create a certificate and store ``.csr``/``.key``/``.cer``/``.pem`` files.

        :return: ``(True, <Certificates object>)`` or ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            csr_path = self.__file_format_path_name(user_obj)
            os.makedirs(os.path.dirname(csr_path), exist_ok=True)
            csr_content = make_csr_content(csr_path + ".csr", csr_path + ".key")
            certificates = apple_obj.create_certificate(csr_content.decode("utf-8"))
            if certificates and isinstance(certificates, Certificates):
                n = base64.b64decode(certificates.certificateContent)
                with open(csr_path + ".cer", 'wb') as f:
                    f.write(n)
                make_pem(n, csr_path + ".pem")
                logger.info(
                    f"issuer_id:{self.issuer_id} ios developer create cert result:{certificates.certificateContent}")
                return True, certificates
            raise Exception(str(certificates))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer create cert Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def get_cert_obj_by_cid(self):
        """Look up the certificate ``self.cert_id``.

        :return: ``(True, <Certificates object>)``; ``(False, {})`` when the
            lookup yields no certificate object;
            ``(False, {'return_info': ...})`` when the API call raises
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            cert_obj = apple_obj.get_certificate_by_cid(self.cert_id)
            if cert_obj and isinstance(cert_obj, Certificates):
                return True, cert_obj
            logger.info(f"issuer_id:{self.issuer_id} ios developer get cert {self.cert_id} failed")
            return False, result
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer get cert {self.cert_id} Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def revoke_cert(self):
        """Revoke ``self.cert_id`` when it expires within three days.

        A certificate with more than three days left is not revoked and the
        call still reports success.

        :return: ``(True, {})`` or ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            cert_obj = apple_obj.get_certificate_by_cid(self.cert_id)
            if cert_obj and isinstance(cert_obj, Certificates):
                s_date = format_apple_date(cert_obj.expirationDate)
                if s_date.timestamp() - datetime.datetime.now().timestamp() < 3600 * 24 * 3:
                    if apple_obj.revoke_certificate(self.cert_id):
                        logger.info(f"issuer_id:{self.issuer_id} ios developer cert {self.cert_id} revoke")
                        return True, result
                else:
                    # NOTE(review): this branch runs when MORE than three days
                    # remain, although the message says "< 3 day" - confirm the
                    # intended wording.
                    logger.info(
                        f"issuer_id:{self.issuer_id} ios developer cert {self.cert_id} not revoke.because expire time < 3 day ")
                    return True, result
            raise Exception(str(cert_obj))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer cert {self.cert_id} revoke Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def auto_set_certid_by_p12(self, app_dev_pem):
        """Find the account certificate whose expiry matches the local PEM file.

        :param app_dev_pem: path of the local PEM certificate
        :return: ``(True, <Certificates object>)`` for the first certificate
            with the same expiration timestamp, otherwise
            ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            with open(app_dev_pem, 'rb') as pem_file:
                cer = load_certificate(FILETYPE_PEM, pem_file.read())
            # NOTE(review): the parsed value is a naive datetime, so
            # .timestamp() interprets it in local time - confirm this matches
            # what format_apple_date returns.
            not_after = datetime.datetime.strptime(cer.get_notAfter().decode('utf-8'), "%Y%m%d%H%M%SZ")
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            certificates = apple_obj.get_all_certificates()
            # Kept in its own name: ``result`` must stay a dict for the
            # error path below.
            status, cert_obj_list = self.__result_format(certificates, Certificates)
            if status:
                for cert_obj in cert_obj_list:
                    f_date = format_apple_date(cert_obj.expirationDate)
                    logger.info(
                        f"issuer_id:{self.issuer_id} {cert_obj.id}-{not_after.timestamp()} - {f_date.timestamp()} ")
                    if not_after.timestamp() == f_date.timestamp():
                        return True, cert_obj
            raise Exception(str(certificates))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer cert {app_dev_pem} auto get Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def del_profile(self, profile_id, profile_name):
        """Delete a provisioning profile.

        :return: ``(True, {})`` on success; ``(False, result)`` when the API
            reports failure or raises
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            if apple_obj.delete_profile_by_id(profile_id, profile_name):
                # Same tuple shape as the failure path and the other methods.
                return True, result
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer delete profile Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
        return False, result

    def set_device_status(self, status, device_id, device_name, device_udid, failed_call_prefix,
                          device_err_callback=None):
        """Enable (``status == "enable"``) or disable a registered device.

        :param failed_call_prefix: key handed to ``CleanErrorBundleIdSignDataState``
        :param device_err_callback: optional callable run when the error text
            says the device is gone from the account
        :return: ``(True, {})`` or ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            if status == "enable":
                device_obj = apple_obj.enabled_device(device_id, device_name, device_udid)
            else:
                device_obj = apple_obj.disabled_device(device_id, device_name, device_udid)
            logger.info(f"issuer_id:{self.issuer_id} device_obj:{device_obj} result:{status}")
            if device_obj and isinstance(device_obj, Devices):
                return True, result
            raise Exception(str(device_obj))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer set devices status Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            if device_err_callback and ("There are no current ios devices" in str(e) or "Device obj is None" in str(e)):
                with CleanErrorBundleIdSignDataState(failed_call_prefix) as state:
                    if state:
                        device_err_callback()
                    else:
                        logger.warning(
                            f'issuer_id:{self.issuer_id} {device_err_callback}-{failed_call_prefix} is running')
            return False, result

    def get_device(self):
        """Fetch all devices of the account (not subject to the status check).

        :return: ``(True, [Devices, ...])`` or ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            devices_obj_list = apple_obj.get_all_devices()
            return self.__result_format(devices_obj_list, Devices)
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer get device Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result

    def del_app(self, identifier_id, bundle_id, app_id):
        """Delete the bundle id ``<bundle_id>.<issuer_id>.<app_id>``.

        :return: ``(True, {})`` on success; ``(False, result)`` when the API
            reports failure or raises
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            if apple_obj.delete_bundle_by_identifier(identifier_id, f"{bundle_id}.{self.issuer_id}.{app_id}"):
                return True, result
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer delete app Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
        return False, result

    def create_app(self, bundle_id, app_id, s_type, app_id_err_callback=None):
        """Register a bundle id for the app.

        :param s_type: 0 registers a plain bundle id; any other value is
            passed on to enable capabilities
        :param app_id_err_callback: optional list of callables run when the
            error text reports a missing App ID
        :return: ``(True, {'aid': <bundle id object id>})`` or
            ``(False, {'return_info': ...})``
        """
        if app_id_err_callback is None:
            app_id_err_callback = []
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            bundle_obj = self.__register_bundle(apple_obj, bundle_id, app_id, s_type)
            if bundle_obj and isinstance(bundle_obj, BundleIds):
                result['aid'] = bundle_obj.id
                return True, result
            raise Exception(str(bundle_obj))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer create app Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            if app_id_err_callback and "There is no App ID with ID" in str(e):
                for call_fun in app_id_err_callback:
                    call_fun()
            return False, result

    def register_device(self, device_udid, device_name, failed_call_prefix, device_err_callback=None):
        """Register a device with the account.

        :param failed_call_prefix: key handed to ``CleanErrorBundleIdSignDataState``
        :param device_err_callback: optional callable run when the error text
            reports missing devices or a full device quota
        :return: ``(True, <Devices object>)`` or ``(False, {'return_info': ...})``
        """
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            device_obj = apple_obj.register_device(device_name, device_udid)
            if device_obj and isinstance(device_obj, Devices):
                return True, device_obj
            raise Exception(str(device_obj))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer register device Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            err_msg_list = [
                "There are no current ios devices",
                "Your development team has reached the maximum number of registered iPhone devices"
            ]
            if device_err_callback:
                for err_msg in err_msg_list:
                    if err_msg in str(e):
                        with CleanErrorBundleIdSignDataState(failed_call_prefix) as state:
                            if state:
                                device_err_callback()
                            else:
                                logger.warning(
                                    f'issuer_id:{self.issuer_id} {device_err_callback}-{failed_call_prefix} is running')
                        # The callback runs at most once per failure.
                        break
            return False, result

    def make_and_download_profile(self, app_obj, provision_name, auth, developer_app_id, device_id_list, profile_id,
                                  failed_call_prefix, app_id_err_callback=None):
        """Create a provisioning profile and write it to ``provision_name``.

        :param provision_name: destination file path; its last path component
            is used as the profile name
        :param auth: mapping providing ``cert_id``
        :param failed_call_prefix: key handed to ``CleanErrorBundleIdSignDataState``
        :param app_id_err_callback: optional list of callables run when the
            error text reports a missing App ID
        :return: ``(True, {'profile_id': ...})`` or ``(False, {'return_info': ...})``
        """
        if app_id_err_callback is None:
            app_id_err_callback = []
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            profile_obj = apple_obj.create_profile(profile_id, developer_app_id, auth.get('cert_id'),
                                                   provision_name.split("/")[-1],
                                                   device_id_list)
            if profile_obj and isinstance(profile_obj, Profiles):
                result['profile_id'] = profile_obj.id
                n = base64.b64decode(profile_obj.profileContent)
                profile_dir = os.path.dirname(provision_name)
                if profile_dir:
                    # A bare file name has no directory to create.
                    os.makedirs(profile_dir, exist_ok=True)
                with open(provision_name, 'wb') as f:
                    f.write(n)
                return True, result
            raise Exception(str(profile_obj))
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} app_id {app_obj.app_id} ios developer make profile Failed "
                         f"Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            if app_id_err_callback and "There is no App ID with ID" in str(e):
                with CleanErrorBundleIdSignDataState(failed_call_prefix) as state:
                    if state:
                        for call_fun in app_id_err_callback:
                            call_fun()
                    else:
                        logger.warning(
                            f'issuer_id:{self.issuer_id} {app_id_err_callback}-{failed_call_prefix} is running')
            return False, result

    def modify_capability(self, app_obj, developer_app_id):
        """Switch the app's capabilities, registering the bundle id if needed.

        With a ``developer_app_id`` the capabilities of that bundle id are
        disabled (``supersign_type == 0``) or enabled.  Without one a new
        bundle id is registered.

        :return: ``(True, {'code': ...})`` or ``(True, {'aid': ...})`` on
            success, otherwise ``(False, {'return_info': ...})``
        """
        bundle_id = app_obj.bundle_id
        app_id = app_obj.app_id
        s_type = app_obj.supersign_type
        result = {}
        try:
            apple_obj = AppStoreConnectApi(self.issuer_id, self.private_key_id, self.p8key)
            if developer_app_id:
                if s_type == 0:
                    result['code'] = apple_obj.disable_capability_by_s_type(developer_app_id)
                else:
                    result['code'] = apple_obj.enable_capability_by_s_type(developer_app_id, s_type)
            else:
                bundle_obj = self.__register_bundle(apple_obj, bundle_id, app_id, s_type)
                if bundle_obj and isinstance(bundle_obj, BundleIds):
                    developer_app_id = bundle_obj.id
                    result['aid'] = developer_app_id
                else:
                    raise Exception(str(bundle_obj))
            return True, result
        except Exception as e:
            logger.error(f"issuer_id:{self.issuer_id} ios developer modify_capability Failed Exception:{e}")
            result['return_info'] = check_error_call_back(str(e), self.developer_pk)
            return False, result
|
|
|