一文讀懂連接池技術(shù)原理、設(shè)計(jì)與實(shí)現(xiàn)

大數(shù)據(jù)

作者:曹金龍

概述

連接池的作用就是為了提高性能,將已經(jīng)創(chuàng)建好的連接保存在池中,當(dāng)有請(qǐng)求來(lái)時(shí),直接使用已經(jīng)創(chuàng)建好的連接對(duì)Server端進(jìn)行訪問(wèn)。這樣省略了創(chuàng)建連接和銷毀連接的過(guò)程(TCP連接建立時(shí)的三次握手和銷毀時(shí)的四次握手),從而在性能上得到了提高。

連接池設(shè)計(jì)的基本原理是這樣的:

建立連接池對(duì)象(服務(wù)啟動(dòng))。按照事先指定的參數(shù)創(chuàng)建初始數(shù)量的連接(即:空閑連接數(shù))。對(duì)于一個(gè)訪問(wèn)請(qǐng)求,直接從連接池中得到一個(gè)連接。如果連接池對(duì)象中沒(méi)有空閑的連接,且連接數(shù)沒(méi)有達(dá)到最大(即:最大活躍連接數(shù)),創(chuàng)建一個(gè)新的連接;如果達(dá)到最大,則設(shè)定一定的超時(shí)時(shí)間,來(lái)獲取連接。運(yùn)用連接訪問(wèn)服務(wù)。訪問(wèn)服務(wù)完成,釋放連接(此時(shí)的釋放連接,并非真正關(guān)閉,而是將其放入空閑隊(duì)列中。如實(shí)際空閑連接數(shù)大于初始空閑連接數(shù)則釋放連接)。釋放連接池對(duì)象(服務(wù)停止、維護(hù)期間,釋放連接池對(duì)象,并釋放所有連接)。

說(shuō)的通俗點(diǎn),可以把連接池理解為一個(gè)一個(gè)的管道,在管道空閑時(shí),便可以取出使用;同時(shí),也可以鋪設(shè)新的管道(當(dāng)然不能超過(guò)最大連接數(shù)的限制)。使用完之后,管道就變?yōu)榭臻e了。

通常比較常用的連接池是數(shù)據(jù)庫(kù)連接池,HTTP Client連接池,我也自己編寫(xiě)過(guò)連接池,如Thrift連接池及插入Rabbitmq隊(duì)列的連接池。

下面分析三個(gè)典型的連接池的設(shè)計(jì)。

數(shù)據(jù)庫(kù)連接池

首先剖析一下數(shù)據(jù)庫(kù)連接池的設(shè)計(jì)與實(shí)現(xiàn)的原理。DBUtils 屬于數(shù)據(jù)庫(kù)連接池實(shí)現(xiàn)模塊,用于連接DB-API 2模塊,對(duì)數(shù)據(jù)庫(kù)連接線程化,使可以安全和高效的訪問(wèn)數(shù)據(jù)庫(kù)的模塊。本文主要分析一下PooledDB的流程。

DBUtils.PooledDB使用DB-API 2模塊實(shí)現(xiàn)了一個(gè)強(qiáng)硬的、線程安全的、有緩存的、可復(fù)用的數(shù)據(jù)庫(kù)連接。

如下圖展示了使用PooledDB時(shí)的工作流程:

大數(shù)據(jù)

本文主要考慮dedicated connections,即專用數(shù)據(jù)庫(kù)連接,在初始化時(shí)連接池時(shí),就需要指定mincached、maxcached以及maxconnections等參數(shù),分別表示連接池的最小連接數(shù)、連接池的最大連接數(shù)以及系統(tǒng)可用的最大連接數(shù),同時(shí),blocking參數(shù)表征了當(dāng)獲取不到連接的時(shí)候是阻塞等待獲取連接還是返回異常:

if not blocking: ? ?def wait(): ? ? ? ?raise TooManyConnections ? ?self._condition.wait = wait

在連接池初始化時(shí),就會(huì)建立mincached個(gè)連接,代碼如下:

