您的位置:首页技术文章
文章详情页

Python通过zookeeper实现分布式服务代码解析

浏览:2日期:2022-07-16 15:26:53

借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.

这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.

zk_server.py

import threadingimport jsonimport socketimport sysfrom kazoo.client import KazooClient# TCP服务端绑定端口开启监听,同时将自己注册到zkclass ZKServer(object): def __init__(self, host, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port self.sock.bind((host, port)) self.zk = None def serve(self): ''' 开始服务,每次获取得到一个信息,都新建一个线程处理 ''' self.sock.listen(128) self.register_zk() print('开始监听') while True: conn, addr = self.sock.accept() print('建立链接%s' % str(addr)) t = threading.Thread(target=self.handle, args=(conn, addr)) t.start() # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束 def handle(self, conn, addr): while True: data=conn.recv(1024) if not data or data.decode(’utf-8’) == ’exit’:break print(data.decode(’utf-8’)) conn.close() print(’My work is done!!!’) # 将自己注册到zk,临时节点,所以连接不能中断 def register_zk(self): ''' 注册到zookeeper ''' self.zk = KazooClient(hosts=’127.0.0.1:2181’) self.zk.start() self.zk.ensure_path(’/rpc’) # 创建根节点 value = json.dumps({’host’: self.host, ’port’: self.port}) # 创建服务子节点 self.zk.create(’/rpc/server’, value.encode(), ephemeral=True, sequence=True)if __name__ == ’__main__’: if len(sys.argv) < 3: print('usage:python server.py [host] [port]') exit(1) host = sys.argv[1] port = sys.argv[2] server = ZKServer(host, int(port)) server.serve()

zk_client.py

import randomimport sysimport timeimport jsonimport socketfrom kazoo.client import KazooClient# 客户端连接zk,并从zk获取可用的服务器列表class ZKClient(object): def __init__(self): self._zk = KazooClient(hosts=’127.0.0.1:2181’) self._zk.start() self._get_servers() def _get_servers(self, event=None): ''' 从zookeeper获取服务器地址信息列表 ''' servers = self._zk.get_children(’/rpc’, watch=self._get_servers) # print(servers) self._servers = [] for server in servers: data = self._zk.get(’/rpc/’ + server)[0] if data:addr = json.loads(data.decode())self._servers.append(addr) def _get_server(self): ''' 随机选出一个可用的服务器 ''' return random.choice(self._servers) def get_connection(self): ''' 提供一个可用的tcp连接 ''' sock = None while True: server = self._get_server() print(’server:%s’ % server) try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect((server[’host’], server[’port’])) except ConnectionRefusedError:time.sleep(1)continue else:break return sockif __name__ == ’__main__’: # 模拟多个客户端批量生成任务,推送给服务器执行 client = ZKClient() for i in range(40): sock = client.get_connection() sock.send(bytes(str(i), encoding=’utf8’)) sock.close() time.sleep(1)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持好吧啦网。

标签: Python 编程
相关文章: