温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何用python实现聊天小程序

发布时间:2022-05-18 11:56:02 来源:亿速云 阅读:166 作者:iii 栏目:大数据

这篇文章主要介绍了如何用python实现聊天小程序的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇如何用python实现聊天小程序文章都会有所收获,下面我们一起来看看吧。

概要

这是一个使用python实现一个简单的聊天室的功能,里面包含群聊,私聊两种聊天方式.实现的方式是使用套接字编程的一个使用TCP协议 c/s结构的聊天室

实现思路

x01 服务端的建立

首先,在服务端,使用socket进行消息的接受,每接受一个socket的请求,就开启一个新的线程来管理消息的分发与接受,同时,又存在一个handler来管理所有的线程,从而实现对聊天室的各种功能的处理

x02 客户端的建立

客户端的建立就要比服务端简单多了,客户端的作用只是对消息的发送以及接受,以及按照特定的规则去输入特定的字符从而实现不同的功能的使用,因此,在客户端这里,只需要去使用两个线程,一个是专门用于接受消息,一个是专门用于发送消息的

至于为什么不用一个呢,那是因为,只用一个的话,当接受了消息,在发送之前接受消息的处于阻塞状态,同理,发送消息也是,那么要是将这两个功能放在一个地方实现,就会导致没有办法连续发送或者接受消息了

实现方式

服务端实现

如何用python实现聊天小程序

如何用python实现聊天小程序

import json import threading from socket import * from time import ctime class PyChattingServer:     __socket = socket(AF_INET, SOCK_STREAM, 0)     __address = ('', 12231)     __buf = 1024     def __init__(self):         self.__socket.bind(self.__address)         self.__socket.listen(20)         self.__msg_handler = ChattingHandler()     def start_session(self):         print('等待客户连接...\r\n')         try:             while True:                 cs, caddr = self.__socket.accept()                 # 利用handler来管理线程,实现线程之间的socket的相互通信                 self.__msg_handler.start_thread(cs, caddr)         except socket.error:             pass class ChattingThread(threading.Thread):     __buf = 1024     def __init__(self, cs, caddr, msg_handler):         super(ChattingThread, self).__init__()         self.__cs = cs         self.__caddr = caddr         self.__msg_handler = msg_handler     # 使用多线程管理会话     def run(self):         try:             print('...连接来自于:', self.__caddr)             data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n'             self.__cs.sendall(bytes(data, 'utf-8'))             while True:                 data = self.__cs.recv(self.__buf).decode('utf-8')                 if not data:                     break                 self.__msg_handler.handle_msg(data, self.__cs)                 print(data)         except socket.error as e:             print(e.args)             pass         finally:             self.__msg_handler.close_conn(self.__cs)             self.__cs.close() class ChattingHandler:     __help_str = "[ SYSTEM ]\r\n" \                  "输入/ls,即可获得所有登陆用户信息\r\n" \                  "输入/h,即可获得帮助\r\n" \                  "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \                  "输入/i,即可屏蔽群聊信息\r\n" \                  "再次输入/i,即可取消屏蔽\r\n" \                  "所有首字符为/的信息都不会发送出去"     __buf = 1024     __socket_list = []     __user_name_to_socket = {}     __socket_to_user_name = {}     __user_name_to_broadcast_state = {}     def start_thread(self, cs, caddr):         self.__socket_list.append(cs)         chat_thread = ChattingThread(cs, caddr, self)         chat_thread.start()     def close_conn(self, cs):         if cs not in self.__socket_list:             return         # 去除socket的记录         nickname = "SOMEONE"         if cs in self.__socket_list:             self.__socket_list.remove(cs)         # 去除socket与username之间的映射关系         if cs in self.__socket_to_user_name:             nickname = self.__socket_to_user_name[cs]             self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])             self.__socket_to_user_name.pop(cs)             self.__user_name_to_broadcast_state.pop(nickname)         nickname += " "         # 广播某玩家退出聊天室         self.broadcast_system_msg(nickname + "离开了PY_CHATTING")     # 管理用户输入的信息     def handle_msg(self, msg, cs):         js = json.loads(msg)         if js['type'] == "login":             if js['msg'] not in self.__user_name_to_socket:                 if ' ' in js['msg']:                     self.send_to(json.dumps({                         'type': 'login',                         'success': False,                         'msg': '账号不能够带有空格'                     }), cs)                 else:                     self.__user_name_to_socket[js['msg']] = cs                     self.__socket_to_user_name[cs] = js['msg']                     self.__user_name_to_broadcast_state[js['msg']] = True                     self.send_to(json.dumps({                         'type': 'login',                         'success': True,                         'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'                     }), cs)                     # 广播其他人,他已经进入聊天室                     self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")             else:                 self.send_to(json.dumps({                     'type': 'login',                     'success': False,                     'msg': '账号已存在'                 }), cs)         # 若玩家处于屏蔽模式,则无法发送群聊消息         elif js['type'] == "broadcast":             if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:                 self.broadcast(js['msg'], cs)             else:                 self.send_to(json.dumps({                     'type': 'broadcast',                     'msg': '屏蔽模式下无法发送群聊信息'                 }), cs)         elif js['type'] == "ls":             self.send_to(json.dumps({                 'type': 'ls',                 'msg': self.get_all_login_user_info()             }), cs)         elif js['type'] == "help":             self.send_to(json.dumps({                 'type': 'help',                 'msg': self.__help_str             }), cs)         elif js['type'] == "sendto":             self.single_chatting(cs, js['nickname'], js['msg'])         elif js['type'] == "ignore":             self.exchange_ignore_state(cs)     def exchange_ignore_state(self, cs):         if cs in self.__socket_to_user_name:             state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]             if state:                 state = False             else:                 state = True             self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])             self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state             if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:                 msg = "通常模式"             else:                 msg = "屏蔽模式"             self.send_to(json.dumps({                 'type': 'ignore',                 'success': True,                 'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)             }), cs)         else:             self.send_to({                 'type': 'ignore',                 'success': False,                 'msg': '切换失败'             }, cs)     def single_chatting(self, cs, nickname, msg):         if nickname in self.__user_name_to_socket:             msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (                 ctime(), self.__socket_to_user_name[cs], nickname, msg)             self.send_to_list(json.dumps({                 'type': 'single',                 'msg': msg             }), self.__user_name_to_socket[nickname], cs)         else:             self.send_to(json.dumps({                 'type': 'single',                 'msg': '该用户不存在'             }), cs)         print(nickname)     def send_to_list(self, msg, *cs):         for i in range(len(cs)):             self.send_to(msg, cs[i])     def get_all_login_user_info(self):         login_list = "[ SYSTEM ] ALIVE USER : \r\n"         for key in self.__socket_to_user_name:             login_list += self.__socket_to_user_name[key] + ",\r\n"         return login_list     def send_to(self, msg, cs):         if cs not in self.__socket_list:             self.__socket_list.append(cs)         cs.sendall(bytes(msg, 'utf-8'))     def broadcast_system_msg(self, msg):         data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)         js = json.dumps({             'type': 'system_msg',             'msg': data         })         # 屏蔽了群聊的玩家也可以获得系统的群发信息         for i in range(len(self.__socket_list)):             if self.__socket_list[i] in self.__socket_to_user_name:                 self.__socket_list[i].sendall(bytes(js, 'utf-8'))     def broadcast(self, msg, cs):         data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)         js = json.dumps({             'type': 'broadcast',             'msg': data         })         # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息         for i in range(len(self.__socket_list)):             if self.__socket_list[i] in self.__socket_to_user_name \                     and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:                 self.__socket_list[i].sendall(bytes(js, 'utf-8')) def main():     server = PyChattingServer()     server.start_session() main()