# Establish an initial number of idle database connections:idle = [self.dedicated_connection() for i in range(mincached)]while idle: ? ?idle.pop().close()

里面有close方法,看一下連接close方法的實(shí)現(xiàn):

def close(self): ? ?"""Close the pooled dedicated connection.""" ? ?# Instead of actually closing the connection, ? ?# return it to the pool for future reuse. ? ?if self._con: ? ? ? ?self._pool.cache(self._con) ? ? ? ?self._con = None

主要是實(shí)現(xiàn)了cache方法,看一下具體代碼:

def cache(self, con): ? ?"""Put a dedicated connection back into the idle cache.""" ? ?self._condition.acquire() ? ?try: ? ? ? ?if not self._maxcached or len(self._idle_cache) < self._maxcached: ? ? ? ? ? ?con._reset(force=self._reset) # rollback possible transaction ? ? ? ? ? ?# the idle cache is not full, so put it there ? ? ? ? ? ?self._idle_cache.append(con) # append it to the idle cache ? ? ? ?else: # if the idle cache is already full, ? ? ? ? ? ?con.close() # then close the connection ? ? ? ?self._connections -= 1 ? ? ? ?self._condition.notify() ? ?finally: ? ? ? ?self._condition.release()

由上述代碼可見(jiàn),close并不是把連接關(guān)閉,而是在連接池的數(shù)目小于maxcached的時(shí)候,將連接放回連接池,而大于此值時(shí),關(guān)閉該連接。同時(shí)可以注意到,在放回連接池之前,需要將事務(wù)進(jìn)行回滾,避免在使用連接池的時(shí)候有存活的事務(wù)沒(méi)有提交。這可以保證進(jìn)入連接池的連接都是可用的。

而獲取連接的過(guò)程正如之前討論的,先從連接池中獲取連接,如果獲取連接失敗,則新建立連接:

# try to get a dedicated connection ? ?self._condition.acquire() ? ?try: ? ? ? ?while (self._maxconnections ? ? ? ? ? ? ? ?and self._connections >= self._maxconnections): ? ? ? ? ? ?self._condition.wait() ? ? ? ?# connection limit not reached, get a dedicated connection ? ? ? ?try: # first try to get it from the idle cache ? ? ? ? ? ?con = self._idle_cache.pop(0) ? ? ? ?except IndexError: # else get a fresh connection ? ? ? ? ? ?con = self.steady_connection() ? ? ? ?else: ? ? ? ? ? ?con._ping_check() # check connection ? ? ? ?con = PooledDedicatedDBConnection(self, con) ? ? ? ?self._connections += 1 ? ?finally: ? ? ? ?self._condition.release()

關(guān)閉連接正如剛剛創(chuàng)建mincached個(gè)連接后關(guān)閉連接的流程,在連接池的數(shù)目小于maxcached的時(shí)候,將連接放回連接池,而大于此值時(shí),關(guān)閉該連接。

RabbitMQ隊(duì)列插入消息連接池

異步消息傳遞是高并發(fā)系統(tǒng)常用的一種技術(shù)手段。而這其中就少不了消息隊(duì)列。頻繁的向消息隊(duì)列里面插入消息,建立連接釋放連接會(huì)是比較大的開(kāi)銷。所以,可以使用連接池來(lái)提高系統(tǒng)性能。

連接池的設(shè)計(jì)實(shí)現(xiàn)如下:

大數(shù)據(jù)

