跳到正文
W Winse Blog
ai automation 9 min read

钉钉群机器人对接ChatGPT

前面一段时刻,通过 https://github.com/zhayujie/bot-on-anything 对接过个人微信的群,但是没过多久微信就被警告不能扫码了,前几天更新了钉钉机器人的对接方法,尝试了一下感觉还行马马虎虎。

当然过程是揪心的:钉钉文档在更新升级,bot这边的文档太简单,需要对着代码,然后对着dingtalk官方的文档对照着去排查。

# 第一步 首先是创建企业内部机器人,获取Secret

平时对接的告警机器人是只能发消息的,不能接收消息。

在钉钉APP中创建一个组织,然后在以这个组织登录 https://open-dev.dingtalk.com/v1/fe/old#/corprobot 然后再点击 创建机器人:

其中基础信息中的AppSecret后面会用到,对应dingtalk_secret:

机器人基础信息

然后配置接受消息的地址:就是钉钉服务器收到消息后,把消息转发到你的服务器(公网可访问的)地址。服务器出口IP:就是你部署服务器的公网IP,为了验证信息的合法性的一个参数。

服务配置

创建好机器人后,在版本管理和发布页签中,上线机器人。这样我们在组织的群里面就能选择这个机器人了。

# 第二步 把机器人加入组织群,获取Token

选择 群设置 - 机器人 - 添加机器人,把刚刚创建的机器人加入到我们组织/公司群,然后查看机器人的设置,会提供一个webhook的地址,这一串地址中就包括了一个access_token的字符串参数,就是后面需要用的dingtalk_token。

机器人管理

机器人设置/配置页面

# 第三步 配置bot,启动服务

配置bot-on-anything的config.json,修改 channel - type 的类型为 dingtalk,dingtalk的参数为:机器人应用凭证的AppSecret对应dingtalk_secret;发布机器人,把机器人加入到群后,机器人设置页面的webhook地址上的access_token对应的是dingtalk_token。

