给你的公众号添加一个智能机器人
环境和工具
- 公众号一个
- 云服务器一台
- Python 环境
- Flask(Python 第三方库)
- 图灵机器人账号
以上就是我们实现公众号后台智能对话机器人需要的环境和工具,前两个就不多说了。我们代码使用 Python 开发,所以需要配置好 Python 开发环境,安装 Flash 库。最后需要注册一个图灵机器人账号,调用其 API 接口。
欢迎到我的公众号 码小白TM 里调戏机器人,去看看这个机器人智不智能!
Web 服务器搭建
我们想要接收公众号后台发送的消息,就需要搭建一个 Web 服务器了。我们在云服务器上安装好 Python 和 Flask 后,就可以进行 Web 服务器的搭建了。
我们首先搭建一个非常简单的 Web 服务器,就是在网页显示出 HelloWorld!,来跑通我们的代码。我们来看一下主程序代码。
main.py
from flask import Flask app = Flask(__name__)@app.route("/") def index(): return "Hello World!" if __name__ == "__main__": app.run(host='0.0.0.0')
代码非常简单,我们直接运行代码,启动 Web 服务器:
python main.py
运行成功后,我们可以在云服务器机器浏览器上访问 127.0.0.1,如果我们能看到 Hello World! 就说明我们服务器启动成功了。我们也可以在外网机器浏览器上访问你云服务器的外网 IP,来检测 Web 服务器是否成功启动。
公众号后台配置和验证
然后我们去公众号后台开发->基本配置页找到服务器配置,可以看到我们需要一个服务器的 URL 地址、Token 令牌、消息加解密密钥。
公众号后台配置
服务器的 URL 地址就是:http://服务器外网IP/wechat,/ 前面是你的云服务外网 IP 地址,后边是我们在代码里定义的路由入口(可以自己定义),后边我们会在代码中看到。Token 令牌是我们自己定义的,后边代码中也会用到。消息加解密密钥可以通过自动生成,消息加解密方式我们选择明文模式即可。
你第一次配置的时候下边会有保存按钮,这个时候我们先不要点击,因为我们需要在 Web 服务器端对消息字段进行解析处理,然后回传结果进行验证。
我们到微信公众号开发文档里看一下消息验证流程。
验证流程
根据上图流程我们进行我们的代码编写。
main.py
from flask import Flask from flask import request import hashlibapp = Flask(__name__)@app.route("/") def index(): return "Hello World!" # 公众号后台消息路由入口@app.route("/wechat", methods=["GET", "POST"]) def wechat(): # 验证使用的是GET方法 if request.method == "GET": signature = request.args.get('signature') timestamp = request.args.get('timestamp') nonce = request.args.get('nonce') echostr = request.args.get('echostr') token = "公众号后台填写的token" # 进行排序 dataList = [token, timestamp, nonce] dataList.sort() result = "".join(dataList) #哈希加密算法得到hashcode sha1 = hashlib.sha1() sha1.update(result.encode("utf-8")) hashcode = sha1.hexdigest() if hashcode == signature: return echostr else: return "" if __name__ == "__main__": app.run(host='0.0.0.0', port=80) #公众号后台只开放了80端口
这里需要注意 http 的端口号固定使用80,不可填写其他。这里坑了我很久,因为我的服务器上还有我自己的博客,80端口被博客占用了,为了把80端口让出来,我重新搭了我的博客,修改了端口,导致现在访问我的博客后边必须加上修改后的端口。这里如果有同学有更高的方法,还请劳烦告知一下!
好了,现在我们的验证流程完成,运行我们的 Web 服务器。然后到公众号后台配置处点击保存,如果提示验证通过,那么恭喜你完成了验证。
如果提示验证失败,我们自己根据报错提示来查看是哪里的问题。
实现“你问我答”和“图”上往来
我们验证成功了,下面就要开始处理粉丝在公众号发过来的消息了。我们先来实现一个简单的“你问我答”, 粉丝给公众号一条文本消息,公众号立马回复一条相同文本消息给粉丝;还有图上往来,接受粉丝发送的图片消息,并立马回复相同的图片给粉丝。
我们通过公众号文档知道,普通用户向公众号发消息是用的 POST 方法,消息分为文本消息、图片消息、语音消息、视频消息等很多种。这里我们只对文本消息和图片消息进行处理和回复(后续你也可以针对其他消息进行处理和回复)。
消息的格式为XML数据包,下面看一下文本消息的实例:
<xml> <ToUserName><![CDATA[toUser]]></ToUserName> <FromUserName><![CDATA[fromUser]]></FromUserName> <CreateTime>1348831860</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[this is a test]]></Content> <MsgId>1234567890123456</MsgId> </xml>
- ToUserName 开发者微信号
- FromUserName 发送方帐号(一个OpenID)
- CreateTime 消息创建时间 (整型)
- MsgType 消息类型,文本为text,图片为image
- Content 文本消息内容
- MsgId 消息id,64位整型
被动回复文本消息,就是我们回复给用户的文本消息类型为:
<xml> <ToUserName><![CDATA[粉丝号]]></ToUserName> <FromUserName><![CDATA[公众号]]></FromUserName> <CreateTime>1460541339</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[test]]></Content> </xml>
Content 回复的消息内容
回复的图片消息类型:
<xml> <ToUserName><![CDATA[toUser]]></ToUserName> <FromUserName><![CDATA[fromUser]]></FromUserName> <CreateTime>12345678</CreateTime> <MsgType><![CDATA[image]]></MsgType> <Image> <MediaId><![CDATA[media_id]]></MediaId> </Image> </xml>
MediaId 通过素材管理中的接口上传多媒体文件,得到的id。
接收和回复消息的格式我们知道了,下面我们来看一下整个流程。
我们根据流程来修改一下我们的代码。我们增加两个文件来处理接收的消息,和回复的消息 receive.py,reply.py。
receive.py
import xml.etree.ElementTree as ET def parse_xml(web_data): if len(web_data) == 0: return None xmlData = ET.fromstring(web_data) msg_type = xmlData.find('MsgType').text if msg_type == 'text': return TextMsg(xmlData) elif msg_type == 'image': return ImageMsg(xmlData) class Msg(object): def __init__(self, xmlData): self.ToUserName = xmlData.find('ToUserName').text self.FromUserName = xmlData.find('FromUserName').text self.CreateTime = xmlData.find('CreateTime').text self.MsgType = xmlData.find('MsgType').text self.MsgId = xmlData.find('MsgId').text class TextMsg(Msg): def __init__(self, xmlData): Msg.__init__(self, xmlData) self.Content = xmlData.find('Content').text class ImageMsg(Msg): def __init__(self, xmlData): Msg.__init__(self, xmlData) self.PicUrl = xmlData.find('PicUrl').text self.MediaId = xmlData.find('MediaId').text
reply.py
import time class Msg(object): def __init__(self): pass def send(self): return "success"class TextMsg(Msg): def __init__(self, toUserName, fromUserName, content): self.__dict = dict() self.__dict['ToUserName'] = toUserName self.__dict['FromUserName'] = fromUserName self.__dict['CreateTime'] = int(time.time()) self.__dict['Content'] = content def send(self): XmlForm = """ <xml> <ToUserName><![CDATA[{ToUserName}]]></ToUserName> <FromUserName><![CDATA[{FromUserName}]]></FromUserName> <CreateTime>{CreateTime}</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[{Content}]]></Content> </xml> """ return XmlForm.format(**self.__dict)class ImageMsg(Msg): def __init__(self, toUserName, fromUserName, mediaId): self.__dict = dict() self.__dict['ToUserName'] = toUserName self.__dict['FromUserName'] = fromUserName self.__dict['CreateTime'] = int(time.time()) self.__dict['MediaId'] = mediaId def send(self): XmlForm = """ <xml> <ToUserName><![CDATA[{ToUserName}]]></ToUserName> <FromUserName><![CDATA[{FromUserName}]]></FromUserName> <CreateTime>{CreateTime}</CreateTime> <MsgType><![CDATA[image]]></MsgType> <Image> <MediaId><![CDATA[{MediaId}]]></MediaId> </Image> </xml> """ return XmlForm.format(**self.__dict)
main.py
from flask import Flask from flask import request import hashlibimport receiveimport replyapp = Flask(__name__)@app.route("/") def index(): return "Hello World!" # 公众号后台消息路由入口@app.route("/wechat", methods=["GET", "POST"]) def wechat(): # 验证使用的GET方法 if request.method == "GET": signature = request.args.get('signature') timestamp = request.args.get('timestamp') nonce = request.args.get('nonce') echostr = request.args.get('echostr') token = "公众号后台填写的token" # 进行排序 dataList = [token, timestamp, nonce] dataList.sort() result = "".join(dataList) #哈希加密算法得到hashcode sha1 = hashlib.sha1() sha1.update(result.encode("utf-8")) hashcode = sha1.hexdigest() if hashcode == signature: return echostr else: return "" else: recMsg = receive.parse_xml(request.data) if isinstance(recMsg, receive.Msg): toUser = recMsg.FromUserName fromUser = recMsg.ToUserName if recMsg.MsgType == 'text': content = recMsg.Content replyMsg = reply.TextMsg(toUser, fromUser, content) return replyMsg.send() elif recMsg.MsgType == 'image': mediaId = recMsg.MediaId replyMsg = reply.ImageMsg(toUser, fromUser, mediaId) return replyMsg.send() else: return reply.Msg().send() else: return reply.Msg().send() if __name__ == "__main__": app.run(host='0.0.0.0', port=80) #公众号后台只开放了80端口
然后我们启动 Web 服务器,去公众号后台发消息发图片测试,如果成功的话立马就会回复你相同的文字和图片。
微信提供了一个在线测试的平台,可以很方便的进行开发中的各种测试。
接入图灵机器人
首先我们去图灵机器人官网注册一个账号。然后在后台创建一个机器人。
然后我们根据图灵机器人接入文档的使用说明:
- 编码方式:调用调用图灵API的各个环节的编码方式均为 UTF-8
- 接口地址:http://openapi.tuling123.com/openapi/api/v2
- 请求方式: HTTP POST
- 请求参数:参数格式为 json
请求参数示例:
{ "reqType":0, "perception": { "inputText": { "text": "附近的酒店" }, "inputImage": { "url": "imageUrl" }, "selfInfo": { "location": { "city": "北京", "province": "北京", "street": "信息路" } } }, "userInfo": { "apiKey": "", "userId": "" }}
输出参数示例:
{ "intent": { "code": 10005, "intentName": "", "actionName": "", "parameters": { "nearby_place": "酒店" } }, "results": [ { "groupType": 1, "resultType": "url", "values": { "url": "http://m.elong.com/hotel/0101/nlist/#indate=2016-12-10&outdate=2016-12-11&keywords=%E4%BF%A1%E6%81%AF%E8%B7%AF" } }, { "groupType": 1, "resultType": "text", "values": { "text": "亲,已帮你找到相关酒店信息" } } ]}
其中 apiKey 是可以在我们创建的机器人的参数中找到,userId 是用户唯一标识。
好了,下面来编写我们的代码。我们增加一个 tuling.py 文件来接入图灵接口。
tuling.py
import json import urllibapiKey = '从你创建的机器人获得' tulingUrl = "http://openapi.tuling123.com/openapi/api/v2" # content 是接收的消息,userId 是用户唯一标识def tulingReply(content, userId): requestData = { "reqType": 0, "perception": { "inputText": { "text": content }, "selfInfo": { "location": { "city": "北京" } } }, "userInfo": { "apiKey": apiKey, "userId": userId } } requestData = json.dumps(requestData).encode('utf8') http_post = urllib.request.Request( tulingUrl, data=requestData, headers={'content-type': 'application/json'}) response = urllib.request.urlopen(http_post) response_str = response.read().decode('utf8') response_dic = json.loads(response_str) results_code = response_dic['intent']['code'] # 免费版每天有固定次数,如果超过之后会返回4003错误码 if results_code == 4003: results_text = "4003:%s" % response_dic['results'][0]['values']['text'] else: results_text = response_dic['results'][0]['values']['text'] return results_text
修改 main.py
from flask import Flask from flask import request import hashlibimport reimport tulingimport receiveimport replyapp = Flask(__name__)@app.route("/") def index(): return "Hello World!" # 公众号后台消息路由入口@app.route("/wechat", methods=["GET", "POST"]) def wechat(): # 验证使用的GET方法 if request.method == "GET": signature = request.args.get('signature') timestamp = request.args.get('timestamp') nonce = request.args.get('nonce') echostr = request.args.get('echostr') token = "公众号后台填写的token" # 进行排序 dataList = [token, timestamp, nonce] dataList.sort() result = "".join(dataList) #哈希加密算法得到hashcode sha1 = hashlib.sha1() sha1.update(result.encode("utf-8")) hashcode = sha1.hexdigest() if hashcode == signature: return echostr else: return "" else: recMsg = receive.parse_xml(request.data) if isinstance(recMsg, receive.Msg): toUser = recMsg.FromUserName fromUser = recMsg.ToUserName if recMsg.MsgType == 'text': content = recMsg.Content # userId 长度小于等于32位 if len(toUser) > 31: userid = str(toUser[0:30]) else: userid = str(toUser) userid = re.sub(r'[^A-Za-z0-9]+', '', userid) tulingReplay = tuling.tulingReply(content, userid) replyMsg = reply.TextMsg(toUser, fromUser, tulingReplay) return replyMsg.send() elif recMsg.MsgType == 'image': mediaId = recMsg.MediaId replyMsg = reply.ImageMsg(toUser, fromUser, mediaId) return replyMsg.send() else: return reply.Msg().send() else: return reply.Msg().send() if __name__ == "__main__": app.run(host='0.0.0.0', port=80) #公众号后台只开放了80端口
耶,我们的机器人完成了,马上迫不及待的去试试。
可以愉快的和你的机器人对话了。
后面发现了第一关注公众号后的欢迎语没有了,因为你自己的服务器接管了公众号的消息,所以原来后台设置的欢迎语就失效了。在公众号文档中看到关注/取消关注属于事件。其消息的格式如下。