Winse Blog

走走停停都是风景, 熙熙攘攘都向最好, 忙忙碌碌都为明朝, 何畏之.

钉钉群机器人对接ChatGPT

前面一段时刻,通过 https://github.com/zhayujie/bot-on-anything 对接过个人微信的群,但是没过多久微信就被警告不能扫码了,前几天更新了钉钉机器人的对接方法,尝试了一下感觉还行马马虎虎。

当然过程是揪心的:钉钉文档在更新升级,bot这边的文档太简单,需要对着代码,然后对着dingtalk官方的文档对照着去排查。

第一步 首先是创建企业内部机器人,获取Secret

平时对接的告警机器人是只能发消息的,不能接收消息。

在钉钉APP中创建一个组织,然后在以这个组织登录 https://open-dev.dingtalk.com/v1/fe/old#/corprobot 然后再点击 创建机器人:

其中基础信息中的AppSecret后面会用到,对应dingtalk_secret:

机器人基础信息

然后配置接受消息的地址:就是钉钉服务器收到消息后,把消息转发到你的服务器(公网可访问的)地址。服务器出口IP:就是你部署服务器的公网IP,为了验证信息的合法性的一个参数。

服务配置

创建好机器人后,在版本管理和发布页签中,上线机器人。这样我们在组织的群里面就能选择这个机器人了。

第二步 把机器人加入组织群,获取Token

选择 群设置 - 机器人 - 添加机器人,把刚刚创建的机器人加入到我们组织/公司群,然后查看机器人的设置,会提供一个webhook的地址,这一串地址中就包括了一个access_token的字符串参数,就是后面需要用的dingtalk_token。

机器人管理

机器人设置/配置页面

第三步 配置bot,启动服务

