websocket简单使用
一:简单实现
参考:https://websockets.readthedocs.io/
PS:本文示例使用了 f-string 和 async/await 语法,仅适用于 Python 3.6 及以上版本
- 前期准备:先安装 websockets 库(pip install websockets)
server端
import asyncio
import websockets


async def hello(websocket, path):
    """Greet a single client.

    Waits for one message (the client's name), replies with
    ``Hello <name>!`` and logs both directions to stdout.

    Args:
        websocket: connection object passed in by ``websockets.serve``.
        path: request path of the connection; it is only printed.

    NOTE(review): newer ``websockets`` releases appear to call handlers with
    the connection only (no ``path``) - confirm against the installed version.
    """
    # Reached as soon as a client connects.
    print(path)
    # Suspends here until the client sends a message.
    name = await websocket.recv()
    print(f"< {name}")
    greeting = f"Hello {name}!"
    await websocket.send(greeting)
    print(f"> {greeting}")


start_server = websockets.serve(hello, 'localhost', 8765)

loop = asyncio.get_event_loop()
# Runs first: drives the server start-up to completion.
loop.run_until_complete(start_server)
# Then blocks here, serving incoming connections.
loop.run_forever()
client
import asyncio
import websockets


async def hello():
    """Connect to the local server, send a name typed by the user and print the reply."""
    connection = websockets.connect('ws://localhost:8765')
    async with connection as websocket:
        # input() blocks the event loop; acceptable here because nothing
        # else is scheduled while we wait for the user.
        name = input("What's your name? ")
        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
二:加密实现
server
import asyncio
import pathlib
import ssl
import websockets


async def hello(websocket, path):
    """Read a name from the client and answer with ``Hello <name>!``.

    Args:
        websocket: connection object passed in by ``websockets.serve``.
        path: request path of the connection (unused).
    """
    name = await websocket.recv()
    print(f"< {name}")
    greeting = f"Hello {name}!"
    await websocket.send(greeting)
    print(f"> {greeting}")


ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# localhost.pem is looked up next to this script.
# NOTE(review): presumably the PEM file holding the server certificate and
# key - the original author did not investigate this further.
pem_file = pathlib.Path(__file__).with_name('localhost.pem')
ssl_context.load_cert_chain(pem_file)

start_server = websockets.serve(hello, 'localhost', 8765, ssl=ssl_context)

loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()
client
import asyncio
import pathlib
import ssl
import websockets

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# Trust the localhost.pem file that sits next to this script.
pem_file = pathlib.Path(__file__).with_name('localhost.pem')
ssl_context.load_verify_locations(pem_file)


async def hello():
    """Connect over TLS, send a name typed by the user and print the reply."""
    connection = websockets.connect('wss://localhost:8765', ssl=ssl_context)
    async with connection as websocket:
        name = input("What's your name? ")
        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
三:服务器和浏览器的实现
server
import asyncio
import datetime
import random
import websockets


async def time(websocket, path):
    """Push the current UTC timestamp to the client in an endless loop.

    Each message is an ISO-8601 string with a trailing ``Z``; consecutive
    messages are separated by a random pause shorter than three seconds.

    Args:
        websocket: connection object passed in by ``websockets.serve``.
        path: request path of the connection (unused).
    """
    while True:
        stamp = datetime.datetime.utcnow().isoformat() + 'Z'
        await websocket.send(stamp)
        pause = random.random() * 3
        await asyncio.sleep(pause)


start_server = websockets.serve(time, '127.0.0.1', 5678)

loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()
browser
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            // Connect to the time server and create the list that will
            // hold the received timestamps.
            var ws = new WebSocket("ws://127.0.0.1:5678/"),
                messages = document.createElement('ul');

            // Append every incoming message to the list as a new item.
            ws.onmessage = function (event) {
                var list = document.getElementsByTagName('ul')[0],
                    item = document.createElement('li'),
                    text = document.createTextNode(event.data);
                item.appendChild(text);
                list.appendChild(item);
            };

            document.body.appendChild(messages);
        </script>
    </body>
</html>
PS:此处先执行server代码,然后再打开浏览器就可以看到过程
四:同步例子(多个客户端之间同步状态)
server
import asyncio
import json
import logging
import websockets

logging.basicConfig()

# Shared counter; its value is broadcast to every client when it changes.
STATE = {'value': 0}

# Connections of all currently registered clients.
USERS = set()


def state_event():
    """Return the JSON message carrying the current counter value."""
    return json.dumps({'type': 'state', **STATE})


def users_event():
    """Return the JSON message carrying the number of registered users."""
    return json.dumps({'type': 'users', 'count': len(USERS)})


async def _broadcast(message):
    """Send ``message`` to every registered user and wait for all sends."""
    if USERS:  # asyncio.wait doesn't accept an empty list
        # asyncio.wait() rejects bare coroutine objects on Python 3.11+
        # (deprecated since 3.8), so every send is wrapped in a future.
        await asyncio.wait(
            [asyncio.ensure_future(user.send(message)) for user in USERS]
        )


async def notify_state():
    """Broadcast the current counter value to all users."""
    await _broadcast(state_event())


async def notify_users():
    """Broadcast the current user count to all users."""
    await _broadcast(users_event())


async def register(websocket):
    """Add ``websocket`` to USERS and announce the new user count."""
    USERS.add(websocket)
    await notify_users()


async def unregister(websocket):
    """Remove ``websocket`` from USERS and announce the new user count."""
    USERS.remove(websocket)
    await notify_users()


async def counter(websocket, path):
    """Handle one client: apply its plus/minus actions to the shared counter.

    Args:
        websocket: connection object passed in by ``websockets.serve``.
        path: request path of the connection (unused).
    """
    # register(websocket) sends users_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event())
        async for message in websocket:
            data = json.loads(message)
            if data['action'] == 'minus':
                STATE['value'] -= 1
                await notify_state()
            elif data['action'] == 'plus':
                STATE['value'] += 1
                await notify_state()
            else:
                # logging formats lazily with %-style placeholders; the
                # previous "{}" placeholder raised a formatting error.
                logging.error("unsupported event: %s", data)
    finally:
        # Always drop the connection from USERS, even if the handler failed.
        await unregister(websocket)


loop = asyncio.get_event_loop()
loop.run_until_complete(websockets.serve(counter, 'localhost', 6789))
loop.run_forever()
client
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
        <style type="text/css">
            body {
                font-family: "Courier New", sans-serif;
                text-align: center;
            }
            .buttons {
                font-size: 4em;
                display: flex;
                justify-content: center;
            }
            .button, .value {
                line-height: 1;
                padding: 2rem;
                margin: 2rem;
                border: medium solid;
                min-height: 1em;
                min-width: 1em;
            }
            .button {
                cursor: pointer;
                user-select: none;
            }
            .minus {
                color: red;
            }
            .plus {
                color: green;
            }
            .value {
                min-width: 2em;
            }
            .state {
                font-size: 2em;
            }
        </style>
    </head>
    <body>
        <div class="buttons">
            <div class="minus button">-</div>
            <div class="value">?</div>
            <div class="plus button">+</div>
        </div>
        <div class="state">
            <span class="users">?</span> online
        </div>
        <script>
            var minus = document.querySelector('.minus'),
                plus = document.querySelector('.plus'),
                value = document.querySelector('.value'),
                users = document.querySelector('.users'),
                websocket = new WebSocket("ws://127.0.0.1:6789/");

            // Each button sends the matching action to the server.
            minus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'minus'}));
            }
            plus.onclick = function (event) {
                websocket.send(JSON.stringify({action: 'plus'}));
            }

            // Update the page from the 'state' and 'users' messages
            // broadcast by the server.
            websocket.onmessage = function (event) {
                // Declared with var so it no longer leaks into the
                // global scope as an implicit global.
                var data = JSON.parse(event.data);
                switch (data.type) {
                    case 'state':
                        value.textContent = data.value;
                        break;
                    case 'users':
                        users.textContent = (
                            data.count.toString() + " user" +
                            (data.count == 1 ? "" : "s"));
                        break;
                    default:
                        console.error(
                            "unsupported event", data);
                }
            };
        </script>
    </body>
</html>
websocket提供一个查看当前状态的接口
python -m websockets wss://echo.websocket.org/
如果Python版本是3.5以下
server
import asyncio
import websockets


@asyncio.coroutine
def hello(websocket, path):
    """Greet a single client using the legacy generator-based coroutine style.

    Reads one message (the client's name) and replies ``Hello <name>!``.

    NOTE: ``@asyncio.coroutine`` was removed in Python 3.11; this variant
    is only meant for old interpreters without ``async``/``await``.

    Args:
        websocket: connection object passed in by ``websockets.serve``.
        path: request path of the connection (unused).
    """
    name = yield from websocket.recv()
    print("< {}".format(name))

    greeting = "Hello {}!".format(name)
    yield from websocket.send(greeting)
    print("> {}".format(greeting))


start_server = websockets.serve(hello, 'localhost', 8765)

loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()
client
import asyncio
import websockets


@asyncio.coroutine
def hello():
    """Connect, send a name typed by the user and print the server's reply.

    Written in the legacy generator-based coroutine style, so the
    connection is closed by hand in ``finally`` instead of ``async with``.

    NOTE: ``@asyncio.coroutine`` was removed in Python 3.11; this variant
    is only meant for old interpreters without ``async``/``await``.
    """
    websocket = yield from websockets.connect('ws://localhost:8765/')
    try:
        name = input("What's your name? ")
        yield from websocket.send(name)
        print("> {}".format(name))

        greeting = yield from websocket.recv()
        print("< {}".format(greeting))
    finally:
        # Close the connection whether or not the exchange succeeded.
        yield from websocket.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
相关推荐
柳木木的IT 2020-11-04
joynet00 2020-09-23
wenf00 2020-09-14
蓝色深海 2020-08-16
wuychn 2020-08-16
取个好名字真难 2020-08-06
darylove 2020-06-26
shufen0 2020-06-20
Lovexinyang 2020-06-14
WangBowen 2020-06-14
firejq 2020-06-14
hjhmpl 2020-06-14
水痕 2020-06-07
guozewei0 2020-06-06
woniyu 2020-06-02
取个好名字真难 2020-06-01
guozewei0 2020-05-28
woniyu 2020-05-26