Python微信公众号开发平台
上大学的时候,对微信公众号开发浅尝辄止的玩了一下,感觉还是挺有意思的。
//www.jb51.net/article/133677.htm后来服务器到期了,也就搁置了。由于发布web程序,使用PHP很顺手,就使用了PHP作为开发语言。但是其实微信公众号的开发和语言关联并不大,流程,原理上都是一致的。
快要做毕设了,想着到时候应该会部署一些代码到服务器上,进行长期的系统构建。所以趁着还是学生,就买了阿里云的学生机。买了之后,就想着玩点什么,于是微信公众号的开发,就又提上了日程。但是这次,我不打算使用PHP了,感觉局限性相对于Python而言,稍微有点大。
使用Python的话,可以灵活的部署一些爬虫类程序,和用户交互起来也会比较方便。可拓展性感觉也比较的高,于是就选它了。
服务器配置这部分属于是比较基础的,不太明白的可以看看我之前的那个博客,还算是比较的详细。今天就只是对核心代码做下介绍好了。
项目目录
root@aliyun:/var/www/html/wx/py# ls *.py api.py dispatcher.py robot.py root@aliyun:/var/www/html/wx/py#
api.py
这个文件相当于是一个关卡,涉及token的验证,和服务的支持。
# -*- coding:utf-8 -*- #中文编码 import sys reload(sys) # 不加这部分处理中文还是会出问题 sys.setdefaultencoding('utf-8') import time from flask import Flask, request, make_response import hashlib import json import xml.etree.ElementTree as ET from dispatcher import * app = Flask(__name__) app.debug = True @app.route('/') # 默认网址 def index(): return 'Index Page' @app.route('/wx', methods=['GET', 'POST']) def wechat_auth(): # 处理微信请求的处理函数,get方法用于认证,post方法取得微信转发的数据 if request.method == 'GET': token = '你自己设置好的token' data = request.args signature = data.get('signature', '') timestamp = data.get('timestamp', '') nonce = data.get('nonce', '') echostr = data.get('echostr', '') s = [timestamp, nonce, token] s.sort() s = ''.join(s) if (hashlib.sha1(s).hexdigest() == signature): return make_response(echostr) else: rec = request.stream.read() # 接收消息 dispatcher = MsgDispatcher(rec) data = dispatcher.dispatch() with open("./debug.log", "a") as file: file.write(data) file.close() response = make_response(data) response.content_type = 'application/xml' return response if __name__ == '__main__': app.run(host="0.0.0.0", port=80)
dispatcher.py
这个文件是整个服务的核心,用于识别用户发来的消息类型,然后交给不同的handler来处理,并将运行的结果反馈给前台,发送给用户。消息类型这块,在微信的开发文档上有详细的介绍,因此这里就不再过多的赘述了。
#! /usr/bin python # coding: utf8 import sys reload(sys) sys.setdefaultencoding("utf8") import time import json import xml.etree.ElementTree as ET from robot import * class MsgParser(object): """ 用于解析从微信公众平台传递过来的参数,并进行解析 """ def __init__(self, data): self.data = data def parse(self): self.et = ET.fromstring(self.data) self.user = self.et.find("FromUserName").text self.master = self.et.find("ToUserName").text self.msgtype = self.et.find("MsgType").text # 纯文字信息字段 self.content = self.et.find("Content").text if self.et.find("Content") is not None else "" # 语音信息字段 self.recognition = self.et.find("Recognition").text if self.et.find("Recognition") is not None else "" self.format = self.et.find("Format").text if self.et.find("Format") is not None else "" self.msgid = self.et.find("MsgId").text if self.et.find("MsgId") is not None else "" # 图片 self.picurl = self.et.find("PicUrl").text if self.et.find("PicUrl") is not None else "" self.mediaid = self.et.find("MediaId").text if self.et.find("MediaId") is not None else "" # 事件 self.event = self.et.find("Event").text if self.et.find("Event") is not None else "" return self class MsgDispatcher(object): """ 根据消息的类型,获取不同的处理返回值 """ def __init__(self, data): parser = MsgParser(data).parse() self.msg = parser self.handler = MsgHandler(parser) def dispatch(self): self.result = "" # 统一的公众号出口数据 if self.msg.msgtype == "text": self.result = self.handler.textHandle() elif self.msg.msgtype == "voice": self.result = self.handler.voiceHandle() elif self.msg.msgtype == 'image': self.result = self.handler.imageHandle() elif self.msg.msgtype == 'video': self.result = self.handler.videoHandle() elif self.msg.msgtype == 'shortvideo': self.result = self.handler.shortVideoHandle() elif self.msg.msgtype == 'location': self.result = self.handler.locationHandle() elif self.msg.msgtype == 'link': self.result = self.handler.linkHandle() elif self.msg.msgtype == 'event': self.result = self.handler.eventHandle() return self.result class MsgHandler(object): """ 针对type不同,转交给不同的处理函数。直接处理即可 """ def __init__(self, msg): self.msg = msg self.time = int(time.time()) def textHandle(self, user='', master='', time='', content=''): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[text]]></MsgType> <Content><![CDATA[{}]]></Content> </xml> """ # 对用户发过来的数据进行解析,并执行不同的路径 try: response = get_response_by_keyword(self.msg.content) if response['type'] == "image": result = self.imageHandle(self.msg.user, self.msg.master, self.time, response['content']) elif response['type'] == "music": data = response['content'] result = self.musicHandle(data['title'], data['description'], data['url'], data['hqurl']) elif response['type'] == "news": items = response['content'] result = self.newsHandle(items) # 这里还可以添加更多的拓展内容 else: response = get_turing_response(self.msg.content) result = template.format(self.msg.user, self.msg.master, self.time, response) #with open("./debug.log", 'a') as f: # f.write(response['content'] + '~~' + result) # f.close() except Exception as e: with open("./debug.log", 'a') as f: f.write("text handler:"+str(e.message)) f.close() return result def musicHandle(self, title='', description='', url='', hqurl=''): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[music]]></MsgType> <Music> <Title><![CDATA[{}]]></Title> <Description><![CDATA[{}]]></Description> <MusicUrl><![CDATA[{}]]></MusicUrl> <HQMusicUrl><![CDATA[{}]]></HQMusicUrl> </Music> <FuncFlag>0</FuncFlag> </xml> """ response = template.format(self.msg.user, self.msg.master, self.time, title, description, url, hqurl) return response def voiceHandle(self): response = get_turing_response(self.msg.recognition) result = self.textHandle(self.msg.user, self.msg.master, self.time, response) return result def imageHandle(self, user='', master='', time='', mediaid=''): template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[image]]></MsgType> <Image> <MediaId><![CDATA[{}]]></MediaId> </Image> </xml> """ if mediaid == '': response = self.msg.mediaid else: response = mediaid result = template.format(self.msg.user, self.msg.master, self.time, response) return result def videoHandle(self): return 'video' def shortVideoHandle(self): return 'shortvideo' def locationHandle(self): return 'location' def linkHandle(self): return 'link' def eventHandle(self): return 'event' def newsHandle(self, items): # 图文消息这块真的好多坑,尤其是<![CDATA[]]>中间不可以有空格,可怕极了 articlestr = """ <item> <Title><![CDATA[{}]]></Title> <Description><![CDATA[{}]]></Description> <PicUrl><![CDATA[{}]]></PicUrl> <Url><![CDATA[{}]]></Url> </item> """ itemstr = "" for item in items: itemstr += str(articlestr.format(item['title'], item['description'], item['picurl'], item['url'])) template = """ <xml> <ToUserName><![CDATA[{}]]></ToUserName> <FromUserName><![CDATA[{}]]></FromUserName> <CreateTime>{}</CreateTime> <MsgType><![CDATA[news]]></MsgType> <ArticleCount>{}</ArticleCount> <Articles>{}</Articles> </xml> """ result = template.format(self.msg.user, self.msg.master, self.time, len(items), itemstr) return result
robot.py
这个文件属于那种画龙点睛性质的。
#!/usr/bin python #coding: utf8 import requests import json def get_turing_response(req=""): url = "http://www.tuling123.com/openapi/api" secretcode = "嘿嘿,这个就不说啦" response = requests.post(url=url, json={"key": secretcode, "info": req, "userid": 12345678}) return json.loads(response.text)['text'] if response.status_code == 200 else "" def get_qingyunke_response(req=""): url = "http://api.qingyunke.com/api.php?key=free&appid=0&msg={}".format(req) response = requests.get(url=url) return json.loads(response.text)['content'] if response.status_code == 200 else "" # 简单做下。后面慢慢来 def get_response_by_keyword(keyword): if '团建' in keyword: result = {"type": "image", "content": "3s9Dh5rYdP9QruoJ_M6tIYDnxLLdsQNCMxkY0L2FMi6HhMlNPlkA1-50xaE_imL7"} elif 'music' in keyword or '音乐' in keyword: musicurl='http://204.11.1.34:9999/dl.stream.qqmusic.qq.com/C400001oO7TM2DE1OE.m4a?vkey=3DFC73D67AF14C36FD1128A7ABB7247D421A482EBEDA17DE43FF0F68420032B5A2D6818E364CB0BD4EAAD44E3E6DA00F5632859BEB687344&guid=5024663952&uin=1064319632&fromtag=66' result = {"type": "music", "content": {"title": "80000", "description":"有个男歌手姓巴,他的女朋友姓万,于是这首歌叫80000", "url": musicurl, "hqurl": musicurl}} elif '关于' in keyword: items = [{"title": "关于我", "description":"喜欢瞎搞一些脚本", "picurl":"https://avatars1.githubusercontent.com/u/12973402?s=460&v=4", "url":"https://github.com/guoruibiao"}, {"title": "我的博客", "description":"收集到的,瞎写的一些博客", "picurl":"http://avatar.csdn.net/0/8/F/1_marksinoberg.jpg", "url":"http://blog.csdn.net/marksinoberg"}, {"title": "薛定谔的:dog:", "description": "副标题有点奇怪,不知道要怎么设置比较好","picurl": "https://www.baidu.com/img/bd_logo1.png","url": "http://www.baidu.com"} ] result = {"type": "news", "content": items} else: result = {"type": "text", "content": "可以自由进行拓展"} return result
其实这看起来是一个文件,其实可以拓展为很多的方面。
如果想通过公众号来监控服务器的运行情况,就可以添加一个对服务器负载的监控的脚本;
如果想做一些爬虫,每天抓取一些高质量的文章,然后通过公众号进行展示。
不方便使用电脑的情况下,让公众号调用一些命令也可以算是曲线救国的一种方式。
等等吧,其实有多少想法,就可以用Python进行事先。然后通过公众号这个平台进行展示。
易错点
在从PHP重构为Python的过程中,我其实也是遇到了一些坑的。下面总结下,如果恰好能帮助到遇到同样问题的你,那我这篇文章也算是没有白写了。
微信公众号的开发,其实关键就在于理解这个工作的模式。大致有这么两条路。
用户把消息发送到微信公众平台上,平台把信息拼接组装成XML发到我们自己的服务器。(通过一系列的认证,校验,让平台知道,我们的服务是合法的),然后服务器将XML进行解析,处理。
我们的服务器解析处理完成后,将数据再次拼接组装成XML,发给微信公众平台,平台帮我们把数据反馈给对应的用户。
这样,一个交互就算是完成了。在这个过程中,有下面几个容易出错的地方。
token校验: token的校验是一个get方式的请求。通过代码我们也可以看到,就是对singature的校验,具体看代码就明白了。
XML数据的解析,对于不同的消息,记得使用不同的格式。其中很容易出错的就是格式不规范。 <!CDATA[[]]> 中括号之间最好不要有空格,不然定位起错误还是很麻烦的。
服务的稳定性。这里用的web框架是flask,小巧精良。但是对并发的支持性不是很好,对此可以使用uwsgi和Nginx来实现一个更稳定的服务。如果就是打算自己玩一玩,通过命令行启用(如python api.py)就不是很保险了,因为很有可能会因为用户的一个奇怪的输入导致整个服务垮掉,建议使用nohup的方式,来在一定程度上保证服务的质量。
结果演示
目前这个公众号支持文字,语音,图片,图文等消息类型。示例如下。
总结
在将公众号从PHP重构为Python的过程中,遇到了一些问题,然后通过不断的摸索,慢慢的也把问题解决了。其实有时候就是这样,只有不断的发现问题,才能不断的提升自己。
这里其实并没有深入的去完善,重构后的微信公众号其实能做的还有很多,毕竟就看敢不敢想嘛。好了,就先扯这么多了,后面如果有好的思路和实现,再回来更新好了。