配置bot-on-anything的config.json,修改 channel - type 的类型为 dingtalk,dingtalk的参数为:机器人应用凭证的AppSecret对应dingtalk_secret;发布机器人,把机器人加入到群后,机器人设置页面的webhook地址上的access_token对应的是dingtalk_token。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
"channel": {
    "type": "dingtalk",
    "single_chat_prefix": ["bot", "@bot"],
    "single_chat_reply_prefix": "[bot] ",
    "group_chat_prefix": ["@bot"],
    "group_name_white_list": ["ChatGPT测试群"],
    "image_create_prefix": ["画", "看", "找一张"],

    "wechat": {
    },

    "wechat_mp": {
      "token": "2023...",
      "port": "3000"
    },

    "dingtalk": {
      "image_create_prefix": ["画", "draw", "Draw"],
      "port": "3000",
      "dingtalk_token": "d55566a9e90...",
      "dingtalk_post_token": "",
      "dingtalk_secret": "PUMsK......"
    },
  

启动服务,愉快的耍起来。

第四步 小小改进

在测试图片消息的时刻,发现还不太支持,还有回复消息at提问的人也小小改进了一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
cat dingtalk_channel.py 
# encoding:utf-8
import json
import hmac
import hashlib
import base64
import time
import requests
from urllib.parse import quote_plus
from common import log
from flask import Flask, request, render_template, make_response
from common import const
from common import functions
from config import channel_conf
from config import channel_conf_val
from channel.channel import Channel

class DingTalkChannel(Channel):
    def __init__(self):
        self.dingtalk_token = channel_conf(const.DINGTALK).get('dingtalk_token')
        self.dingtalk_post_token = channel_conf(const.DINGTALK).get('dingtalk_post_token')
        self.dingtalk_secret = channel_conf(const.DINGTALK).get('dingtalk_secret')
        log.info("[DingTalk] dingtalk_secret={}, dingtalk_token={} dingtalk_post_token={}".format(self.dingtalk_secret, self.dingtalk_token, self.dingtalk_post_token))

    def startup(self):
        http_app.run(host='0.0.0.0', port=channel_conf(const.DINGTALK).get('port'))
        
    def notify_dingtalk(self, data):
        timestamp = round(time.time() * 1000)
        secret_enc = bytes(self.dingtalk_secret, encoding='utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, self.dingtalk_secret)
        string_to_sign_enc = bytes(string_to_sign, encoding='utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc,
                             digestmod=hashlib.sha256).digest()
        sign = quote_plus(base64.b64encode(hmac_code))

        notify_url = f"https://oapi.dingtalk.com/robot/send?access_token={self.dingtalk_token}×tamp={timestamp}&sign={sign}"
        try:
            r = requests.post(notify_url, json=data)
            reply = r.json()
            # log.info("[DingTalk] reply={}".format(str(reply)))
        except Exception as e:
            log.error(e)

    def handle(self, resp):
        reply = "您好,有什么我可以帮助您解答的问题吗?"
        prompt = resp['text']['content']
        prompt = prompt.strip()
        if str(prompt) != 0:
            conversation_id = resp['conversationId']
            sender_id = resp['senderId']
            context = dict()
            img_match_prefix = functions.check_prefix(
                prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
            if img_match_prefix:
                prompt = prompt.split(img_match_prefix, 1)[1].strip()
                context['type'] = 'IMAGE_CREATE'
            id = sender_id
            nick = resp['senderNick']
            staffid = resp['senderStaffId']
            context['from_user_id'] = str(id)
            reply = super().build_reply_content(prompt, context)
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                },
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgtype": "text",
                "text": {
                    "content": reply
                },
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp 


dd = DingTalkChannel()
http_app = Flask(__name__,)


@http_app.route("/", methods=['POST'])
def chat():
    log.info("[DingTalk] chat_headers={}".format(str(request.headers)))
    log.info("[DingTalk] chat={}".format(str(request.data)))
    token = request.headers.get('token')
    if dd.dingtalk_post_token and token != dd.dingtalk_post_token:
        return {'ret': 203}
    data = json.loads(request.data)
    if data:
        content = data['text']['content']
        if not content:
            return
        reply = dd.handle(resp=data);
        dd.notify_dingtalk(reply)
        return {'ret': 200}
    return {'ret': 201}

后续

增加单聊,多机器人配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
"dingtalk": {
  "image_create_prefix": ["画", "draw", "Draw"],
  "port": "3000",
  "dingtalk_token": "方式1",
  "dingtalk_post_token": "",
  "dingtalk_secret": "",
  "dingtalk_robots": ["方式2-key123", "方式2-group123"],
  "方式2-key123": {
      "dingtalk_key": "AppKey",
      "dingtalk_secret": "AppSecret",
      "dingtalk_token": "webhook-access-token",
      "dingtalk_post_token": ""
  },
  "方式2-group123": { 
      "dingtalk_group": "群名",
      "dingtalk_secret": "AppSecret",
      "dingtalk_token": "webhook-access-token",
      "dingtalk_post_token": ""
  }
},

源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# encoding:utf-8
import json
import hmac
import hashlib
import base64
import time
import requests
from urllib.parse import quote_plus
from common import log
from flask import Flask, request, render_template, make_response
from common import const
from common import functions
from config import channel_conf
from config import channel_conf_val
from channel.channel import Channel


class DingTalkHandler():
    def __init__(self, config):
        self.dingtalk_key = config.get('dingtalk_key')
        self.dingtalk_secret = config.get('dingtalk_secret')
        self.dingtalk_token = config.get('dingtalk_token')
        self.dingtalk_post_token = config.get('dingtalk_post_token')
        self.access_token = None
        log.info("[DingTalk] AppKey={}, AppSecret={} Token={} post Token={}".format(self.dingtalk_key, self.dingtalk_secret, self.dingtalk_token, self.dingtalk_post_token))

    def notify_dingtalk_webhook(self, data):
        timestamp = round(time.time() * 1000)
        secret_enc = bytes(self.dingtalk_secret, encoding='utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, self.dingtalk_secret)
        string_to_sign_enc = bytes(string_to_sign, encoding='utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc,
                             digestmod=hashlib.sha256).digest()
        sign = quote_plus(base64.b64encode(hmac_code))

        notify_url = f"https://oapi.dingtalk.com/robot/send?access_token={self.dingtalk_token}×tamp={timestamp}&sign={sign}"
        try:
            log.info("[DingTalk] url={}".format(str(notify_url)))
            r = requests.post(notify_url, json=data)
            reply = r.json()
            log.info("[DingTalk] reply={}".format(str(reply)))
        except Exception as e:
            log.error(e)

    def get_token_internal(self):
        access_token_url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
        try:
            r = requests.post(access_token_url, json={"appKey": self.dingtalk_key, "appSecret": self.dingtalk_secret})
        except:
            raise Exception("DingTalk token获取失败!!!")

        data = json.loads(r.content)
        access_token = data['accessToken']
        expire_in = data['expireIn']
        
        self.access_token = access_token
        self.expire_at = int(expire_in) + time.time()

        return self.access_token
    
    def get_token(self):
        if self.access_token is None or self.expire_at <= time.time():
            self.get_token_internal()
        
        return self.access_token
    
    def get_post_url(self, data):
        type = data['conversationType']
        if type == "1":
            return f"https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
        else:
            return f"https://api.dingtalk.com/v1.0/robot/groupMessages/send"
    
    def build_response(self, reply, data):
        type = data['conversationType']
        if type == "1":
            return self.build_oto_response(reply, data)
        else:
            return self.build_group_response(reply, data)

    def build_oto_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robotCode = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgKey": "sampleMarkdown",
                "msgParam": json.dumps({
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                }),
                "robotCode": robotCode,
                "userIds": [staffid]
            }
        else:
            resp = {
                "msgKey": "sampleText",
                "msgParam": json.dumps({
                    "content": reply
                }),
                "robotCode": robotCode,
                "userIds": [staffid]
            }
        return resp
    
    def build_group_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robot_code = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgKey": "sampleMarkdown",
                "msgParam": json.dumps({
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                }),
                "robotCode": robot_code,
                "openConversationId": conversation_id,
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgKey": "sampleText",
                "msgParam": json.dumps({
                    "content": reply + " \n " + "@" + nick
                }),
                "robotCode": robot_code,
                "openConversationId": conversation_id,
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp
    
    
    def build_webhook_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robotCode = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                },
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgtype": "text",
                "text": {
                    "content": reply
                },
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp
    
    def chat(self, channel, data):
        reply = channel.handle(data)
        type = data['conversationType']
        if type == "1":
            reply_json = self.build_response(reply, data)
            self.notify_dingtalk(data, reply_json)
        else:
            # group的不清楚怎么@,先用webhook调用
            reply_json = self.build_webhook_response(reply, data)
            self.notify_dingtalk_webhook(reply_json)
        

    def notify_dingtalk(self, data, reply_json):
        headers = {
            'content-type': 'application/json', 
            'x-acs-dingtalk-access-token': self.get_token()
        }

        notify_url = self.get_post_url(data)
        try:
            r = requests.post(notify_url, json=reply_json, headers=headers)
            resp = r.json()
            log.info("[DingTalk] response={}".format(str(resp)))
        except Exception as e:
            log.error(e)


class DingTalkChannel(Channel):
    def __init__(self):
        log.info("[DingTalk] started.")

    def startup(self):
        http_app.run(host='0.0.0.0', port=channel_conf(const.DINGTALK).get('port'))

    def handle(self, data):
        reply = "您好,有什么我可以帮助您解答的问题吗?"
        prompt = data['text']['content']
        prompt = prompt.strip()
        if str(prompt) != 0:
            conversation_id = data['conversationId']
            sender_id = data['senderId']
            context = dict()
            img_match_prefix = functions.check_prefix(
                prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
            if img_match_prefix:
                prompt = prompt.split(img_match_prefix, 1)[1].strip()
                context['type'] = 'IMAGE_CREATE'
            id = sender_id
            context['from_user_id'] = str(id)
            reply = super().build_reply_content(prompt, context)
        return reply
         

dd = DingTalkChannel()
handlers = dict()
robots = channel_conf(const.DINGTALK).get('dingtalk_robots')
if robots and len(robots) > 0:
    for robot in robots:
        robot_config = channel_conf(const.DINGTALK).get(robot)
        robot_key = robot_config.get('dingtalk_key')
        group_name = robot_config.get('dingtalk_group')
        handlers[group_name or robot_key] = DingTalkHandler(robot_config)
else:
    handlers['DEFAULT'] = DingTalkHandler(channel_conf(const.DINGTALK))
http_app = Flask(__name__,)


@http_app.route("/", methods=['POST'])
def chat():
    log.info("[DingTalk] chat_headers={}".format(str(request.headers)))
    log.info("[DingTalk] chat={}".format(str(request.data)))
    token = request.headers.get('token')
    data = json.loads(request.data)
    if data:
        content = data['text']['content']
        if not content:
            return
        code = data['robotCode']
        group_name = None
        if 'conversationTitle' in data:
            group_name = data['conversationTitle']
        handler = handlers.get(group_name, handlers.get(code, handlers.get('DEFAULT')))
        if handler.dingtalk_post_token and token != handler.dingtalk_post_token:
            return {'ret': 203}
        handler.chat(dd, data)
        return {'ret': 200}
    
    return {'ret': 201}


–END

Comments