概要
这是一个使用python实现一个简单的聊天室的功能,里面包含群聊,私聊两种聊天方式.实现的方式是使用套接字编程的一个使用TCP协议 c/s结构的聊天室
实现思路
x01 服务端的建立
首先,在服务端,使用socket进行消息的接受,每接受一个socket的请求,就开启一个新的线程来管理消息的分发与接受,同时,又存在一个handler来管理所有的线程,从而实现对聊天室的各种功能的处理
x02 客户端的建立
客户端的建立就要比服务端简单多了,客户端的作用只是对消息的发送以及接受,以及按照特定的规则去输入特定的字符从而实现不同的功能的使用,因此,在客户端这里,只需要去使用两个线程,一个是专门用于接受消息,一个是专门用于发送消息的
至于为什么不用一个呢,那是因为,只用一个的话,当接受了消息,在发送之前接受消息的处于阻塞状态,同理,发送消息也是,那么要是将这两个功能放在一个地方实现,就会导致没有办法连续发送或者接受消息了
实现方式
服务端实现
import json import threading from socket import * from time import ctime class PyChattingServer: __socket = socket(AF_INET, SOCK_STREAM, 0) __address = (\'\', 12231) __buf = 1024 def __init__(self): self.__socket.bind(self.__address) self.__socket.listen(20) self.__msg_handler = ChattingHandler() def start_session(self): print(\'等待客户连接...\\r\\n\') try: while True: cs, caddr = self.__socket.accept() # 利用handler来管理线程,实现线程之间的socket的相互通信 self.__msg_handler.start_thread(cs, caddr) except socket.error: pass class ChattingThread(threading.Thread): __buf = 1024 def __init__(self, cs, caddr, msg_handler): super(ChattingThread, self).__init__() self.__cs = cs self.__caddr = caddr self.__msg_handler = msg_handler # 使用多线程管理会话 def run(self): try: print(\'...连接来自于:\', self.__caddr) data = \'欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\\r\\n\' self.__cs.sendall(bytes(data, \'utf-8\')) while True: data = self.__cs.recv(self.__buf).decode(\'utf-8\') if not data: break self.__msg_handler.handle_msg(data, self.__cs) print(data) except socket.error as e: print(e.args) pass finally: self.__msg_handler.close_conn(self.__cs) self.__cs.close() class ChattingHandler: __help_str = \"[ SYSTEM ]\\r\\n\" \\ \"输入/ls,即可获得所有登陆用户信息\\r\\n\" \\ \"输入/h,即可获得帮助\\r\\n\" \\ \"输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\\r\\n\" \\ \"输入/i,即可屏蔽群聊信息\\r\\n\" \\ \"再次输入/i,即可取消屏蔽\\r\\n\" \\ \"所有首字符为/的信息都不会发送出去\" __buf = 1024 __socket_list = [] __user_name_to_socket = {} __socket_to_user_name = {} __user_name_to_broadcast_state = {} def start_thread(self, cs, caddr): self.__socket_list.append(cs) chat_thread = ChattingThread(cs, caddr, self) chat_thread.start() def close_conn(self, cs): if cs not in self.__socket_list: return # 去除socket的记录 nickname = \"SOMEONE\" if cs in self.__socket_list: self.__socket_list.remove(cs) # 去除socket与username之间的映射关系 if cs in self.__socket_to_user_name: nickname = self.__socket_to_user_name[cs] self.__user_name_to_socket.pop(self.__socket_to_user_name[cs]) self.__socket_to_user_name.pop(cs) self.__user_name_to_broadcast_state.pop(nickname) nickname += \" \" # 广播某玩家退出聊天室 self.broadcast_system_msg(nickname + \"离开了PY_CHATTING\") # 管理用户输入的信息 def handle_msg(self, msg, cs): js = json.loads(msg) if js[\'type\'] == \"login\": if js[\'msg\'] not in self.__user_name_to_socket: if \' \' in js[\'msg\']: self.send_to(json.dumps({ \'type\': \'login\', \'success\': False, \'msg\': \'账号不能够带有空格\' }), cs) else: self.__user_name_to_socket[js[\'msg\']] = cs self.__socket_to_user_name[cs] = js[\'msg\'] self.__user_name_to_broadcast_state[js[\'msg\']] = True self.send_to(json.dumps({ \'type\': \'login\', \'success\': True, \'msg\': \'昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)\' }), cs) # 广播其他人,他已经进入聊天室 self.broadcast_system_msg(js[\'msg\'] + \"已经进入了聊天室\") else: self.send_to(json.dumps({ \'type\': \'login\', \'success\': False, \'msg\': \'账号已存在\' }), cs) # 若玩家处于屏蔽模式,则无法发送群聊消息 elif js[\'type\'] == \"broadcast\": if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: self.broadcast(js[\'msg\'], cs) else: self.send_to(json.dumps({ \'type\': \'broadcast\', \'msg\': \'屏蔽模式下无法发送群聊信息\' }), cs) elif js[\'type\'] == \"ls\": self.send_to(json.dumps({ \'type\': \'ls\', \'msg\': self.get_all_login_user_info() }), cs) elif js[\'type\'] == \"help\": self.send_to(json.dumps({ \'type\': \'help\', \'msg\': self.__help_str }), cs) elif js[\'type\'] == \"sendto\": self.single_chatting(cs, js[\'nickname\'], js[\'msg\']) elif js[\'type\'] == \"ignore\": self.exchange_ignore_state(cs) def exchange_ignore_state(self, cs): if cs in self.__socket_to_user_name: state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] if state: state = False else: state = True self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs]) self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]: msg = \"通常模式\" else: msg = \"屏蔽模式\" self.send_to(json.dumps({ \'type\': \'ignore\', \'success\': True, \'msg\': \'[TIME : %s]\\r\\n[ SYSTEM ] : %s\\r\\n\' % (ctime(), \"模式切换成功,现在是\" + msg) }), cs) else: self.send_to({ \'type\': \'ignore\', \'success\': False, \'msg\': \'切换失败\' }, cs) def single_chatting(self, cs, nickname, msg): if nickname in self.__user_name_to_socket: msg = \'[TIME : %s]\\r\\n[ %s CHATTING TO %s ] : %s\\r\\n\' % ( ctime(), self.__socket_to_user_name[cs], nickname, msg) self.send_to_list(json.dumps({ \'type\': \'single\', \'msg\': msg }), self.__user_name_to_socket[nickname], cs) else: self.send_to(json.dumps({ \'type\': \'single\', \'msg\': \'该用户不存在\' }), cs) print(nickname) def send_to_list(self, msg, *cs): for i in range(len(cs)): self.send_to(msg, cs[i]) def get_all_login_user_info(self): login_list = \"[ SYSTEM ] ALIVE USER : \\r\\n\" for key in self.__socket_to_user_name: login_list += self.__socket_to_user_name[key] + \",\\r\\n\" return login_list def send_to(self, msg, cs): if cs not in self.__socket_list: self.__socket_list.append(cs) cs.sendall(bytes(msg, \'utf-8\')) def broadcast_system_msg(self, msg): data = \'[TIME : %s]\\r\\n[ SYSTEM ] : %s\\r\\n\' % (ctime(), msg) js = json.dumps({ \'type\': \'system_msg\', \'msg\': data }) # 屏蔽了群聊的玩家也可以获得系统的群发信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name: self.__socket_list[i].sendall(bytes(js, \'utf-8\')) def broadcast(self, msg, cs): data = \'[TIME : %s]\\r\\n[%s] : %s\\r\\n\' % (ctime(), self.__socket_to_user_name[cs], msg) js = json.dumps({ \'type\': \'broadcast\', \'msg\': data }) # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息 for i in range(len(self.__socket_list)): if self.__socket_list[i] in self.__socket_to_user_name \\ and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]: self.__socket_list[i].sendall(bytes(js, \'utf-8\')) def main(): server = PyChattingServer() server.start_session() main()
客户端的实现
import json import threading from socket import * is_login = False is_broadcast = True class ClientReceiveThread(threading.Thread): __buf = 1024 def __init__(self, cs): super(ClientReceiveThread, self).__init__() self.__cs = cs def run(self): self.receive_msg() def receive_msg(self): while True: msg = self.__cs.recv(self.__buf).decode(\'utf-8\') if not msg: break js = json.loads(msg) if js[\'type\'] == \"login\": if js[\'success\']: global is_login is_login = True print(js[\'msg\']) elif js[\'type\'] == \"ignore\": if js[\'success\']: global is_broadcast if is_broadcast: is_broadcast = False else: is_broadcast = True print(js[\'msg\']) else: if not is_broadcast: print(\"[现在处于屏蔽模式]\") print(js[\'msg\']) class ClientSendMsgThread(threading.Thread): def __init__(self, cs): super(ClientSendMsgThread, self).__init__() self.__cs = cs def run(self): self.send_msg() # 根据不同的输入格式来进行不同的聊天方式 def send_msg(self): while True: js = None msg = input() if not is_login: js = json.dumps({ \'type\': \'login\', \'msg\': msg }) elif msg[0] == \"@\": data = msg.split(\' \') if not data: print(\"请重新输入\") break nickname = data[0] nickname = nickname.strip(\"@\") if len(data) == 1: data.append(\" \") js = json.dumps({ \'type\': \'sendto\', \'nickname\': nickname, \'msg\': data[1] }) elif msg == \"/help\": js = json.dumps({ \'type\': \'help\', \'msg\': None }) elif msg == \"/ls\": js = json.dumps({ \'type\': \'ls\', \'msg\': None }) elif msg == \"/i\": js = json.dumps({ \'type\': \'ignore\', \'msg\': None }) else: if msg[0] != \'/\': js = json.dumps({ \'type\': \'broadcast\', \'msg\': msg }) if js is not None: self.__cs.sendall(bytes(js, \'utf-8\')) def main(): buf = 1024 # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了 address = (\"127.0.0.1\", 12231) cs = socket(AF_INET, SOCK_STREAM, 0) cs.connect(address) data = cs.recv(buf).decode(\"utf-8\") if data: print(data) receive_thread = ClientReceiveThread(cs) receive_thread.start() send_thread = ClientSendMsgThread(cs) send_thread.start() while True: pass main()
这样一个简单的聊天室就建立了。
总结
在这个实现聊天室当中,我使用的是json格式的字符串信息来编写的协议,或许,也可以使用一些更加简单的方式去实现
其实这个聊天室也就是一个最基本的socket编程的实现方案,也是一些属于网络方面的比较简单的编写吧
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持自学编程网。