You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
4.2 KiB

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project: 9月
# author: NinEveN
# date: 2021/9/6
from hashlib import sha1
import requests
import logging
import json
from fir_ser.settings import THIRDLOGINCONF
from import WxMsgCryptBase
from import get_wx_access_token_cache
logger = logging.getLogger(__name__)
wx_login_info = THIRDLOGINCONF.wx_official
def create_menu():
menu_json = {
"button": [
"type": "click",
"name": "",
"key": "good"
"name": "分发平台",
"sub_button": [
"type": "view",
"name": "官方地址",
"url": ""
"type": "view",
"name": "留言反馈",
"url": ""
"type": "media_id",
"name": "联系我们",
"media_id": "qvQxPuAb4GnUgjkxl2xVnbsnldxawf4DXM09biqgP30"
p_url = f"{get_wx_access_token_cache()}"
req =, data=json.dumps(menu_json, ensure_ascii=False).encode('utf-8'))
def show_qrcode_url(ticket):
return f'{ticket}'
def make_wx_login_qrcode(scene_str='web.login', expire_seconds=600):
:param scene_str: 场景值ID(字符串形式的ID),字符串类型,长度限制为1到64
:param expire_seconds: 该二维码有效时间,以秒为单位。 最大不超过2592000(即30天),此字段如果不填,则默认有效期为30秒。
:return: {
t_url = f'{get_wx_access_token_cache()}'
data = {"expire_seconds": expire_seconds, "action_name": "QR_STR_SCENE",
"action_info": {"scene": {"scene_str": scene_str}}}
req =, json=data)
if req.status_code == 200:
return True, req.json()
logger.error(f"make wx login qrcode failed {req.status_code} {req.text}")
return False, req.text
def get_userinfo_from_openid(open_id):
t_url = f'{get_wx_access_token_cache()}&openid={open_id}&lang=zh_CN'
req = requests.get(t_url)
if req.status_code == 200:
return True, req.json()
logger.error(f"get userinfo from openid failed {req.status_code} {req.text}")
return False, req.text
class WxOfficialBase(object):
def __init__(self, app_id, app_secret, token, encoding_aes_key):
self.app_id = app_id
self.app_secret = app_secret
self.token = token
self.encoding_aes_key = encoding_aes_key
def get_access_token(self):
t_url = f'{self.app_id}&secret={self.app_secret}'
req = requests.get(t_url)
if req.status_code == 200:
return req.json()
logger.error(f"get access token failed {req.status_code} {req.text}")
return req.text
def make_wx_auth_obj():
return WxOfficialBase(**wx_login_info.get('auth'))
def check_signature(params):
tmp_list = sorted([wx_login_info.get('auth', {}).get('token'), params.get("timestamp"), params.get("nonce")])
tmp_str = "".join(tmp_list)
tmp_str = sha1(tmp_str.encode("utf-8")).hexdigest()
if tmp_str == params.get("signature"):
return int(params.get("echostr"))
return ''
class WxMsgCrypt(WxMsgCryptBase):
def __init__(self):