0%

lark | lark 机器人

本次的 lark 并不使用官方提供的 SDK,而是用最基本的 requests 请求,进行调用。

创建应用

点击创建应用进入网页,然后选择

在下面填写应用信息

添加机器人功能

这样一个机器人应用就创建好了。

机器人如何运行

首先 lark 的机器人是由权限构成,比如,你要有私聊人的能力、获取人聊天记录的能力、@别人的能力,你必须要开启相关的权限。

lark 机器人有两种情况

  • 信息发送出去
  • 信息发送过来

发送出去很好理解,就是你给别人发信息,在群里发信息。

信息发送过来,指的是别人主动给机器人发信息,别人在群里主动 @ 机器人。信息发送过来叫做事件。你的机器人只有绑定事件才能得到相应的信息。怎么理解呢?

你需要弄一个 webhook 服务,然后绑定一些事件,比如 @事件、私聊事件等。

ps: webhook 添加完成后,lark 会推送信息进行验证,下面的代码有体现。

一旦有事件发生,lark 就会把相关的信息通过配置的 webhook 推送给你。

这就是整个机器人的数据流向。

机器人并不是开发完就能使用了,而是必须发版本,这个发版本的意思并不是让你把代码上传,而是对权限和事件进行发版本。

比如,你开启了 @别人的权限,那么,你必须让你机器人进行版本升级,这个版本通过公司创始人审核后,才能用,换句话说,所谓的发版本就是让人审核而已。

编写机器人代码

获取 api 和 secret。

lark 的认证 auth ,每隔 2 小时会更新一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import json
import time
import traceback
from asyncio import Queue

import requests
from flask import jsonify
from lark_oapi.api.im.v1 import *

from Common.ReturnInfo.ReturnInfo import ReturnInfo
from ThlmObject.ThlmObject import WebParam

app_id = "****"
app_secret = "*****"


class LarkService:

def __init__(self):
self.auth = ""
self.auth_time = 0

def deal(self, web_param: WebParam, return_info: ReturnInfo):

info = web_param.data
queue: Queue = web_param.queue
dict_memory: Dict = web_param.dict_memory
logger = web_param.asserts.get("logger")
ding = web_param.asserts.get("ding")

now = int(time.time())

if now - self.auth_time > 1.5 * 3600 or len(self.auth) == 0:
try:
_auth = self.get_auth()
self.auth_time = int(time.time())
self.auth = _auth
logger.info(f"auth 获得成功 {self.auth}")
except Exception as e:
logger.error(f"lark 获取 auth 失败")
logger.error(traceback.format_exc())

try:

if isinstance(info, bytes):
info = json.loads(info.decode("utf-8"))

challenge = info.get("challenge", None)
print(info)

if challenge:

# webhook 最开始添加的时候会进行验证,这个 challenge 就是回复 webhook 验证的

# 提取challenge值
challenge = info['challenge']
# 构造响应
response = {
"challenge": challenge
}
return jsonify(response)
else:
chat_type = info.get("event").get("message").get("chat_type")
open_id = ""
open_type = ""
if chat_type == "p2p":
# 私聊
open_type = "open_id"
open_id = info.get("event").get("sender").get("sender_id").get("open_id")
elif chat_type == "group":
# 群组
open_type = "chat_id"
open_id = info.get("event").get("message").get("chat_id")
# message = json.loads(info.get("event").get("message").get("content"))
message = json.loads(info.get("event").get("message").get("content")).get("text").replace("@_user_1",
"")
# 回复消息
parent_id = info.get("event").get("message").get("parent_id", None)
if parent_id is not None:
message = message + "\n" + self.get_message(parent_id)
self.send(open_type, open_id, message)

return jsonify(return_info.common_success().to_dict())
except Exception as e:
logger.error(f"lark service 处理错误")
logger.error(traceback.format_exc())
return jsonify(return_info.common_fail().to_dict())

def get_auth(self):
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {
"Content-Type": "application/json"
}
data = {
"app_id": app_id,
"app_secret": app_secret
}
response = requests.post(url, headers=headers, data=json.dumps(data))
return response.json().get("tenant_access_token")

def get_message(self, parent_id):
url = f"https://open.larksuite.com/open-apis/im/v1/messages/{parent_id}"
headers = {
'Authorization': f"Bearer {self.auth}", # your access token
'Content-Type': 'application/json'
}
response = requests.request("GET", url, headers=headers)
info = response.json()
return json.loads(info.get("data").get("items")[0].get("body").get("content")).get("text")

def send(self, open_type, open_id, message):
url = "https://open.larksuite.com/open-apis/im/v1/messages"
params = {"receive_id_type": open_type}
info = {
"text": message
}
req = {
"receive_id": open_id, # chat id
"msg_type": "text",
"content": json.dumps(info),
}
payload = json.dumps(req)
headers = {
'Authorization': f"Bearer {self.auth}", # your access token
'Content-Type': 'application/json'
}
response = requests.request("POST", url, params=params, headers=headers, data=payload)

# print(response.json())


if __name__ == '__main__':
LarkService().send("chat_id", "********", "123")

上面的代码分别实现了

  • 私聊
  • 回复消息
  • webhook 验证
请我喝杯咖啡吧~