1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
| import json import time import traceback from asyncio import Queue
import requests from flask import jsonify from lark_oapi.api.im.v1 import *
from Common.ReturnInfo.ReturnInfo import ReturnInfo from ThlmObject.ThlmObject import WebParam
# Credentials of the Lark/Feishu application; get_auth() posts them to the
# tenant_access_token endpoint to obtain a bearer token.
app_id = "****"
app_secret = "*****"
class LarkService:
    """Webhook handler that echoes incoming Lark/Feishu messages back.

    The service caches a tenant access token in ``self.auth`` and refreshes
    it lazily from ``deal`` once it is empty or older than
    ``AUTH_TTL_SECONDS``.
    """

    # Seconds a cached tenant access token is reused before it is re-fetched.
    AUTH_TTL_SECONDS = 1.5 * 3600
    # Upper bound in seconds for every outbound HTTP call; without it a
    # stalled connection would block the webhook handler indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Cached tenant access token; an empty string means "not fetched yet".
        self.auth = ""
        # Unix timestamp (seconds) of the last successful token fetch.
        self.auth_time = 0

    def deal(self, web_param: "WebParam", return_info: "ReturnInfo"):
        """Handle one webhook call and return a Flask JSON response.

        Args:
            web_param: Request wrapper. ``web_param.data`` is the event
                payload (a dict, or UTF-8 JSON bytes) and
                ``web_param.asserts`` is a mapping that holds the logger
                under the ``"logger"`` key.
            return_info: Factory for the generic success / failure payloads.

        Returns:
            ``{"challenge": ...}`` for the URL verification handshake,
            otherwise the serialized ``common_success()`` payload, or the
            ``common_fail()`` payload when processing raised.
        """
        info = web_param.data
        logger = web_param.asserts.get("logger")

        self._refresh_auth(logger)

        try:
            if isinstance(info, bytes):
                info = json.loads(info.decode("utf-8"))

            challenge = info.get("challenge", None)
            print(info)

            if challenge:
                # When the webhook is first registered the platform sends a
                # challenge that has to be echoed back verbatim.
                return jsonify({"challenge": challenge})

            event = info.get("event")
            message_info = event.get("message")
            chat_type = message_info.get("chat_type")

            if chat_type == "p2p":
                # Private chat: reply to the sender directly.
                open_type = "open_id"
                open_id = event.get("sender").get("sender_id").get("open_id")
            elif chat_type == "group":
                # Group chat: reply into the group.
                open_type = "chat_id"
                open_id = message_info.get("chat_id")
            else:
                # There is no receiver to address, so skip the API call
                # instead of sending a request with empty identifiers.
                logger.error(f"lark unsupported chat_type: {chat_type}")
                return jsonify(return_info.common_success().to_dict())

            content = json.loads(message_info.get("content"))
            message = content.get("text").replace("@_user_1", "")

            # When the message is a reply, append the text of the message
            # it replies to.
            parent_id = message_info.get("parent_id", None)
            if parent_id is not None:
                message = message + "\n" + self.get_message(parent_id)

            self.send(open_type, open_id, message)
            return jsonify(return_info.common_success().to_dict())
        except Exception:
            logger.error("lark service 处理错误")
            logger.error(traceback.format_exc())
            return jsonify(return_info.common_fail().to_dict())

    def _refresh_auth(self, logger):
        """Fetch a new token when the cached one is empty or expired.

        Failures are logged and swallowed so that ``deal`` can continue with
        whatever token is currently cached.
        """
        now = int(time.time())
        if self.auth and now - self.auth_time <= self.AUTH_TTL_SECONDS:
            return
        try:
            token = self.get_auth()
            if not token:
                # Never cache a missing token: it would be stamped as fresh
                # and later sent as "Bearer None".
                raise ValueError("auth response has no tenant_access_token")
            self.auth = token
            self.auth_time = int(time.time())
            # The token is a credential, so it is deliberately not logged.
            logger.info("auth 获得成功")
        except Exception:
            logger.error("lark 获取 auth 失败")
            logger.error(traceback.format_exc())

    def get_auth(self):
        """Request a tenant access token for the configured application.

        Returns:
            The ``tenant_access_token`` field of the response, or ``None``
            when the response does not contain one.
        """
        # NOTE(review): the token is issued by open.feishu.cn while
        # get_message/send talk to open.larksuite.com - confirm that this
        # host mix is intended.
        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
        headers = {"Content-Type": "application/json"}
        data = {"app_id": app_id, "app_secret": app_secret}
        response = requests.post(
            url,
            headers=headers,
            data=json.dumps(data),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json().get("tenant_access_token")

    def get_message(self, parent_id):
        """Return the text of the message identified by ``parent_id``.

        Returns:
            The ``text`` field of the first returned item, or ``""`` when
            that message carries no text field.
        """
        url = f"https://open.larksuite.com/open-apis/im/v1/messages/{parent_id}"
        headers = {
            "Authorization": f"Bearer {self.auth}",
            "Content-Type": "application/json",
        }
        response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        info = response.json()
        content = info.get("data").get("items")[0].get("body").get("content")
        # Fall back to an empty string so the caller can always concatenate.
        return json.loads(content).get("text") or ""

    def send(self, open_type, open_id, message):
        """Send ``message`` as a text message.

        Args:
            open_type: Value for the ``receive_id_type`` query parameter
                (``deal`` passes ``"open_id"`` or ``"chat_id"``).
            open_id: Receiver identifier matching ``open_type``.
            message: Plain text to send.

        Returns:
            The ``requests`` response object of the API call.
        """
        url = "https://open.larksuite.com/open-apis/im/v1/messages"
        params = {"receive_id_type": open_type}
        req = {
            "receive_id": open_id,
            "msg_type": "text",
            # The API expects the content as a JSON-encoded string.
            "content": json.dumps({"text": message}),
        }
        headers = {
            "Authorization": f"Bearer {self.auth}",
            "Content-Type": "application/json",
        }
        return requests.post(
            url,
            params=params,
            headers=headers,
            data=json.dumps(req),
            timeout=self.REQUEST_TIMEOUT,
        )
if __name__ == '__main__':
    # Manual smoke test: send a fixed text message to one chat.
    service = LarkService()
    # send() authenticates with service.auth, which starts out empty, so a
    # token has to be fetched before the first call.
    service.auth = service.get_auth()
    service.send("chat_id", "********", "123")
|