在獲取連接的時(shí)候,先從隊(duì)列里面獲取連接,如果獲取不到,則新建立一個(gè)連接,如果不能新建立連接,則根據(jù)超時(shí)時(shí)間,阻塞等待從隊(duì)列里面獲取鏈接。如果沒(méi)成功,則做最后的嘗試,重新建立連接。代碼實(shí)現(xiàn)如下:

 ? ?def get_connection_pipe(self): ? ? ? ?""" ? ? ? ?獲取連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self._queue.get(False) ? ? ? ?except Queue.Empty: ? ? ? ? ? ?try: ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ?timeout = self.timeout ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe = self._queue.get(timeout=timeout) ? ? ? ? ? ? ? ?except Queue.Empty: ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ? ? ? ? ?logging.error("Too much connections, Get Connection Timeout!") ? ? ? ?if (time.time() - connection_pipe.use_time) > self.disable_time: ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?return self.get_connection_pipe() ? ? ? ?return connection_pipe

一個(gè)RabbitMQ插入消息隊(duì)列的完整連接池設(shè)計(jì)如下:

# coding:utf-8import loggingimport threadingimport Queuefrom kombu import Connectionimport timeclass InsertQueue(): ? ?def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None, ? ? ? ? ? ? ? ? logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20): ? ? ? ?""" ? ? ? ?:param str host: Hostname or IP Address to connect to ? ? ? ?:param int port: TCP port to connect to ? ? ? ?:param str virtual_host: RabbitMQ virtual host to use ? ? ? ?:param int heartbeat_interval: ?How often to send heartbeats ? ? ? ?:param str name: auth credentials name ? ? ? ?:param str password: auth credentials password ? ? ? ?""" ? ? ? ?self.logger = logging if logger is None else logger ? ? ? ?self.host = host ? ? ? ?self.port = port ? ? ? ?self.virtual_host = virtual_host ? ? ? ?self.heartbeat_interval = heartbeat_interval ? ? ? ?self.name = name ? ? ? ?self.password = password ? ? ? ?self.mutex = threading.RLock() ? ? ? ?self.maxIdle = maxIdle ? ? ? ?self.maxActive = maxActive ? ? ? ?self.available = self.maxActive ? ? ? ?self.timeout = timeout ? ? ? ?self._queue = Queue.Queue(maxsize=self.maxIdle) ? ? ? ?self.disable_time = disable_time ? ?def get_new_connection_pipe(self): ? ? ? ?""" ? ? ? ?產(chǎn)生新的隊(duì)列連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.mutex: ? ? ? ? ? ?if self.available <= 0: ? ? ? ? ? ? ? ?raise GetConnectionException ? ? ? ? ? ?self.available -= 1 ? ? ? ?try: ? ? ? ? ? ?conn = Connection(hostname=self.host, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?port=self.port, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?virtual_host=self.virtual_host, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?heartbeat=self.heartbeat_interval, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?userid=self.name, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?password=self.password) ? ? ? ? ? ?producer = conn.Producer() ? ? ? ? ? ?return ConnectionPipe(conn, producer) ? ? ? ?except: ? ? ? ? ? ?with self.mutex: ? ? ? ? ? ? ? ?self.available += 1 ? ? ? ? ? ?raise GetConnectionException ? ?def get_connection_pipe(self): ? ? ? ?""" ? ? ? ?獲取連接 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self._queue.get(False) ? ? ? ?except Queue.Empty: ? ? ? ? ? ?try: ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ?timeout = self.timeout ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe = self._queue.get(timeout=timeout) ? ? ? ? ? ? ? ?except Queue.Empty: ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?connection_pipe = self.get_new_connection_pipe() ? ? ? ? ? ? ? ? ? ?except GetConnectionException: ? ? ? ? ? ? ? ? ? ? ? ?logging.error("Too much connections, Get Connection Timeout!") ? ? ? ?if (time.time() - connection_pipe.use_time) > self.disable_time: ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?return self.get_connection_pipe() ? ? ? ?return connection_pipe ? ?def close(self, connection_pipe): ? ? ? ?""" ? ? ? ?close the connection and the correlative channel ? ? ? ?:param connection_pipe: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.mutex: ? ? ? ? ? ?self.available += 1 ? ? ? ? ? ?connection_pipe.close() ? ? ? ?return ? ?def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True): ? ? ? ?""" ? ? ? ?insert message to queue ? ? ? ?:param str exchange: exchange name ? ? ? ?:param str body: message ? ? ? ?:param str routing_key: routing key ? ? ? ?:param bool mandatory: is confirm: True means confirm, False means not confirm ? ? ? ?:return: ? ? ? ?""" ? ? ? ?put_into_queue_flag = True ? ? ? ?insert_result = False ? ? ? ?connection_pipe = None ? ? ? ?try: ? ? ? ? ? ?connection_pipe = self.get_connection_pipe() ? ? ? ? ? ?producer = connection_pipe.channel ? ? ? ? ? ?use_time = time.time() ? ? ? ? ? ?producer.publish(exchange=exchange, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? body=body, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? delivery_mode=2, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? routing_key=routing_key, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? mandatory=mandatory ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ) ? ? ? ? ? ?insert_result = True ? ? ? ?except Exception: ? ? ? ? ? ?insert_result = False ? ? ? ? ? ?put_into_queue_flag = False ? ? ? ?finally: ? ? ? ? ? ?if put_into_queue_flag is True: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?connection_pipe.use_time = use_time ? ? ? ? ? ? ? ? ? ?self._queue.put_nowait(connection_pipe) ? ? ? ? ? ? ? ?except Queue.Full: ? ? ? ? ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?if connection_pipe is not None: ? ? ? ? ? ? ? ? ? ?self.close(connection_pipe) ? ? ? ?return insert_resultclass ConnectionPipe(object): ? ?""" ? ?connection和channel對(duì)象的封裝 ? ?""" ? ?def __init__(self, connection, channel): ? ? ? ?self.connection = connection ? ? ? ?self.channel = channel ? ? ? ?self.use_time = time.time() ? ?def close(self): ? ? ? ?try: ? ? ? ? ? ?self.connection.close() ? ? ? ?except Exception as ex: ? ? ? ? ? ?passclass GetConnectionException(): ? ?""" ? ?獲取連接異常 ? ?""" ? ?pass

Thrift連接池

Thrift是什么呢?簡(jiǎn)而言之,Thrift定義一個(gè)簡(jiǎn)單的文件,包含數(shù)據(jù)類型和服務(wù)接口,以作為輸入文件,編譯器生成代碼用來(lái)方便地生成RPC客戶端和服務(wù)器通信的方式。實(shí)際上就是一種遠(yuǎn)程調(diào)用的方式,因?yàn)閰f(xié)議棧為TCP層,所以相對(duì)于HTTP層效率會(huì)更高。

Thrift連接池的設(shè)計(jì)同數(shù)據(jù)庫(kù)連接池類似,流程圖如下:

大數(shù)據(jù)

思路依舊是,在獲取連接時(shí),先從連接池中獲取連接,若池中無(wú)連接,則判斷是否可以新建連接,若不能新建連接,則阻塞等待連接。

在從池中獲取不到隊(duì)列的時(shí)候的處理方式,本設(shè)計(jì)處理方式為:當(dāng)獲取不到連接時(shí),將這部分請(qǐng)求放入一個(gè)等待隊(duì)列,等待獲取連接;而當(dāng)關(guān)閉連接放回連接池時(shí),優(yōu)先判斷這個(gè)隊(duì)列是否有等待獲取連接的請(qǐng)求,若有,則優(yōu)先分配給這些請(qǐng)求。

獲取不到連接時(shí)處理代碼如下,將請(qǐng)求放入一個(gè)隊(duì)列進(jìn)行阻塞等待獲取連接:

async_result = AsyncResult()self.no_client_queue.appendleft(async_result)client = async_result.get() ?# blocking

而當(dāng)有連接釋放需要放回連接池時(shí),需要優(yōu)先考慮這部分請(qǐng)求,代碼如下:

def put_back_connections(self, client): ? ?""" ? ?線程安全 ? ?將連接放回連接池,邏輯如下: ? ?1、如果有請(qǐng)求尚未獲取到連接,請(qǐng)求優(yōu)先 ? ?2、如果連接池中的連接的數(shù)目小于maxIdle,則將該連接放回連接池 ? ?3、關(guān)閉連接 ? ?:param client: ? ?:return: ? ?""" ? ?with self.lock: ? ? ? ?if self.no_client_queue.__len__() > 0: ? ? ? ? ? ?task = self.no_client_queue.pop() ? ? ? ? ? ?task.set(client) ? ? ? ?elif self.connections.__len__() < self.maxIdle: ? ? ? ? ? ?self.connections.add(client) ? ? ? ?else: ? ? ? ? ? ?client.close() ? ? ? ? ? ?self.pool_size -= 1

最后,基于thrift連接池,介紹一個(gè)簡(jiǎn)單的服務(wù)化框架的實(shí)現(xiàn)。

服務(wù)化框架分為兩部分:RPC、注冊(cè)中心。

RPC:遠(yuǎn)程調(diào)用,遠(yuǎn)程調(diào)用的傳輸協(xié)議有很多種,可以走h(yuǎn)ttp、Webservice、TCP等。Thrift也是世界上主流的RPC框架。其重點(diǎn)在于安全、快速、最好能跨語(yǔ)言。注冊(cè)中心:用于存放,服務(wù)的IP地址和端口信息等。比較好的存放服務(wù)信息的方案有:Zookeeper、Redis等。其重點(diǎn)在于避免單點(diǎn)問(wèn)題,并且好維護(hù)。

通常的架構(gòu)圖為:

大數(shù)據(jù)

通過(guò)Thrift連接池作為客戶端,而Zookeeper作為注冊(cè)中心,設(shè)計(jì)服務(wù)框架。具體就是服務(wù)端在啟動(dòng)服務(wù)的時(shí)候到Zookeeper進(jìn)行注冊(cè),而客戶端在啟動(dòng)的時(shí)候通過(guò)Zookeeper發(fā)現(xiàn)服務(wù)端的IP和端口,通過(guò)Thrift連接池輪詢建立連接訪問(wèn)服務(wù)端的服務(wù)。

具體設(shè)計(jì)的代碼如下,代碼有點(diǎn)長(zhǎng),細(xì)細(xì)研讀一定有所收獲的:

# coding: utf-8import threadingfrom collections import dequeimport loggingimport socketimport timefrom kazoo.client import KazooClientfrom thriftpy.protocol import TBinaryProtocolFactoryfrom thriftpy.transport import ( ? ?TBufferedTransportFactory, ? ?TSocket,)from gevent.event import AsyncResultfrom gevent import Timeoutfrom error import CTECThriftClientErrorfrom thriftpy.thrift import TClientfrom thriftpy.transport import TTransportExceptionclass ClientPool: ? ?def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20, ? ? ? ? ? ? ? ? maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3): ? ? ? ?""" ? ? ? ?:param service: Thrift的Service名稱 ? ? ? ?:param server_hosts: 服務(wù)提供者地址,數(shù)組類型,['ip:port','ip:port'] ? ? ? ?:param zk_path: 服務(wù)提供者在zookeeper中的路徑 ? ? ? ?:param zk_hosts: zookeeper的host地址,多個(gè)請(qǐng)用逗號(hào)隔開(kāi) ? ? ? ?:param max_renew_times: 最大重連次數(shù) ? ? ? ?:param maxActive: 最大連接數(shù) ? ? ? ?:param maxIdle: 最大空閑連接數(shù) ? ? ? ?:param get_connection_timeout:獲取連接的超時(shí)時(shí)間 ? ? ? ?:param socket_timeout: 讀取數(shù)據(jù)的超時(shí)時(shí)間 ? ? ? ?:param disable_time: 連接失效時(shí)間 ? ? ? ?""" ? ? ? ?# 負(fù)載均衡隊(duì)列 ? ? ? ?self.load_balance_queue = deque() ? ? ? ?self.service = service ? ? ? ?self.lock = threading.RLock() ? ? ? ?self.max_renew_times = max_renew_times ? ? ? ?self.maxActive = maxActive ? ? ? ?self.maxIdle = maxIdle ? ? ? ?self.connections = set() ? ? ? ?self.pool_size = 0 ? ? ? ?self.get_connection_timeout = get_connection_timeout ? ? ? ?self.no_client_queue = deque() ? ? ? ?self.socket_timeout = socket_timeout ? ? ? ?self.disable_time = disable_time ? ? ? ?self.logger = logging if logger is None else logger ? ? ? ?if zk_hosts: ? ? ? ? ? ?self.kazoo_client = KazooClient(hosts=zk_hosts) ? ? ? ? ? ?self.kazoo_client.start() ? ? ? ? ? ?self.zk_path = zk_path ? ? ? ? ? ?self.zk_hosts = zk_hosts ? ? ? ? ? ?# 定義Watcher ? ? ? ? ? ?self.kazoo_client.ChildrenWatch(path=self.zk_path, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?func=self.watcher) ? ? ? ? ? ?# 刷新連接池中的連接對(duì)象 ? ? ? ? ? ?self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path)) ? ? ? ?elif server_hosts: ? ? ? ? ? ?self.server_hosts = server_hosts ? ? ? ? ? ?# 復(fù)制新的IP地址到負(fù)載均衡隊(duì)列中 ? ? ? ? ? ?self.load_balance_queue.extendleft(self.server_hosts) ? ? ? ?else: ? ? ? ? ? ?raise CTECThriftClientError('沒(méi)有指定服務(wù)器獲取方式!') ? ?def get_new_client(self): ? ? ? ?""" ? ? ? ?輪詢?cè)诿總€(gè)ip:port的連接池中獲取連接(線程安全) ? ? ? ?從當(dāng)前隊(duì)列右側(cè)取出ip:port信息,獲取client ? ? ? ?將連接池對(duì)象放回到當(dāng)前隊(duì)列的左側(cè) ? ? ? ?請(qǐng)求或連接超時(shí)時(shí)間,默認(rèn)30秒 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.pool_size < self.maxActive: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?ip = self.load_balance_queue.pop() ? ? ? ? ? ? ? ?except IndexError: ? ? ? ? ? ? ? ? ? ?raise CTECThriftClientError('沒(méi)有可用的服務(wù)提供者列表!') ? ? ? ? ? ? ? ?if ip: ? ? ? ? ? ? ? ? ? ?self.load_balance_queue.appendleft(ip) ? ? ? ? ? ? ? ? ? ?# 創(chuàng)建新的thrift client ? ? ? ? ? ? ? ? ? ?t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]), ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? socket_timeout=1000 * self.socket_timeout) ? ? ? ? ? ? ? ? ? ?proto_factory = TBinaryProtocolFactory() ? ? ? ? ? ? ? ? ? ?trans_factory = TBufferedTransportFactory() ? ? ? ? ? ? ? ? ? ?transport = trans_factory.get_transport(t_socket) ? ? ? ? ? ? ? ? ? ?protocol = proto_factory.get_protocol(transport) ? ? ? ? ? ? ? ? ? ?transport.open() ? ? ? ? ? ? ? ? ? ?client = TClient(self.service, protocol) ? ? ? ? ? ? ? ? ? ?self.pool_size += 1 ? ? ? ? ? ? ? ?return client ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?return None ? ?def close(self): ? ? ? ?""" ? ? ? ?關(guān)閉所有連接池和zk客戶端 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?if getattr(self, 'kazoo_client', None): ? ? ? ? ? ?self.kazoo_client.stop() ? ?def watcher(self, children): ? ? ? ?""" ? ? ? ?zk的watcher方法,負(fù)責(zé)檢測(cè)zk的變化,刷新當(dāng)前雙端隊(duì)列中的連接池 ? ? ? ?:param children: 子節(jié)點(diǎn),即服務(wù)提供方的列表 ? ? ? ?:return: ? ? ? ?""" ? ? ? ?self.__refresh_thrift_connections(children) ? ?def __refresh_thrift_connections(self, children): ? ? ? ?""" ? ? ? ?刷新服務(wù)提供者在當(dāng)前隊(duì)列中的連接池信息(線程安全),主要用于zk刷新 ? ? ? ?:param children: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?# 清空負(fù)載均衡隊(duì)列 ? ? ? ? ? ?self.load_balance_queue.clear() ? ? ? ? ? ?# 清空連接池 ? ? ? ? ? ?self.connections.clear() ? ? ? ? ? ?# 復(fù)制新的IP地址到負(fù)載均衡隊(duì)列中 ? ? ? ? ? ?self.load_balance_queue.extendleft(children) ? ?def __getattr__(self, name): ? ? ? ?""" ? ? ? ?函數(shù)調(diào)用,最大重試次數(shù)為max_renew_times ? ? ? ?:param name: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?def method(*args, **kwds): ? ? ? ? ? ?# 從連接池獲取連接 ? ? ? ? ? ?client = self.get_client_from_pool() ? ? ? ? ? ?# 連接池中無(wú)連接 ? ? ? ? ? ?if client is None: ? ? ? ? ? ? ? ?# 設(shè)置獲取連接的超時(shí)時(shí)間 ? ? ? ? ? ? ? ?time_out = Timeout(self.get_connection_timeout) ? ? ? ? ? ? ? ?time_out.start() ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?async_result = AsyncResult() ? ? ? ? ? ? ? ? ? ?self.no_client_queue.appendleft(async_result) ? ? ? ? ? ? ? ? ? ?client = async_result.get() ?# blocking ? ? ? ? ? ? ? ?except: ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ?if client is None: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.no_client_queue.remove(async_result) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Get Connection Timeout!") ? ? ? ? ? ? ? ?finally: ? ? ? ? ? ? ? ? ? ?time_out.cancel() ? ? ? ? ? ?if client is not None: ? ? ? ? ? ? ? ?for i in xrange(self.max_renew_times): ? ? ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = True ? ? ? ? ? ? ? ? ? ? ? ?client.last_use_time = time.time() ? ? ? ? ? ? ? ? ? ? ? ?fun = getattr(client, name, None) ? ? ? ? ? ? ? ? ? ? ? ?return fun(*args, **kwds) ? ? ? ? ? ? ? ? ? ?except socket.timeout: ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Timeout!") ? ? ? ? ? ? ? ? ? ? ? ?# 關(guān)閉連接,不關(guān)閉會(huì)導(dǎo)致亂序 ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except TTransportException, e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?if e.type == TTransportException.END_OF_FILE: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.warning("Socket Connection Reset Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client = self.get_new_client() ? ? ? ? ? ? ? ? ? ? ? ?else: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except socket.error, e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?if e.errno == socket.errno.ECONNABORTED: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.warning("Socket Connection aborted Error,%s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?with self.lock: ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?client = self.get_new_client() ? ? ? ? ? ? ? ? ? ? ? ?else: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Socket Error, %s", e) ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?except Exception as e: ? ? ? ? ? ? ? ? ? ? ? ?put_back_flag = False ? ? ? ? ? ? ? ? ? ? ? ?self.logger.error("Thrift Error, %s", e) ? ? ? ? ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ? ? ? ? ? ? ? ? ?break ? ? ? ? ? ? ? ? ? ?finally: ? ? ? ? ? ? ? ? ? ? ? ?# 將連接放回連接池 ? ? ? ? ? ? ? ? ? ? ? ?if put_back_flag is True: ? ? ? ? ? ? ? ? ? ? ? ? ? ?self.put_back_connections(client) ? ? ? ? ? ?return None ? ? ? ?return method ? ?def close_one_client(self, client): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?關(guān)閉連接 ? ? ? ?:param client: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?client.close() ? ? ? ? ? ?self.pool_size -= 1 ? ?def put_back_connections(self, client): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?將連接放回連接池,邏輯如下: ? ? ? ?1、如果有請(qǐng)求尚未獲取到連接,請(qǐng)求優(yōu)先 ? ? ? ?2、如果連接池中的連接的數(shù)目小于maxIdle,則將該連接放回連接池 ? ? ? ?3、關(guān)閉連接 ? ? ? ?:param client: ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.no_client_queue.__len__() > 0: ? ? ? ? ? ? ? ?task = self.no_client_queue.pop() ? ? ? ? ? ? ? ?task.set(client) ? ? ? ? ? ?elif self.connections.__len__() < self.maxIdle: ? ? ? ? ? ? ? ?self.connections.add(client) ? ? ? ? ? ?else: ? ? ? ? ? ? ? ?client.close() ? ? ? ? ? ? ? ?self.pool_size -= 1 ? ?def get_client_from_pool(self): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?從連接池中獲取連接,若連接池中有連接,直接取出,否則, ? ? ? ?新建一個(gè)連接,若一直無(wú)法獲取連接,則返回None ? ? ? ?:return: ? ? ? ?""" ? ? ? ?client = self.get_one_client_from_pool() ? ? ? ?if client is not None and (time.time() - client.last_use_time) < self.disable_time: ? ? ? ? ? ?return client ? ? ? ?else: ? ? ? ? ? ?if client is not None: ? ? ? ? ? ? ? ?self.close_one_client(client) ? ? ? ?client = self.get_new_client() ? ? ? ?if client is not None: ? ? ? ? ? ?return client ? ? ? ?return None ? ?def get_one_client_from_pool(self): ? ? ? ?""" ? ? ? ?線程安全 ? ? ? ?從連接池中獲取一個(gè)連接,若取不到連接,則返回None ? ? ? ?:return: ? ? ? ?""" ? ? ? ?with self.lock: ? ? ? ? ? ?if self.connections: ? ? ? ? ? ? ? ?try: ? ? ? ? ? ? ? ? ? ?return self.connections.pop() ? ? ? ? ? ? ? ?except KeyError: ? ? ? ? ? ? ? ? ? ?return None ? ? ? ? ? ?return None