客户端的实现

import json import threading from socket import * is_login = False is_broadcast = True class ClientReceiveThread(threading.Thread):     __buf = 1024     def __init__(self, cs):         super(ClientReceiveThread, self).__init__()         self.__cs = cs     def run(self):         self.receive_msg()     def receive_msg(self):         while True:             msg = self.__cs.recv(self.__buf).decode('utf-8')             if not msg:                 break             js = json.loads(msg)             if js['type'] == "login":                 if js['success']:                     global is_login                     is_login = True                 print(js['msg'])             elif js['type'] == "ignore":                 if js['success']:                     global is_broadcast                     if is_broadcast:                         is_broadcast = False                     else:                         is_broadcast = True                 print(js['msg'])             else:                 if not is_broadcast:                     print("[现在处于屏蔽模式]")                 print(js['msg']) class ClientSendMsgThread(threading.Thread):     def __init__(self, cs):         super(ClientSendMsgThread, self).__init__()         self.__cs = cs     def run(self):         self.send_msg()     # 根据不同的输入格式来进行不同的聊天方式     def send_msg(self):         while True:             js = None             msg = input()             if not is_login:                 js = json.dumps({                     'type': 'login',                     'msg': msg                 })             elif msg[0] == "@":                 data = msg.split(' ')                 if not data:                     print("请重新输入")                     break                 nickname = data[0]                 nickname = nickname.strip("@")                 if len(data) == 1:                     data.append(" ")                 js = json.dumps({                     'type': 'sendto',                     'nickname': nickname,                     'msg': data[1]                 })             elif msg == "/help":                 js = json.dumps({                     'type': 'help',                     'msg': None                 })             elif msg == "/ls":                 js = json.dumps({                     'type': 'ls',                     'msg': None                 })             elif msg == "/i":                 js = json.dumps({                     'type': 'ignore',                     'msg': None                 })             else:                 if msg[0] != '/':                     js = json.dumps({                         'type': 'broadcast',                         'msg': msg                     })             if js is not None:                 self.__cs.sendall(bytes(js, 'utf-8')) def main():     buf = 1024     # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了     address = ("127.0.0.1", 12231)     cs = socket(AF_INET, SOCK_STREAM, 0)     cs.connect(address)     data = cs.recv(buf).decode("utf-8")     if data:         print(data)     receive_thread = ClientReceiveThread(cs)     receive_thread.start()     send_thread = ClientSendMsgThread(cs)     send_thread.start()     while True:         pass main()

这样一个简单的聊天室就建立了。

关于“如何用python实现聊天小程序”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“如何用python实现聊天小程序”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI