今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。

来安ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
- redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
 - implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
 
使用的方法:
| 1 2 | r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx() | 
有了ConnectionPool这个类之后,可以使用如下方法
- r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
 - r.xxxx()
 
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |    class  StrictRedis( object ):   ........        def  __init__( self , host = 'localhost' , port = 6379 ,                     db = 0 , password = None , socket_timeout = None ,                     socket_connect_timeout = None ,                     socket_keepalive = None , socket_keepalive_options = None ,                     connection_pool = None , unix_socket_path = None ,                     encoding = 'utf-8' , encoding_errors = 'strict' ,                     charset = None , errors = None ,                     decode_responses = False , retry_on_timeout = False ,                     ssl = False , ssl_keyfile = None , ssl_certfile = None ,                     ssl_cert_reqs = None , ssl_ca_certs = None ):             if  not  connection_pool:                 ..........                  connection_pool  =  ConnectionPool( * * kwargs)             self .connection_pool  =  connection_pool    | 
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |        # COMMAND EXECUTION AND PROTOCOL PARSING        def  execute_command( self ,  * args,  * * options):            "Execute a command and return a parsed response"            pool  =  self .connection_pool            command_name  =  args[ 0 ]            connection  =  pool.get_connection(command_name,  * * options)   #调用ConnectionPool.get_connection方法获取一个连接            try :                connection.send_command( * args)   #命令执行,这里为Connection.send_command                return  self .parse_response(connection, command_name,  * * options)            except  (ConnectionError, TimeoutError) as e:                connection.disconnect()                if  not  connection.retry_on_timeout  and  isinstance (e, TimeoutError):                    raise                connection.send_command( * args)                  return  self .parse_response(connection, command_name,  * * options)            finally :                pool.release(connection)   #调用ConnectionPool.release释放连接    | 
在来看看ConnectionPool类:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |          class  ConnectionPool( object ):             ...........        def  __init__( self , connection_class = Connection, max_connections = None ,                     * * connection_kwargs):    #类初始化时调用构造函数            max_connections  =  max_connections  or  2  * *  31            if  not  isinstance (max_connections, ( int ,  long ))  or  max_connections <  0 :   #判断输入的max_connections是否合法                raise  ValueError( '"max_connections" must be a positive integer' )            self .connection_class  =  connection_class   #设置对应的参数            self .connection_kwargs  =  connection_kwargs            self .max_connections  =  max_connections            self .reset()   #初始化ConnectionPool 时的reset操作        def  reset( self ):            self .pid  =  os.getpid()            self ._created_connections  =  0   #已经创建的连接的计数器            self ._available_connections  =  []    #声明一个空的数组,用来存放可用的连接            self ._in_use_connections  =  set ()   #声明一个空的集合,用来存放已经在用的连接            self ._check_lock  =  threading.Lock()   .......        def  get_connection( self , command_name,  * keys,  * * options):   #在连接池中获取连接的方法            "Get a connection from the pool"            self ._checkpid()            try :                connection  =  self ._available_connections.pop()   #获取并删除代表连接的元素,在***次获取connectiong时,因为_available_connections是一个空的数组,                会直接调用make_connection方法            except  IndexError:                connection  =  self .make_connection()            self ._in_use_connections.add(connection)    #向代表正在使用的连接的集合中添加元素            return  connection           def  make_connection( self ):  #在_available_connections数组为空时获取连接调用的方法            "Create a new connection"            if  self ._created_connections > =  self .max_connections:    #判断创建的连接是否已经达到***限制,max_connections可以通过参数初始化                raise  ConnectionError( "Too many connections" )            self ._created_connections  + =  1    #把代表已经创建的连接的数值+1            return  self .connection_class( * * self .connection_kwargs)      #返回有效的连接,默认为Connection(**self.connection_kwargs)        def  release( self , connection):   #释放连接,链接并没有断开,只是存在链接池中            "Releases the connection back to the pool"            self ._checkpid()            if  connection.pid ! =  self .pid:                return            self ._in_use_connections.remove(connection)    #从集合中删除元素            self ._available_connections.append(connection)  #并添加到_available_connections 的数组中        def  disconnect( self ):  #断开所有连接池中的链接            "Disconnects all connections in the pool"            all_conns  =  chain( self ._available_connections,                              self ._in_use_connections)            for  connection  in  all_conns:                connection.disconnect()    | 
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
| 1 2 3 4 5 6 7 |    class  Connection( object ):        "Manages TCP communication to and from a Redis server"        def  __del__( self ):    #对象删除时的操作,调用disconnect释放连接            try :                self .disconnect()            except  Exception:                pass    | 
核心的链接建立方法是通过socket模块实现:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |         def  _connect( self ):            err  =  None            for  res  in  socket.getaddrinfo( self .host,  self .port,  0 ,                                          socket.SOCK_STREAM):                family, socktype, proto, canonname, socket_address  =  res                sock  =  None                try :                    sock  =  socket.socket(family, socktype, proto)                    # TCP_NODELAY                    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,  1 )                    # TCP_KEEPALIVE                    if  self .socket_keepalive:    #构造函数中默认 socket_keepalive=False,因此这里默认为短连接                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE,  1 )                        for  k, v  in  iteritems( self .socket_keepalive_options):                            sock.setsockopt(socket.SOL_TCP, k, v)                    # set the socket_connect_timeout before we connect                    sock.settimeout( self .socket_connect_timeout)   #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式                    # connect                    sock.connect(socket_address)                    # set the socket_timeout now that we're connected                    sock.settimeout( self .socket_timeout)   #构造函数中默认socket_timeout=None                    return  sock                except  socket.error as _:                    err  =  _                    if  sock  is  not  None :                        sock.close()   .....    | 
关闭链接的方法:
| 1 2 3 4 5 6 7 8 9 10 11 |         def  disconnect( self ):            "Disconnects from the Redis server"            self ._parser.on_disconnect()            if  self ._sock  is  None :                return            try :                self ._sock.shutdown(socket.SHUT_RDWR)   #先shutdown再close                self ._sock.close()            except  socket.error:                pass            self ._sock  =  None        | 
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。
本文出自 “菜光光的博客” 博客,请务必保留此出处http://caiguangguang.blog./1652935/1583541
                网站栏目:【博文推荐】Pythonredis链接建立实现分析
                
                标题链接:http://www.csdahua.cn/qtweb/news27/188877.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网