免責(zé)聲明:本網(wǎng)站內(nèi)容主要來(lái)自原創(chuàng)、合作伙伴供稿和第三方自媒體作者投稿,凡在本網(wǎng)站出現(xiàn)的信息,均僅供參考。本網(wǎng)站將盡力確保所提供信息的準(zhǔn)確性及可靠性,但不保證有關(guān)資料的準(zhǔn)確性及可靠性,讀者在使用前請(qǐng)進(jìn)一步核實(shí),并對(duì)任何自主決定的行為負(fù)責(zé)。本網(wǎng)站對(duì)有關(guān)資料所引致的錯(cuò)誤、不確或遺漏,概不負(fù)任何法律責(zé)任。任何單位或個(gè)人認(rèn)為本網(wǎng)站中的網(wǎng)頁(yè)或鏈接內(nèi)容可能涉嫌侵犯其知識(shí)產(chǎn)權(quán)或存在不實(shí)內(nèi)容時(shí),應(yīng)及時(shí)向本網(wǎng)站提出書(shū)面權(quán)利通知或不實(shí)情況說(shuō)明,并提供身份證明、權(quán)屬證明及詳細(xì)侵權(quán)或不實(shí)情況證明。本網(wǎng)站在收到上述法律文件后,將會(huì)依法盡快聯(lián)系相關(guān)文章源頭核實(shí),溝通刪除相關(guān)內(nèi)容或斷開(kāi)相關(guān)鏈接。

2017-11-22
一文讀懂連接池技術(shù)原理、設(shè)計(jì)與實(shí)現(xiàn)
作者:曹金龍 概述 連接池的作用就是為了提高性能,將已經(jīng)創(chuàng)建好的連接保存在池中,當(dāng)有請(qǐng)求來(lái)時(shí),直接使用已經(jīng)創(chuàng)建好的連接對(duì)Server端進(jìn)行訪問(wèn)。這樣省略了創(chuàng)建連

長(zhǎng)按掃碼 閱讀全文