作者:曹金龍
概述
連接池的作用就是為了提高性能,將已經(jīng)創(chuàng)建好的連接保存在池中,當有請求來時,直接使用已經(jīng)創(chuàng)建好的連接對Server端進行訪問。這樣省略了創(chuàng)建連接和銷毀連接的過程(TCP連接建立時的三次握手和銷毀時的四次握手),從而在性能上得到了提高。
連接池設計的基本原理是這樣的:
建立連接池對象(服務啟動)。按照事先指定的參數(shù)創(chuàng)建初始數(shù)量的連接(即:空閑連接數(shù))。對于一個訪問請求,直接從連接池中得到一個連接。如果連接池對象中沒有空閑的連接,且連接數(shù)沒有達到最大(即:最大活躍連接數(shù)),創(chuàng)建一個新的連接;如果達到最大,則設定一定的超時時間,來獲取連接。運用連接訪問服務。訪問服務完成,釋放連接(此時的釋放連接,并非真正關閉,而是將其放入空閑隊列中。如實際空閑連接數(shù)大于初始空閑連接數(shù)則釋放連接)。釋放連接池對象(服務停止、維護期間,釋放連接池對象,并釋放所有連接)。說的通俗點,可以把連接池理解為一個一個的管道,在管道空閑時,便可以取出使用;同時,也可以鋪設新的管道(當然不能超過最大連接數(shù)的限制)。使用完之后,管道就變?yōu)榭臻e了。
通常比較常用的連接池是數(shù)據(jù)庫連接池,HTTP Client連接池,我也自己編寫過連接池,如Thrift連接池及插入Rabbitmq隊列的連接池。
下面分析三個典型的連接池的設計。
數(shù)據(jù)庫連接池
首先剖析一下數(shù)據(jù)庫連接池的設計與實現(xiàn)的原理。DBUtils 屬于數(shù)據(jù)庫連接池實現(xiàn)模塊,用于連接DB-API 2模塊,對數(shù)據(jù)庫連接線程化,使可以安全和高效的訪問數(shù)據(jù)庫的模塊。本文主要分析一下PooledDB的流程。
DBUtils.PooledDB使用DB-API 2模塊實現(xiàn)了一個強硬的、線程安全的、有緩存的、可復用的數(shù)據(jù)庫連接。
如下圖展示了使用PooledDB時的工作流程:
本文主要考慮dedicated connections,即專用數(shù)據(jù)庫連接,在初始化時連接池時,就需要指定mincached、maxcached以及maxconnections等參數(shù),分別表示連接池的最小連接數(shù)、連接池的最大連接數(shù)以及系統(tǒng)可用的最大連接數(shù),同時,blocking參數(shù)表征了當獲取不到連接的時候是阻塞等待獲取連接還是返回異常:
if not blocking: ? ?def wait(): ? ? ? ?raise TooManyConnections ? ?self._condition.wait = wait
在連接池初始化時,就會建立mincached個連接,代碼如下:
# Establish an initial number of idle database connections:idle = [self.dedicated_connection() for i in range(mincached)]while idle: ? ?idle.pop().close()
里面有close方法,看一下連接close方法的實現(xiàn):
def close(self): ? ?"""Close the pooled dedicated connection.""" ? ?# Instead of actually closing the connection, ? ?# return it to the pool for future reuse. ? ?if self._con: ? ? ? ?self._pool.cache(self._con) ? ? ? ?self._con = None
主要是實現(xiàn)了cache方法,看一下具體代碼:
def cache(self, con): ? ?"""Put a dedicated connection back into the idle cache.""" ? ?self._condition.acquire() ? ?try: ? ? ? ?if not self._maxcached or len(self._idle_cache) < self._maxcached: ? ? ? ? ? ?con._reset(force=self._reset) # rollback possible transaction ? ? ? ? ? ?# the idle cache is not full, so put it there ? ? ? ? ? ?self._idle_cache.append(con) # append it to the idle cache ? ? ? ?else: # if the idle cache is already full, ? ? ? ? ? ?con.close() # then close the connection ? ? ? ?self._connections -= 1 ? ? ? ?self._condition.notify() ? ?finally: ? ? ? ?self._condition.release()
由上述代碼可見,close并不是把連接關閉,而是在連接池的數(shù)目小于maxcached的時候,將連接放回連接池,而大于此值時,關閉該連接。同時可以注意到,在放回連接池之前,需要將事務進行回滾,避免在使用連接池的時候有存活的事務沒有提交。這可以保證進入連接池的連接都是可用的。
而獲取連接的過程正如之前討論的,先從連接池中獲取連接,如果獲取連接失敗,則新建立連接:
# try to get a dedicated connection ? ?self._condition.acquire() ? ?try: ? ? ? ?while (self._maxconnections ? ? ? ? ? ? ? ?and self._connections >= self._maxconnections): ? ? ? ? ? ?self._condition.wait() ? ? ? ?# connection limit not reached, get a dedicated connection ? ? ? ?try: # first try to get it from the idle cache ? ? ? ? ? ?con = self._idle_cache.pop(0) ? ? ? ?except IndexError: # else get a fresh connection ? ? ? ? ? ?con = self.steady_connection() ? ? ? ?else: ? ? ? ? ? ?con._ping_check() # check connection ? ? ? ?con = PooledDedicatedDBConnection(self, con) ? ? ? ?self._connections += 1 ? ?finally: ? ? ? ?self._condition.release()
關閉連接正如剛剛創(chuàng)建mincached個連接后關閉連接的流程,在連接池的數(shù)目小于maxcached的時候,將連接放回連接池,而大于此值時,關閉該連接。
RabbitMQ隊列插入消息連接池
異步消息傳遞是高并發(fā)系統(tǒng)常用的一種技術手段。而這其中就少不了消息隊列。頻繁的向消息隊列里面插入消息,建立連接釋放連接會是比較大的開銷。所以,可以使用連接池來提高系統(tǒng)性能。
連接池的設計實現(xiàn)如下:
在獲取連接的時候,先從隊列里面獲取連接,如果獲取不到,則新建立一個連接,如果不能新建立連接,則根據(jù)超時時間,阻塞等待從隊列里面獲取鏈接。如果沒成功,則做最后的嘗試,重新建立連接。代碼實現(xiàn)如下:
? ?def get_connection_pipe(self): ? ? ? ?""" ? ? ? ?獲取連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self._queue.get(False) ? ? ? ?except Queue.Empty: ? ? ? ? ? ?try: ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ?timeout = self.timeout ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe = self._queue.get(timeout=timeout) ? ? ? ? ? ? ? ?except Queue.Empty: ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ? ? ? ? ?logging.error("Too much connections, Get Connection Timeout!") ? ? ? ?if (time.time() - connection_pipe.use_time) > self.disable_time: ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?return self.get_connection_pipe() ? ? ? ?return connection_pipe
一個RabbitMQ插入消息隊列的完整連接池設計如下:
# coding:utf-8import loggingimport threadingimport Queuefrom kombu import Connectionimport timeclass InsertQueue(): ? ?def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None, ? ? ? ? ? ? ? ? logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20): ? ? ? ?""" ? ? ? ?:param str host: Hostname or IP Address to connect to ? ? ? ?:param int port: TCP port to connect to ? ? ? ?:param str virtual_host: RabbitMQ virtual host to use ? ? ? ?:param int heartbeat_interval: ?How often to send heartbeats ? ? ? ?:param str name: auth credentials name ? ? ? ?:param str password: auth credentials password ? ? ? ?""" ? ? ? ?self.logger = logging if logger is None else logger ? ? ? ?self.host = host ? ? ? ?self.port = port ? ? ? ?self.virtual_host = virtual_host ? ? ? ?self.heartbeat_interval = heartbeat_interval ? ? ? ?self.name = name ? ? ? ?self.password = password ? ? ? ?self.mutex = threading.RLock() ? ? ? ?self.maxIdle = maxIdle ? ? ? ?self.maxActive = maxActive ? ? ? ?self.available = self.maxActive ? ? ? ?self.timeout = timeout ? ? ? ?self._queue = Queue.Queue(maxsize=self.maxIdle) ? ? ? ?self.disable_time = disable_time ? ?def get_new_connection_pipe(self): ? ? ? ?""" ? ? ? ?產(chǎn)生新的隊列連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.mutex: ? ? ? ? ? ?if self.available <= 0: ? ? ? ? ? ? ? ?raise GetConnectionException ? ? ? ? ? ?self.available -= 1 ? ? ? ?try: ? ? ? ? ? ?conn = Connection(hostname=self.host, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?port=self.port, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?virtual_host=self.virtual_host, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?heartbeat=self.heartbeat_interval, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?userid=self.name, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?password=self.password) ? ? ? ? ? ?producer = conn.Producer() ? ? ? ? ? ?return ConnectionPipe(conn, producer) ? ? ? ?except: ? ? ? ? ? ?with self.mutex: ? ? ? ? ? ? ? ?self.available += 1 ? ? ? ? ? ?raise GetConnectionException ? ?def get_connection_pipe(self): ? ? ? ?""" ? ? ? ?獲取連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self._queue.get(False) ? ? ? ?except Queue.Empty: ? ? ? ? ? ?try: ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ?timeout = self.timeout ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe = self._queue.get(timeout=timeout) ? ? ? ? ? ? ? ?except Queue.Empty: ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ? ? ? ? ?logging.error("Too much connections, Get Connection Timeout!") ? ? ? ?if (time.time() - connection_pipe.use_time) > self.disable_time: ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?return self.get_connection_pipe() ? ? ? ?return connection_pipe ? ?def close(self, connection_pipe): ? ? ? ?""" ? ? ? ?close the connection and the correlative channel ? ? ? ?:param connection_pipe: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.mutex: ? ? ? ? ? ?self.available += 1 ? ? ? ? ? ?connection_pipe.close() ? ? ? ?return ? ?def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True): ? ? ? ?""" ? ? ? ?insert message to queue ? ? ? ?:param str exchange: exchange name ? ? ? ?:param str body: message ? ? ? ?:param str routing_key: routing key ? ? ? ?:param bool mandatory: is confirm: True means confirm, False means not confirm ? ? ? ?:return: ? ? ? ?""" ? ? ? ?put_into_queue_flag = True ? ? ? ?insert_result = False ? ? ? ?connection_pipe = None ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self.get_connection_pipe() ? ? ? ? ? ?producer = connection_pipe.channel ? ? ? ? ? ?use_time = time.time() ? ? ? ? ? ?producer.publish(exchange=exchange, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? body=body, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? delivery_mode=2, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? routing_key=routing_key, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? mandatory=mandatory ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ) ? ? ? ? ? ?insert_result = True ? ? ? ?except Exception: ? ? ? ? ? ?insert_result = False ? ? ? ? ? ?put_into_queue_flag = False ? ? ? ?finally: ? ? ? ? ? ?if put_into_queue_flag is True: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe.use_time = use_time ? ? ? ? ? ? ? ? ? ?self._queue.put_nowait(connection_pipe) ? ? ? ? ? ? ? ?except Queue.Full: ? ? ? ? ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?if connection_pipe is not None: ? ? ? ? ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ?return insert_resultclass ConnectionPipe(object): ? ?""" ? ?connection和channel對象的封裝 ? ?""" ? ?def __init__(self, connection, channel): ? ? ? ?self.connection = connection ? ? ? ?self.channel = channel ? ? ? ?self.use_time = time.time() ? ?def close(self): ? ? ? ?try: ? ? ? ? ? ?self.connection.close() ? ? ? ?except Exception as ex: ? ? ? ? ? ?passclass GetConnectionException(): ? ?""" ? ?獲取連接異常 ? ?""" ? ?pass
Thrift連接池
Thrift是什么呢?簡而言之,Thrift定義一個簡單的文件,包含數(shù)據(jù)類型和服務接口,以作為輸入文件,編譯器生成代碼用來方便地生成RPC客戶端和服務器通信的方式。實際上就是一種遠程調(diào)用的方式,因為協(xié)議棧為TCP層,所以相對于HTTP層效率會更高。
Thrift連接池的設計同數(shù)據(jù)庫連接池類似,流程圖如下:
思路依舊是,在獲取連接時,先從連接池中獲取連接,若池中無連接,則判斷是否可以新建連接,若不能新建連接,則阻塞等待連接。
在從池中獲取不到隊列的時候的處理方式,本設計處理方式為:當獲取不到連接時,將這部分請求放入一個等待隊列,等待獲取連接;而當關閉連接放回連接池時,優(yōu)先判斷這個隊列是否有等待獲取連接的請求,若有,則優(yōu)先分配給這些請求。
獲取不到連接時處理代碼如下,將請求放入一個隊列進行阻塞等待獲取連接:
async_result = AsyncResult()self.no_client_queue.appendleft(async_result)client = async_result.get() ?# blocking
而當有連接釋放需要放回連接池時,需要優(yōu)先考慮這部分請求,代碼如下:
def put_back_connections(self, client): ? ?""" ? ?線程安全 ? ?將連接放回連接池,邏輯如下: ? ?1、如果有請求尚未獲取到連接,請求優(yōu)先 ? ?2、如果連接池中的連接的數(shù)目小于maxIdle,則將該連接放回連接池 ? ?3、關閉連接 ? ?:param client: ? ?:return: ? ?""" ? ?with self.lock: ? ? ? ?if self.no_client_queue.__len__() > 0: ? ? ? ? ? ?task = self.no_client_queue.pop() ? ? ? ? ? ?task.set(client) ? ? ? ?elif self.connections.__len__() < self.maxIdle: ? ? ? ? ? ?self.connections.add(client) ? ? ? ?else: ? ? ? ? ? ?client.close() ? ? ? ? ? ?self.pool_size -= 1
最后,基于thrift連接池,介紹一個簡單的服務化框架的實現(xiàn)。
服務化框架分為兩部分:RPC、注冊中心。
RPC:遠程調(diào)用,遠程調(diào)用的傳輸協(xié)議有很多種,可以走http、Webservice、TCP等。Thrift也是世界上主流的RPC框架。其重點在于安全、快速、最好能跨語言。注冊中心:用于存放,服務的IP地址和端口信息等。比較好的存放服務信息的方案有:Zookeeper、Redis等。其重點在于避免單點問題,并且好維護。通常的架構圖為:
通過Thrift連接池作為客戶端,而Zookeeper作為注冊中心,設計服務框架。具體就是服務端在啟動服務的時候到Zookeeper進行注冊,而客戶端在啟動的時候通過Zookeeper發(fā)現(xiàn)服務端的IP和端口,通過Thrift連接池輪詢建立連接訪問服務端的服務。
具體設計的代碼如下,代碼有點長,細細研讀一定有所收獲的:
# coding: utf-8import threadingfrom collections import dequeimport loggingimport socketimport timefrom kazoo.client import KazooClientfrom thriftpy.protocol import TBinaryProtocolFactoryfrom thriftpy.transport import ( ? ?TBufferedTransportFactory, ? ?TSocket,)from gevent.event import AsyncResultfrom gevent import Timeoutfrom error import CTECThriftClientErrorfrom thriftpy.thrift import TClientfrom thriftpy.transport import TTransportExceptionclass ClientPool: ? ?def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20, ? ? ? ? ? ? ? ? maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3): ? ? ? ?""" ? ? ? ?:param service: Thrift的Service名稱 ? ? ? ?:param server_hosts: 服務提供者地址,數(shù)組類型,['ip:port','ip:port'] ? ? ? ?:param zk_path: 服務提供者在zookeeper中的路徑 ? ? ? ?:param zk_hosts: zookeeper的host地址,多個請用逗號隔開 ? ? ? ?:param max_renew_times: 最大重連次數(shù) ? ? ? ?:param maxActive: 最大連接數(shù) ? ? ? ?:param maxIdle: 最大空閑連接數(shù) ? ? ? ?:param get_connection_timeout:獲取連接的超時時間 ? ? ? ?:param socket_timeout: 讀取數(shù)據(jù)的超時時間 ? ? ? ?:param disable_time: 連接失效時間 ? ? ? ?""" ? ? ? ?# 負載均衡隊列 ? ? ? ?self.load_balance_queue = deque() ? ? ? ?self.service = service ? ? ? ?self.lock = threading.RLock() ? ? ? ?self.max_renew_times = max_renew_times ? ? ? ?self.maxActive = maxActive ? ? ? ?self.maxIdle = maxIdle ? ? ? ?self.connections = set() ? ? ? ?self.pool_size = 0 ? ? ? ?self.get_connection_timeout = get_connection_timeout ? ? ? ?self.no_client_queue = deque() ? ? ? ?self.socket_timeout = socket_timeout ? ? ? ?self.disable_time = disable_time ? ? ? ?self.logger = logging if logger is None else logger ? ? ? ?if zk_hosts: ? ? ? ? ? ?self.kazoo_client = KazooClient(hosts=zk_hosts) ? ? ? ? ? ?self.kazoo_client.start() ? ? ? ? ? ?self.zk_path = zk_path ? ? ? ? ? ?self.zk_hosts = zk_hosts ? ? ? ? ? ?# 定義Watcher ? ? ? ? ? ?self.kazoo_client.ChildrenWatch(path=self.zk_path, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?func=self.watcher) ? ? ? ? ? ?# 刷新連接池中的連接對象 ? ? ? ? ? ?self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path)) ? ? ? ?elif server_hosts: ? ? ? ? ? ?self.server_hosts = server_hosts ? ? ? ? ? ?# 復制新的IP地址到負載均衡隊列中 ? ? ? ? ? ?self.load_balance_queue.extendleft(self.server_hosts) ? ? ? ?else: ? ? ? ? ? ?raise CTECThriftClientError('沒有指定服務器獲取方式!') ? ?def get_new_client(self): ? ? ? ?""" ? ? ? ?輪詢在每個ip:port的連接池中獲取連接(線程安全) ? ? ? ?從當前隊列右側取出ip:port信息,獲取client ? ? ? ?將連接池對象放回到當前隊列的左側 ? ? ? ?請求或連接超時時間,默認30秒 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.pool_size < self.maxActive: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?ip = self.load_balance_queue.pop() ? ? ? ? ? ? ? ?except IndexError: ? ? ? ? ? ? ? ? ? ?raise CTECThriftClientError('沒有可用的服務提供者列表!') ? ? ? ? ? ? ? ?if ip: ? ? ? ? ? ? ? ? ? ?self.load_balance_queue.appendleft(ip) ? ? ? ? ? ? ? ? ? ?# 創(chuàng)建新的thrift client ? ? ? ? ? ? ? ? ? ?t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]), ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? socket_timeout=1000 * self.socket_timeout) ? ? ? ? ? ? ? ? ? ?proto_factory = TBinaryProtocolFactory() ? ? ? ? ? ? ? ? ? ?trans_factory = TBufferedTransportFactory() ? ? ? ? ? ? ? ? ? ?transport = trans_factory.get_transport(t_socket) ? ? ? ? ? ? ? ? ? ?protocol = proto_factory.get_protocol(transport) ? ? ? ? ? ? ? ? ? ?transport.open() ? ? ? ? ? ? ? ? ? ?client = TClient(self.service, protocol) ? ? ? ? ? ? ? ? ? ?self.pool_size += 1 ? ? ? ? ? ? ? ?return client ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?return None ? ?def close(self): ? ? ? ?""" ? ? ? ?關閉所有連接池和zk客戶端 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?if getattr(self, 'kazoo_client', None): ? ? ? ? ? ?self.kazoo_client.stop() ? ?def watcher(self, children): ? ? ? ?""" ? ? ? ?zk的watcher方法,負責檢測zk的變化,刷新當前雙端隊列中的連接池 ? ? ? ?:param children: 子節(jié)點,即服務提供方的列表 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?self.__refresh_thrift_connections(children) ? ?def __refresh_thrift_connections(self, children): ? ? ? ?""" ? ? ? ?刷新服務提供者在當前隊列中的連接池信息(線程安全),主要用于zk刷新 ? ? ? ?:param children: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?# 清空負載均衡隊列 ? ? ? ? ? ?self.load_balance_queue.clear() ? ? ? ? ? ?# 清空連接池 ? ? ? ? ? ?self.connections.clear() ? ? ? ? ? ?# 復制新的IP地址到負載均衡隊列中 ? ? ? ? ? ?self.load_balance_queue.extendleft(children) ? ?def __getattr__(self, name): ? ? ? ?""" ? ? ? ?函數(shù)調(diào)用,最大重試次數(shù)為max_renew_times ? ? ? ?:param name: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?def method(*args, **kwds): ? ? ? ? ? ?# 從連接池獲取連接 ? ? ? ? ? ?client = self.get_client_from_pool() ? ? ? ? ? ?# 連接池中無連接 ? ? ? ? ? ?if client is None: ? ? ? ? ? ? ? ?# 設置獲取連接的超時時間 ? ? ? ? ? ? ? ?time_out = Timeout(self.get_connection_timeout) ? ? ? ? ? ? ? ?time_out.start() ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?async_result = AsyncResult() ? ? ? ? ? ? ? ? ? ?self.no_client_queue.appendleft(async_result) ? ? ? ? ? ? ? ? ? ?client = async_result.get() ?# blocking ? ? ? ? ? ? ? ?except: ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ?if client is None: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.no_client_queue.remove(async_result) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Get Connection Timeout!") ? ? ? ? ? ? ? ?finally: ? ? ? ? ? ? ? ? ? ?time_out.cancel() ? ? ? ? ? ?if client is not None: ? ? ? ? ? ? ? ?for i in xrange(self.max_renew_times): ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = True ? ? ? ? ? ? ? ? ? ? ? ?client.last_use_time = time.time() ? ? ? ? ? ? ? ? ? ? ? ?fun = getattr(client, name, None) ? ? ? ? ? ? ? ? ? ? ? ?return fun(*args, **kwds) ? ? ? ? ? ? ? ? ? ?except socket.timeout: ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Timeout!") ? ? ? ? ? ? ? ? ? ? ? ?# 關閉連接,不關閉會導致亂序 ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except TTransportException, e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?if e.type == TTransportException.END_OF_FILE: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.warning("Socket Connection Reset Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client = self.get_new_client() ? ? ? ? ? ? ? ? ? ? ? ?else: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except socket.error, e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?if e.errno == socket.errno.ECONNABORTED: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.warning("Socket Connection aborted Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client = self.get_new_client() ? ? ? ? ? ? ? ? ? ? ? ?else: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Error, %s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except Exception as e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Thrift Error, %s", e) ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?finally: ? ? ? ? ? ? ? ? ? ? ? ?# 將連接放回連接池 ? ? ? ? ? ? ? ? ? ? ? ?if put_back_flag is True: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.put_back_connections(client) ? ? ? ? ? ?return None ? ? ? ?return method ? ?def close_one_client(self, client): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?關閉連接 ? ? ? ?:param client: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?client.close() ? ? ? ? ? ?self.pool_size -= 1 ? ?def put_back_connections(self, client): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?將連接放回連接池,邏輯如下: ? ? ? ?1、如果有請求尚未獲取到連接,請求優(yōu)先 ? ? ? ?2、如果連接池中的連接的數(shù)目小于maxIdle,則將該連接放回連接池 ? ? ? ?3、關閉連接 ? ? ? ?:param client: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.no_client_queue.__len__() > 0: ? ? ? ? ? ? ? ?task = self.no_client_queue.pop() ? ? ? ? ? ? ? ?task.set(client) ? ? ? ? ? ?elif self.connections.__len__() < self.maxIdle: ? ? ? ? ? ? ? ?self.connections.add(client) ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ?def get_client_from_pool(self): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?從連接池中獲取連接,若連接池中有連接,直接取出,否則, ? ? ? ?新建一個連接,若一直無法獲取連接,則返回None ? ? ? ?:return: ? ? ? ?""" ? ? ? ?client = self.get_one_client_from_pool() ? ? ? ?if client is not None and (time.time() - client.last_use_time) < self.disable_time: ? ? ? ? ? ?return client ? ? ? ?else: ? ? ? ? ? ?if client is not None: ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ?client = self.get_new_client() ? ? ? ?if client is not None: ? ? ? ? ? ?return client ? ? ? ?return None ? ?def get_one_client_from_pool(self): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?從連接池中獲取一個連接,若取不到連接,則返回None ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.connections: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?return self.connections.pop() ? ? ? ? ? ? ? ?except KeyError: ? ? ? ? ? ? ? ? ? ?return None ? ? ? ? ? ?return None
- 蜜度索驥:以跨模態(tài)檢索技術助力“企宣”向上生長
- Commvault持續(xù)業(yè)務策略:應對現(xiàn)代數(shù)據(jù)保護挑戰(zhàn)的新范式
- 2025年網(wǎng)絡安全主要趨勢
- 2025年值得關注的數(shù)據(jù)中心可持續(xù)發(fā)展趨勢
- 量子計算火熱,投資者又在大舉尋找“量子概念股”
- 從量子威脅到人工智能防御:2025年網(wǎng)絡安全將如何發(fā)展
- 后人工智能時代:2025年,在紛擾中重塑數(shù)據(jù)、洞察和行動
- 2025年展望:人工智能推動IT整合
- 量子計算:商業(yè)世界的新前沿與設計思維的融合
- IDC:三季度全球以太網(wǎng)交換機收入同比下降7.9%、環(huán)比增長6.6%
- Fortinet李宏凱:2025年在中國大陸啟動SASE PoP節(jié)點部署 助力企業(yè)出海
免責聲明:本網(wǎng)站內(nèi)容主要來自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準確性及可靠性,但不保證有關資料的準確性及可靠性,讀者在使用前請進一步核實,并對任何自主決定的行為負責。本網(wǎng)站對有關資料所引致的錯誤、不確或遺漏,概不負任何法律責任。任何單位或個人認為本網(wǎng)站中的網(wǎng)頁或鏈接內(nèi)容可能涉嫌侵犯其知識產(chǎn)權或存在不實內(nèi)容時,應及時向本網(wǎng)站提出書面權利通知或不實情況說明,并提供身份證明、權屬證明及詳細侵權或不實情況證明。本網(wǎng)站在收到上述法律文件后,將會依法盡快聯(lián)系相關文章源頭核實,溝通刪除相關內(nèi)容或斷開相關鏈接。