Python WebSocket服务端编程代码完成gtalk机器人,websocketgtalk,本文python源码为实
Python WebSocket服务端编程代码完成gtalk机器人,websocketgtalk,本文python源码为实
本文python源码为实现,Python WebSocket服务端编程代码完成gtalk机器人的全部代码段。需要用到python sys、sleekxmpp、reactor等python模块及方法,在代码中导入部分可以查看到详细方法。
python websocket库,运作机制及客户端和服务端的 API 实现。PythonWebSocket的具体操作方法和工作原理也可以查看相关文章来学习。
Python socket套接字模块server/client端操作
import sysfrom twisted.internet import reactorfrom twisted.python import logfrom autobahn.websocket import WebSocketServerFactory, WebSocketServerProtocolimport sleekxmppreload(sys)sys.setdefaultencoding('utf8')class gtalkBot(sleekxmpp.ClientXMPP): """ A basic SleekXMPP bot that will log in, send a message, and then log out. """ def __init__(self, jid, password): sleekxmpp.ClientXMPP.__init__(self, jid, password) self.recipient = 'wxg4net@gmail.com' self.status_text = '' self.add_event_handler("session_start", self.start) self.add_event_handler("message", self.muc_message) self.add_event_handler('presence_available', self.handle_presence_available) self.add_event_handler('presence_unavailable', self.handle_presence_unavailable) self.add_event_handler('presence_dnd', self.handle_presence_dnd) def start(self, event): self.send_presence() self.get_roster() def handle_presence_unavailable(self, presence): if cmp(presence['from'].bare, self.recipient) == 0: self.status_text = presence['status'] def handle_presence_available(self, presence): if cmp(presence['from'].bare, self.recipient) == 0: self.status_text = presence['status'] def handle_presence_dnd(self, presence): if cmp(presence['from'].bare, self.recipient) == 0: self.status_text = presence['status'] def muc_message(self, msg): fwho = msg['from'].bare body = msg['body'] if cmp(body,'-qsbk') == 0: self.send_message(mto=fwho, mbody='当前服务已关闭', mtype='chat') elif cmp(body,'-help') == 0: self.send_message(mto=fwho, mbody='当前服务开发中...', mtype='chat') elif cmp(fwho, self.recipient) == 0: factory.broadcast(str(body)) else: self.send_message(mto=fwho, mbody='当前服务开发中...', mtype='chat') def sendMessageTome(self, message): self.send_message(mto="wxg4net@gmail.com", mbody=message, mtype='chat') #python websocketclass BroadcastServerProtocol(WebSocketServerProtocol): def __init__(self): self.istip = True self.Heartbeat() def Heartbeat(self): reactor.callLater(1, self.Heartbeat) def onOpen(self): self.factory.register(self) def onMessage(self, msg, binary): xmpp.sendMessageTome(msg) if self.istip: self.sendMessage('你好,websocket python的当前状态是:'+str(xmpp.status_text)+'。---祝你生活愉快!') self.istip = False def connectionLost(self, reason): WebSocketServerProtocol.connectionLost(self, reason) self.factory.unregister(self)class BroadcastServerFactory(WebSocketServerFactory): protocol = BroadcastServerProtocol def __init__(self): WebSocketServerFactory.__init__(self) self.clients = [] def register(self, client): if not client in self.clients: self.clients.append(client) def unregister(self, client): if client in self.clients: self.clients.remove(client) def broadcast(self, message): if self.clients: for c in self.clients: c.sendMessage(message)#www.iplaypy.comif __name__ == '__main__': 1a58 log.startLogging(sys.stdout) xmpp = gtalkBot("phzggzs@gmail.com", "*******") xmpp.register_plugin('xep_0030') # Service Discovery xmpp.register_plugin('xep_0199') # XMPP Ping if xmpp.connect(): xmpp.process(threaded=True) factory = BroadcastServerFactory() reactor.listenTCP(9090, factory) reactor.run()
编橙之家文章,
相关内容
- Python zip文件解压乱码的解决方法,pythonzip解压乱码,P
- Python写的比较2个文件不同的程序,python写程序,Python写
- python 删除过期文件的方法+源码,,python 删除过期文
- 分享Python获取文件及文件夹大小的方法源码,python源码
- 最简单Python删除目录下文件内容的方法代码,python代码
- Python文件合并与分割操作方法工具,python文件合并,编橙
- 如何用Python创建生成xml文档文件的方法,pythonxml,用Py
- 用Python解压缩rar、zip文件的方法,python解压缩rarzip,编橙
- 游戏水桶倒水问题Python语言的解决方法,倒水python,Py
- Python算法--最长公共子串算法代码讲解,python算法,Pyt
评论关闭