"channel": {
    "type": "dingtalk",
    "single_chat_prefix": ["bot", "@bot"],
    "single_chat_reply_prefix": "[bot] ",
    "group_chat_prefix": ["@bot"],
    "group_name_white_list": ["ChatGPT测试群"],
    "image_create_prefix": ["画", "看", "找一张"],

    "wechat": {
    },

    "wechat_mp": {
      "token": "2023...",
      "port": "3000"
    },

    "dingtalk": {
      "image_create_prefix": ["画", "draw", "Draw"],
      "port": "3000",
      "dingtalk_token": "d55566a9e90...",
      "dingtalk_post_token": "",
      "dingtalk_secret": "PUMsK......"
    },
	

启动服务,愉快的耍起来。

# 第四步 小小改进

在测试图片消息的时刻,发现还不太支持,还有回复消息at提问的人也小小改进了一下:

cat dingtalk_channel.py 
# encoding:utf-8
import json
import hmac
import hashlib
import base64
import time
import requests
from urllib.parse import quote_plus
from common import log
from flask import Flask, request, render_template, make_response
from common import const
from common import functions
from config import channel_conf
from config import channel_conf_val
from channel.channel import Channel

class DingTalkChannel(Channel):
    def __init__(self):
        self.dingtalk_token = channel_conf(const.DINGTALK).get('dingtalk_token')
        self.dingtalk_post_token = channel_conf(const.DINGTALK).get('dingtalk_post_token')
        self.dingtalk_secret = channel_conf(const.DINGTALK).get('dingtalk_secret')
        log.info("[DingTalk] dingtalk_secret={}, dingtalk_token={} dingtalk_post_token={}".format(self.dingtalk_secret, self.dingtalk_token, self.dingtalk_post_token))

    def startup(self):
        http_app.run(host='0.0.0.0', port=channel_conf(const.DINGTALK).get('port'))
        
    def notify_dingtalk(self, data):
        timestamp = round(time.time() * 1000)
        secret_enc = bytes(self.dingtalk_secret, encoding='utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, self.dingtalk_secret)
        string_to_sign_enc = bytes(string_to_sign, encoding='utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc,
                             digestmod=hashlib.sha256).digest()
        sign = quote_plus(base64.b64encode(hmac_code))

        notify_url = f"https://oapi.dingtalk.com/robot/send?access_token={self.dingtalk_token}&timestamp={timestamp}&sign={sign}"
        try:
            r = requests.post(notify_url, json=data)
            reply = r.json()
            # log.info("[DingTalk] reply={}".format(str(reply)))
        except Exception as e:
            log.error(e)

    def handle(self, resp):
        reply = "您好,有什么我可以帮助您解答的问题吗?"
        prompt = resp['text']['content']
        prompt = prompt.strip()
        if str(prompt) != 0:
            conversation_id = resp['conversationId']
            sender_id = resp['senderId']
            context = dict()
            img_match_prefix = functions.check_prefix(
                prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
            if img_match_prefix:
                prompt = prompt.split(img_match_prefix, 1)[1].strip()
                context['type'] = 'IMAGE_CREATE'
            id = sender_id
            nick = resp['senderNick']
            staffid = resp['senderStaffId']
            context['from_user_id'] = str(id)
            reply = super().build_reply_content(prompt, context)
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                },
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgtype": "text",
                "text": {
                    "content": reply
                },
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp 

dd = DingTalkChannel()
http_app = Flask(__name__,)

@http_app.route("/", methods=['POST'])
def chat():
    log.info("[DingTalk] chat_headers={}".format(str(request.headers)))
    log.info("[DingTalk] chat={}".format(str(request.data)))
    token = request.headers.get('token')
    if dd.dingtalk_post_token and token != dd.dingtalk_post_token:
        return {'ret': 203}
    data = json.loads(request.data)
    if data:
        content = data['text']['content']
        if not content:
            return
        reply = dd.handle(resp=data);
        dd.notify_dingtalk(reply)
        return {'ret': 200}
    return {'ret': 201}

# 后续

增加单聊,多机器人配置

    "dingtalk": {
      "image_create_prefix": ["画", "draw", "Draw"],
      "port": "3000",
      "dingtalk_token": "方式1",
      "dingtalk_post_token": "",
      "dingtalk_secret": "",
      "dingtalk_robots": ["方式2-key123", "方式2-group123"],
      "方式2-key123": {
          "dingtalk_key": "AppKey",
          "dingtalk_secret": "AppSecret",
          "dingtalk_token": "webhook-access-token",
          "dingtalk_post_token": ""
      },
      "方式2-group123": { 
          "dingtalk_group": "群名",
          "dingtalk_secret": "AppSecret",
          "dingtalk_token": "webhook-access-token",
          "dingtalk_post_token": ""
      }
    },

源代码

# encoding:utf-8
import json
import hmac
import hashlib
import base64
import time
import requests
from urllib.parse import quote_plus
from common import log
from flask import Flask, request, render_template, make_response
from common import const
from common import functions
from config import channel_conf
from config import channel_conf_val
from channel.channel import Channel

class DingTalkHandler():
    def __init__(self, config):
        self.dingtalk_key = config.get('dingtalk_key')
        self.dingtalk_secret = config.get('dingtalk_secret')
        self.dingtalk_token = config.get('dingtalk_token')
        self.dingtalk_post_token = config.get('dingtalk_post_token')
        self.access_token = None
        log.info("[DingTalk] AppKey={}, AppSecret={} Token={} post Token={}".format(self.dingtalk_key, self.dingtalk_secret, self.dingtalk_token, self.dingtalk_post_token))

    def notify_dingtalk_webhook(self, data):
        timestamp = round(time.time() * 1000)
        secret_enc = bytes(self.dingtalk_secret, encoding='utf-8')
        string_to_sign = '{}\n{}'.format(timestamp, self.dingtalk_secret)
        string_to_sign_enc = bytes(string_to_sign, encoding='utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc,
                             digestmod=hashlib.sha256).digest()
        sign = quote_plus(base64.b64encode(hmac_code))

        notify_url = f"https://oapi.dingtalk.com/robot/send?access_token={self.dingtalk_token}&timestamp={timestamp}&sign={sign}"
        try:
            log.info("[DingTalk] url={}".format(str(notify_url)))
            r = requests.post(notify_url, json=data)
            reply = r.json()
            log.info("[DingTalk] reply={}".format(str(reply)))
        except Exception as e:
            log.error(e)

    def get_token_internal(self):
        access_token_url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
        try:
            r = requests.post(access_token_url, json={"appKey": self.dingtalk_key, "appSecret": self.dingtalk_secret})
        except:
            raise Exception("DingTalk token获取失败!!!")

        data = json.loads(r.content)
        access_token = data['accessToken']
        expire_in = data['expireIn']
        
        self.access_token = access_token
        self.expire_at = int(expire_in) + time.time()

        return self.access_token
    
    def get_token(self):
        if self.access_token is None or self.expire_at <= time.time():
            self.get_token_internal()
        
        return self.access_token
    
    def get_post_url(self, data):
        type = data['conversationType']
        if type == "1":
            return f"https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend"
        else:
            return f"https://api.dingtalk.com/v1.0/robot/groupMessages/send"
    
    def build_response(self, reply, data):
        type = data['conversationType']
        if type == "1":
            return self.build_oto_response(reply, data)
        else:
            return self.build_group_response(reply, data)

    def build_oto_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robotCode = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgKey": "sampleMarkdown",
                "msgParam": json.dumps({
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                }),
                "robotCode": robotCode,
                "userIds": [staffid]
            }
        else:
            resp = {
                "msgKey": "sampleText",
                "msgParam": json.dumps({
                    "content": reply
                }),
                "robotCode": robotCode,
                "userIds": [staffid]
            }
        return resp
    
    def build_group_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robot_code = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgKey": "sampleMarkdown",
                "msgParam": json.dumps({
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                }),
                "robotCode": robot_code,
                "openConversationId": conversation_id,
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgKey": "sampleText",
                "msgParam": json.dumps({
                    "content": reply + " \n " + "@" + nick
                }),
                "robotCode": robot_code,
                "openConversationId": conversation_id,
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp
    
    
    def build_webhook_response(self, reply, data):
        conversation_id = data['conversationId']
        prompt = data['text']['content']
        prompt = prompt.strip()
        img_match_prefix = functions.check_prefix(
            prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
        nick = data['senderNick']
        staffid = data['senderStaffId']
        robotCode = data['robotCode']
        if img_match_prefix and isinstance(reply, list):
            images = ""
            for url in reply:
                images += f"!['IMAGE_CREATE']({url})\n"
            reply = images
            resp = {
                "msgtype": "markdown",
                "markdown": {
                    "title": "IMAGE @" + nick + " ", 
                    "text": images + " \n " + "@" + nick
                },
                "at": {
                    "atUserIds": [
                        staffid
                    ],
                    "isAtAll": False
                }
            }
        else:
            resp = {
                "msgtype": "text",
                "text": {
                    "content": reply
                },
                "at": {
                    "atUserIds": [
                       staffid 
                    ],
                    "isAtAll": False
                }
            }
        return resp
    
    def chat(self, channel, data):
        reply = channel.handle(data)
        type = data['conversationType']
        if type == "1":
            reply_json = self.build_response(reply, data)
            self.notify_dingtalk(data, reply_json)
        else:
            # group的不清楚怎么@,先用webhook调用
            reply_json = self.build_webhook_response(reply, data)
            self.notify_dingtalk_webhook(reply_json)
        

    def notify_dingtalk(self, data, reply_json):
        headers = {
            'content-type': 'application/json', 
            'x-acs-dingtalk-access-token': self.get_token()
        }

        notify_url = self.get_post_url(data)
        try:
            r = requests.post(notify_url, json=reply_json, headers=headers)
            resp = r.json()
            log.info("[DingTalk] response={}".format(str(resp)))
        except Exception as e:
            log.error(e)

class DingTalkChannel(Channel):
    def __init__(self):
        log.info("[DingTalk] started.")

    def startup(self):
        http_app.run(host='0.0.0.0', port=channel_conf(const.DINGTALK).get('port'))

    def handle(self, data):
        reply = "您好,有什么我可以帮助您解答的问题吗?"
        prompt = data['text']['content']
        prompt = prompt.strip()
        if str(prompt) != 0:
            conversation_id = data['conversationId']
            sender_id = data['senderId']
            context = dict()
            img_match_prefix = functions.check_prefix(
                prompt, channel_conf_val(const.DINGTALK, 'image_create_prefix'))
            if img_match_prefix:
                prompt = prompt.split(img_match_prefix, 1)[1].strip()
                context['type'] = 'IMAGE_CREATE'
            id = sender_id
            context['from_user_id'] = str(id)
            reply = super().build_reply_content(prompt, context)
        return reply
         

dd = DingTalkChannel()
handlers = dict()
robots = channel_conf(const.DINGTALK).get('dingtalk_robots')
if robots and len(robots) > 0:
    for robot in robots:
        robot_config = channel_conf(const.DINGTALK).get(robot)
        robot_key = robot_config.get('dingtalk_key')
        group_name = robot_config.get('dingtalk_group')
        handlers[group_name or robot_key] = DingTalkHandler(robot_config)
else:
    handlers['DEFAULT'] = DingTalkHandler(channel_conf(const.DINGTALK))
http_app = Flask(__name__,)

@http_app.route("/", methods=['POST'])
def chat():
    log.info("[DingTalk] chat_headers={}".format(str(request.headers)))
    log.info("[DingTalk] chat={}".format(str(request.data)))
    token = request.headers.get('token')
    data = json.loads(request.data)
    if data:
        content = data['text']['content']
        if not content:
            return
        code = data['robotCode']
        group_name = None
        if 'conversationTitle' in data:
            group_name = data['conversationTitle']
        handler = handlers.get(group_name, handlers.get(code, handlers.get('DEFAULT')))
        if handler.dingtalk_post_token and token != handler.dingtalk_post_token:
            return {'ret': 203}
        handler.chat(dd, data)
        return {'ret': 200}
    
    return {'ret': 201}

–END

在 GitHub 上讨论

欢迎通过 GitHub Issue 留言或反馈。每条讨论都会关联到对应文章的源文件路径。

src/posts/2023-04-09-dingtalk-with-openai.md

Related posts