#!/usr/bin/env python # -*- coding:utf-8 -*- # project: 4月 # author: NinEveN # date: 2020/4/17 # pip install pyjwt import base64 import datetime import logging import os import time from collections import namedtuple from functools import wraps import jwt import requests from django.conf import settings logger = logging.getLogger(__name__) # https://developer.apple.com/documentation/appstoreconnectapi/creating_api_keys_for_app_store_connect_api # https://appstoreconnect.apple.com/access/api 去申请秘钥 # proxies = settings.APPLE_DEVELOPER_API_PROXY if settings.APPLE_DEVELOPER_API_PROXY else {} timeout = settings.APPLE_DEVELOPER_API_TIMEOUT if settings.APPLE_DEVELOPER_API_TIMEOUT else 120 def request_format_log(req): try: logger.info(f"url:{req.url} header:{req.headers} code:{req.status_code} body:{req.content}") except Exception as e: logger.error(e) return req # 需要和model里面的对应起来 capability_info = [ [], ["PUSH_NOTIFICATIONS"], [ "PERSONAL_VPN", "PUSH_NOTIFICATIONS", "NETWORK_EXTENSIONS", ], [ "PERSONAL_VPN", "PUSH_NOTIFICATIONS", "NETWORK_EXTENSIONS", "WALLET", "ICLOUD", "INTER_APP_AUDIO", "ASSOCIATED_DOMAINS", "APP_GROUPS", "HEALTHKIT", "HOMEKIT", "WIRELESS_ACCESSORY_CONFIGURATION", "APPLE_PAY", "DATA_PROTECTION", "SIRIKIT", "MULTIPATH", "HOT_SPOT", "NFC_TAG_READING", "CLASSKIT", "AUTOFILL_CREDENTIAL_PROVIDER", "ACCESS_WIFI_INFORMATION", "COREMEDIA_HLS_LOW_LATENCY", ] ] def get_capability(s_type): return capability_info[s_type] class DevicesAPI(object): # https://developer.apple.com/documentation/appstoreconnectapi/devices def __init__(self, base_uri, jwt_headers): self.headers = jwt_headers self.devices_url = '%s/devices' % base_uri def list_devices(self, query_parameters=None): """ :param query_parameters: :return: 200 DevicesResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json """ params = { "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid", "filter[platform]": "IOS", "limit": 200 } if query_parameters: for k, v in query_parameters.items(): params[k] = v return request_format_log( requests.get(self.devices_url, params=params, headers=self.headers, proxies=proxies, timeout=timeout)) def list_enabled_devices(self): return self.list_devices({"filter[status]": "ENABLED"}) def list_disabled_devices(self): return self.list_devices({"filter[status]": "DISABLED"}) def list_device_by_device_id(self, device_id): return self.list_devices({"filter[id]": device_id}) def register_device(self, device_name, device_udid, platform="IOS"): """ :param device_name: :param device_udid: :param platform: :return: 201 DeviceResponse Created Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ json = { 'data': { 'type': 'devices', 'attributes': { 'name': device_name, 'udid': device_udid, 'platform': platform # IOS or MAC_OS } } } return request_format_log( requests.post(self.devices_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def read_device_information(self, device_id): """ :param device_id: :return: 200 DeviceResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json """ base_url = '%s/%s' % (self.devices_url, device_id) params = { "fields[devices]": "addedDate, deviceClass, model, name, platform, status, udid", } return request_format_log( requests.get(base_url, params=params, headers=self.headers, proxies=proxies, timeout=timeout)) def enabled_device(self, device_id, device_name): return self.modify_registered_device(device_id, device_name, 'ENABLED') def disabled_device(self, device_id, device_name): return self.modify_registered_device(device_id, device_name, 'DISABLED') def modify_registered_device(self, device_id, device_name, status): """ :param device_id: :param device_name: :param status: :return: 200 DeviceResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s' % (self.devices_url, device_id) json = { 'data': { 'type': 'devices', 'id': device_id, 'attributes': { 'name': device_name, 'status': status } } } return request_format_log( requests.patch(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) class BundleIDsAPI(object): # https://developer.apple.com/documentation/appstoreconnectapi/bundle_ids def __init__(self, base_uri, jwt_headers): self.headers = jwt_headers self.bundle_ids_url = '%s/bundleIds' % base_uri def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''): """ :param bundle_id_name: :param bundle_id_identifier: :param platform: :param seed_id: :return: 201 BundleIdResponse Created Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ json = { 'data': { 'type': 'bundleIds', 'attributes': { 'name': bundle_id_name, 'identifier': bundle_id_identifier, 'platform': platform, 'seedId': seed_id } } } return request_format_log( requests.post(self.bundle_ids_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def delete_bundle_id_by_id(self, bundle_id): """ :param bundle_id: :return: 204 No Content 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s' % (self.bundle_ids_url, bundle_id) json = {} return request_format_log( requests.delete(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def list_bundle_ids(self, query_parameters=None): """ :param query_parameters: :return: 200 BundleIdsResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json """ params = { "fields[bundleIds]": "identifier, name, platform, profiles, seedId", # "filter[platform]": "IOS", "limit": 200 } if query_parameters: for k, v in query_parameters.items(): params[k] = v return request_format_log( requests.get(self.bundle_ids_url, params=params, headers=self.headers, proxies=proxies, timeout=timeout)) def list_bundle_id_by_identifier(self, identifier): return self.list_bundle_ids({"filter[identifier]": identifier}) def list_bundle_id_by_id(self, bundle_id): return self.list_bundle_ids({"filter[id]": bundle_id}) def modify_bundle_id(self, bundle_id, bundle_name): """ :param bundle_id: :param bundle_name: :return: 200 BundleIdResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s' % (self.bundle_ids_url, bundle_id) json = { 'data': { 'type': 'bundleIds', 'id': bundle_id, 'attributes': { 'name': bundle_name, } } } return request_format_log( requests.patch(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) class BundleIDsCapabilityAPI(object): # https://developer.apple.com/documentation/appstoreconnectapi/bundle_id_capabilities def __init__(self, base_uri, jwt_headers): self.headers = jwt_headers self.bundle_ids_capability_url = '%s/bundleIdCapabilities' % base_uri def disable_capability(self, bundle_id, capability_type): """ :param capability_type: :param bundle_id: :return: 204 No Content 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s_%s' % (self.bundle_ids_capability_url, bundle_id, capability_type) json = {} return request_format_log( requests.delete(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def enable_capability(self, bundle_id, capability_type): """ :param bundle_id: :param capability_type: :return: 201 BundleIdCapabilityResponse Created Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ json = { 'data': { 'type': 'bundleIdCapabilities', 'attributes': { 'capabilityType': capability_type, # 'PUSH_NOTIFICATIONS',#PERSONAL_VPN 'settings': [] }, 'relationships': { 'bundleId': { 'data': { 'id': bundle_id, 'type': 'bundleIds', } } } } } return request_format_log( requests.post(self.bundle_ids_capability_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) class ProfilesAPI(object): # https://developer.apple.com/documentation/appstoreconnectapi/profiles def __init__(self, base_uri, jwt_headers): self.headers = jwt_headers self.profiles_url = '%s/profiles' % base_uri def create_profile(self, bundle_id, certificate_id_list, profile_name, device_id_list, profile_type='IOS_APP_ADHOC'): """ :param bundle_id: :param certificate_id_list: :param profile_name: :param device_id_list: :param profile_type: :return: 201 ProfileResponse Created Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ json = { 'data': { 'type': 'profiles', 'attributes': { 'name': profile_name, 'profileType': profile_type, # Possible values: IOS_APP_DEVELOPMENT, IOS_APP_STORE, IOS_APP_ADHOC, IOS_APP_INHOUSE, # MAC_APP_DEVELOPMENT, MAC_APP_STORE, MAC_APP_DIRECT, TVOS_APP_DEVELOPMENT, TVOS_APP_STORE, # TVOS_APP_ADHOC, TVOS_APP_INHOUSE, MAC_CATALYST_APP_DEVELOPMENT, MAC_CATALYST_APP_STORE, # MAC_CATALYST_APP_DIRECT }, 'relationships': { 'bundleId': { 'data': {'id': bundle_id, 'type': 'bundleIds'} }, 'certificates': { 'data': [ {'id': certificate_id, 'type': 'certificates'} for certificate_id in certificate_id_list ] }, 'devices': { 'data': [ {'id': device_id, 'type': 'devices'} for device_id in device_id_list ] }, } } } return request_format_log( requests.post(self.profiles_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def delete_profile(self, profile_id): """ :param profile_id: :return: 204 No Content 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s' % (self.profiles_url, profile_id) json = {} return request_format_log( requests.delete(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def download_profile(self, profile_id): # n=base64.b64decode(profileContent) # with open('profilea','wb') as f: # f.write(n) # print(n) pass def list_profiles(self, query_parameters=None): """ :param query_parameters: :return: 200 ProfilesResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json """ params = { "limit": 200 } if query_parameters: for k, v in query_parameters.items(): params[k] = v return request_format_log( requests.get(self.profiles_url, params=params, headers=self.headers, proxies=proxies, timeout=timeout)) def list_profile_by_profile_id(self, profile_id): return self.list_profiles({"filter[id]": profile_id, "include": ""}) def list_profile_by_profile_name(self, profile_name): return self.list_profiles({"filter[name]": profile_name, "include": ""}) class CertificatesAPI(object): # https://developer.apple.com/documentation/appstoreconnectapi/certificates def __init__(self, base_uri, jwt_headers): self.headers = jwt_headers self.certificates_url = '%s/certificates' % base_uri def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'): """ :param csr_content: :param certificate_type: :return: 201 CertificateResponse Created Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ json = { 'data': { 'type': 'certificates', 'attributes': { 'csrContent': csr_content, 'certificateType': certificate_type, # https://developer.apple.com/documentation/appstoreconnectapi/certificatetype } } } return request_format_log( requests.post(self.certificates_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) def download_certificate(self, certificate_id): # req.json()['data'][0]['attributes']['certificateContent'] # n=base64.b64decode(certificateContent) # with open('xxxxxx','wb') as f: # f.write(n) # print(n) pass def list_certificate(self, query_parameters=None): """ :param query_parameters: :return: 200 CertificatesResponse OK Content-Type: application/json 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json """ params = { "fields[certificates]": "certificateContent, certificateType, csrContent, displayName, expirationDate, " "name, platform, serialNumber", } if query_parameters: for k, v in query_parameters.items(): params[k] = v return request_format_log( requests.get(self.certificates_url, params=params, headers=self.headers, proxies=proxies, timeout=timeout)) def list_certificate_by_certificate_id(self, certificate_id): return self.list_certificate({"filter[id]": certificate_id, }) def revoke_certificate(self, certificate_id): """ :param certificate_id: :return: 204 No Content 400 ErrorResponse Bad Request An error occurred with your request. Content-Type: application/json 403 ErrorResponse Forbidden Request not authorized. Content-Type: application/json 404 ErrorResponse Not Found Resource not found. Content-Type: application/json 409 ErrorResponse Conflict The provided resource data is not valid. Content-Type: application/json """ base_url = '%s/%s' % (self.certificates_url, certificate_id) json = {} return request_format_log( requests.delete(base_url, json=json, headers=self.headers, proxies=proxies, timeout=timeout)) class BaseInfoObj(object): @staticmethod def filter(obj_lists, query_parameters=None): if not isinstance(obj_lists, list): obj_lists = [obj_lists] if query_parameters: new_obj_lists = [] for obj in obj_lists: flag = True for k, v in query_parameters.items(): if getattr(obj, k) != v: flag = False continue if flag: new_obj_lists.append(obj) return new_obj_lists return obj_lists @staticmethod def update(obj_lists, up_obj_list): conn_obj = [] conn_obj.extend(obj_lists) if not isinstance(up_obj_list, list): up_obj_list = [up_obj_list] conn_obj.extend(up_obj_list) repeat_id = [] repeat_obj = [] for i in range(len(conn_obj) - 1): for j in range(i + 1, len(conn_obj)): if conn_obj[i].id == conn_obj[j].id: repeat_obj.append(conn_obj[j]) repeat_id.append(conn_obj[i].id) new_list = [] for ob in conn_obj: if ob.id in repeat_id: continue new_list.append(ob) new_list.extend(repeat_obj) return new_list @staticmethod def delete(obj_lists, up_obj_list): new_obj_list = [] for obj in obj_lists: flag = True for up_obj in up_obj_list: if obj.id == up_obj.id: flag = False if flag: new_obj_list.append(obj) return new_obj_list class Devices(namedtuple("Devices", ["id", "addedDate", "name", "deviceClass", "model", "udid", "platform", "status"])): @classmethod def from_json_list(cls, json_list): new_cls_list = [] for json in json_list: new_cls_list.append(cls.from_json(json)) return new_cls_list @classmethod def from_json(cls, json): new_dict = {'id': json.get('id', '')} attributes = json.get("attributes", {}) for k, v in attributes.items(): new_dict[k] = v return cls(**new_dict) def copy_and_replace(self, **kwargs): return self._replace(**kwargs) class BundleIds(namedtuple("BundleIds", ["id", "name", "identifier", "platform", "seedId", ]), ): @classmethod def from_json_list(cls, json_list): new_cls_list = [] for json in json_list: new_cls_list.append(cls.from_json(json)) return new_cls_list @classmethod def from_json(cls, json): new_dict = {'id': json.get('id', '')} attributes = json.get("attributes", {}) for k, v in attributes.items(): new_dict[k] = v return cls(**new_dict) def copy_and_replace(self, **kwargs): return self._replace(**kwargs) class Profiles(namedtuple("Profiles", ["id", "name", "profileState", "createdDate", "profileType", "profileContent", "uuid", "platform", "expirationDate"]), ): @classmethod def from_json_list(cls, json_list): new_cls_list = [] for json in json_list: new_cls_list.append(cls.from_json(json)) return new_cls_list @classmethod def from_json(cls, json): new_dict = {'id': json.get('id', '')} attributes = json.get("attributes", {}) for k, v in attributes.items(): new_dict[k] = v return cls(**new_dict) def copy_and_replace(self, **kwargs): return self._replace(**kwargs) def download_profile(self, filepath): dirname = os.path.dirname(filepath) if os.path.isdir(dirname) and os.path.exists(dirname): pass else: os.makedirs(dirname) n = base64.b64decode(self.profileContent) with open(filepath, 'wb') as f: f.write(n) return filepath class Certificates(namedtuple("Certificates", ["id", "serialNumber", "certificateContent", "displayName", "name", "csrContent", "platform", "expirationDate", "certificateType"]), ): @classmethod def from_json_list(cls, json_list): new_cls_list = [] for json in json_list: new_cls_list.append(cls.from_json(json)) return new_cls_list @classmethod def from_json(cls, json): new_dict = {'id': json.get('id', '')} attributes = json.get("attributes", {}) for k, v in attributes.items(): new_dict[k] = v return cls(**new_dict) def copy_and_replace(self, **kwargs): return self._replace(**kwargs) def download_certificate(self, filepath): dirname = os.path.dirname(filepath) if os.path.isdir(dirname) and os.path.exists(dirname): pass else: os.makedirs(dirname) n = base64.b64decode(self.certificateContent) with open(filepath, 'wb') as f: f.write(n) return filepath def call_function_try_attempts(try_attempts=3, sleep_time=1): def decorator(func): @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() flag = False res = '' for i in range(try_attempts): try: res = func(*args, **kwargs) flag = True break except Exception as e: logger.warning( f'exec {func} failed. Failed:{e} {try_attempts} times in total. now {sleep_time} later try ' f'again...{i}') res = str(e) if 'Authentication credentials are missing or invalid' in str(e): res = '认证失败,请检查开发者信息填写是否正确' time.sleep(sleep_time) logger.info(f"exec {func} finished. time:{time.time() - start_time}") if not flag: logger.error(f'exec {func} failed after the maximum number of attempts. Failed:{res}') raise Exception(res) return res return wrapper return decorator class AppStoreConnectApi(DevicesAPI, BundleIDsAPI, BundleIDsCapabilityAPI, ProfilesAPI, CertificatesAPI): BASE_URI = 'https://api.appstoreconnect.apple.com/v1' JWT_AUD = 'appstoreconnect-v1' JWT_ALG = 'ES256' def __init__(self, issuer_id, private_key_id, p8_private_key, exp_seconds=1200): """ 根据 Apple 文档,会话最长持续时间为 20 分钟:https : //developer.apple.com/documentation/appstoreconnectapi/generating_tokens_for_api_requests :param issuer_id: :param private_key_id: :param p8_private_key: :param exp_seconds: max 20*60 """ self.issuer_id = issuer_id self.private_key_id = private_key_id self.p8_private_key = p8_private_key self.exp_seconds = exp_seconds self.__make_jwt_headers() DevicesAPI.__init__(self, self.BASE_URI, self.headers) BundleIDsAPI.__init__(self, self.BASE_URI, self.headers) BundleIDsCapabilityAPI.__init__(self, self.BASE_URI, self.headers) ProfilesAPI.__init__(self, self.BASE_URI, self.headers) CertificatesAPI.__init__(self, self.BASE_URI, self.headers) self.rate_limit_info = {} def __set_rate_limit_info(self, req_headers): for par in req_headers.get('X-Rate-Limit').split(";"): if par: limit_info_list = par.split(":") self.rate_limit_info[limit_info_list[0]] = limit_info_list[1] user_rem_info = self.rate_limit_info if int(user_rem_info.get('user-hour-rem')) < 3595: logger.warning(f"user-hour-rem over limit. so get jwt headers") self.__init__(self.issuer_id, self.private_key_id, self.p8_private_key) self.rate_limit_info = user_rem_info logger.info(f"rate_limit_info:{self.rate_limit_info}") def __make_jwt_headers(self): data = { "iss": self.issuer_id, "exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=self.exp_seconds), "aud": self.JWT_AUD } jwt_headers = { "alg": self.JWT_ALG, "kid": self.private_key_id, "typ": "JWT" } jwt_encoded = jwt.encode(data, self.p8_private_key, algorithm=self.JWT_ALG, headers=jwt_headers) headers = { 'Authorization': 'Bearer %s' % jwt_encoded } self.headers = headers def __base_format(self, s_type, req, success_code): # self.__set_rate_limit_info(req.headers) if req.status_code == success_code: req_data = req.json() data = req_data.get('data') if isinstance(data, list) or isinstance(data, dict): obj = None if isinstance(data, dict): data = [data] if s_type == 'devices': obj = Devices.from_json_list(data) elif s_type == 'bundleIds': obj = BundleIds.from_json_list(data) elif s_type == 'profiles': obj = Profiles.from_json_list(data) elif s_type == 'certificates': obj = Certificates.from_json_list(data) if len(obj) == 1: return obj[0] return obj else: # self.__init_jwt_headers() raise Exception(f'error instance: {req.text}') elif req.status_code == 401: # 授权问题 raise Exception(req.text) elif req.status_code == 429: # 请求超过每小时限制 {'user-hour-lim': '3600', 'user-hour-rem': '3586'} raise Exception(req.text) elif req.status_code == 500: time.sleep(60) raise Exception(req.text) else: raise Exception('unknown error: %s code:%s' % (req.text, req.status_code)) def __device_store(self, req, success_code=200): return self.__base_format('devices', req, success_code) def __profile_store(self, req, success_code=200): return self.__base_format('profiles', req, success_code) def __certificates_store(self, req, success_code=200): return self.__base_format('certificates', req, success_code) def __bundle_ids_store(self, req, success_code=200): return self.__base_format('bundleIds', req, success_code) @call_function_try_attempts() def get_all_devices(self): req = self.list_devices() return self.__device_store(req) def list_enabled_devices(self): req = super().list_enabled_devices() return self.__device_store(req) def get_all_bundle_ids(self): req = self.list_bundle_ids() return self.__bundle_ids_store(req) def get_all_profiles(self): req = self.list_profiles() return self.__profile_store(req) @call_function_try_attempts() def get_all_certificates(self): req = self.list_certificate() return self.__certificates_store(req) @call_function_try_attempts() def get_certificate_by_cid(self, certificate_id): req = self.list_certificate_by_certificate_id(certificate_id) return self.__certificates_store(req) def list_device_by_udid(self, udid): device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": udid}) if not device_obj_list: raise Exception('Device obj is None') if len(device_obj_list) != 1 and len(set([device_obj.udid for device_obj in device_obj_list])) != 1: raise Exception('more than one Device obj') return device_obj_list @call_function_try_attempts() def register_device(self, device_name, device_udid, platform="IOS"): device_obj_list = BaseInfoObj.filter(self.get_all_devices(), {"udid": device_udid}) # 发现同一个开发者账户里面有两个一样的udid,奇了怪 if device_obj_list and ( len(device_obj_list) == 1 or len(set([device_obj.udid for device_obj in device_obj_list])) == 1): device_obj = device_obj_list[0] req = self.modify_registered_device(device_obj.id, device_name, 'ENABLED') return self.__device_store(req) else: req = super().register_device(device_name, device_udid, platform) return self.__device_store(req, 201) @call_function_try_attempts() def enabled_device(self, device_id, device_name, udid): if device_id and device_name: req = super().enabled_device(device_id, device_name) if req.status_code == 200: return self.__device_store(req) if udid: device_obj_list = self.list_device_by_udid(udid) for device_obj in device_obj_list: req = self.modify_registered_device(device_obj.id, device_obj.name, 'ENABLED') return self.__device_store(req) @call_function_try_attempts() def disabled_device(self, device_id, device_name, udid): if device_id and device_name: req = super().disabled_device(device_id, device_name) if req.status_code == 200: return self.__device_store(req) if udid: device_obj_list = self.list_device_by_udid(udid) for device_obj in device_obj_list: req = self.modify_registered_device(device_obj.id, device_obj.name, 'DISABLED') return self.__device_store(req) def list_bundle_ids_by_identifier(self, identifier): req = super().list_bundle_id_by_identifier(identifier) return self.__bundle_ids_store(req) def __do_success(self, req, status=200): if req.status_code == status: return True return False @call_function_try_attempts() def enable_capability_by_s_type(self, bundle_id, s_type): capability_list = get_capability(s_type) if capability_list: for capability in capability_list: req = super().enable_capability(bundle_id, capability) if self.__do_success(req, 201): logger.info(f"{bundle_id} enable_capability {capability} success") else: logger.warning(f"{bundle_id} enable_capability {capability} failed {req.content}") return True @call_function_try_attempts() def disable_capability_by_s_type(self, bundle_id, s_type=len(capability_info) - 1): capability_list = get_capability(s_type) if capability_list: for capability in capability_list: req = super().disable_capability(bundle_id, capability) if self.__do_success(req, 204): logger.info(f"{bundle_id} enable_capability {capability} success") else: logger.warning(f"{bundle_id} enable_capability {capability} failed {req.content}") return True def enable_push_vpn_capability(self, bundle_id): req = super().enable_capability(bundle_id, 'PUSH_NOTIFICATIONS') if self.__do_success(req, 201): req = super().enable_capability(bundle_id, 'PERSONAL_VPN') if self.__do_success(req, 201): return True return False def disable_push_vpn_capability(self, bundle_id): req = super().disable_capability(bundle_id, 'PUSH_NOTIFICATIONS') if self.__do_success(req, 204): req = super().disable_capability(bundle_id, 'PERSONAL_VPN') if self.__do_success(req, 204): return True return False @call_function_try_attempts() def register_bundle_id(self, bundle_id_name, bundle_id_identifier, platform="IOS", seed_id=''): identifier_obj = self.list_bundle_ids_by_identifier(bundle_id_identifier) if isinstance(identifier_obj, BundleIds): req = self.modify_bundle_id(identifier_obj.id, bundle_id_name) return self.__bundle_ids_store(req) else: req = super().register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id) return self.__bundle_ids_store(req, 201) @call_function_try_attempts() def register_bundle_id_enable_capability(self, bundle_id_name, bundle_id_identifier, s_type, platform="IOS", seed_id=''): bundle_ids = self.register_bundle_id(bundle_id_name, bundle_id_identifier, platform, seed_id) if isinstance(bundle_ids, BundleIds): if self.enable_capability_by_s_type(bundle_ids.id, s_type): return bundle_ids @call_function_try_attempts() def delete_bundle_by_identifier(self, identifier_id, identifier_name): if identifier_id: req = self.delete_bundle_id_by_id(identifier_id) if req.status_code == 204: return True identifier_obj = self.list_bundle_ids_by_identifier(identifier_name) if isinstance(identifier_obj, BundleIds): req = self.delete_bundle_id_by_id(identifier_obj.id) if req.status_code == 204: return True @call_function_try_attempts() def create_profile(self, profile_id, bundle_id, certificate_id, profile_name, device_id_list=None, profile_type='IOS_APP_ADHOC'): if device_id_list is None: device_id_list = [] if not device_id_list: device_id_list = [device.id for device in self.list_enabled_devices()] self.delete_profile_by_id(profile_id, profile_name) # profile_obj = self.list_profile_by_profile_name(profile_name) # if isinstance(profile_obj, Profiles): # self.delete_profile_by_id(profile_obj.id, profile_name) req = super().create_profile(bundle_id, [certificate_id], profile_name, device_id_list) if req.status_code == 201: self.__profile_store(req, 201) return Profiles.from_json(req.json().get("data")) raise KeyError(req.text) def list_profile_by_profile_name(self, profile_name): req = super().list_profile_by_profile_name(profile_name) return self.__profile_store(req) @call_function_try_attempts() def delete_profile_by_id(self, profile_id, profile_name): if profile_id: req = super().delete_profile(profile_id) if self.__do_success(req, 204): return True profile_obj = self.list_profile_by_profile_name(profile_name) if profile_obj: req = super().delete_profile(profile_obj.id) if self.__do_success(req, 204): return True @call_function_try_attempts() def create_certificate(self, csr_content, certificate_type='IOS_DISTRIBUTION'): req = super().create_certificate(csr_content, certificate_type) if req.status_code == 201: return self.__certificates_store(req, 201) raise KeyError(req.text) @call_function_try_attempts() def revoke_certificate(self, certificate_id): req = super().revoke_certificate(certificate_id) if req.status_code == 204: return True return False