memcached 客户端代码 Java memcached client学习(一致性hash)
昨天写了一篇短文 描述了淘宝面试的一些题。
今天下午装了在ubuntu中装了memcached,装起来还是很简单的。
主要是装一下。
memcached: wget http://memcached.googlecode.com/files/memcached-1.4.4.tar.gz /data/ libevent:wget http://monkey.org/~provos/libevent-1.4.13-stable.tar.gz /
自己分配了一个端口。那么我想还是从最简单的memcached--
javamemcachedclient源码,总的代码量还是很少的
主要是如下两个类:
MemcachedClient.java
SockIOPool.java
好先看推荐的测试代码:
/** * Copyright (c) 2008 Greg Whalin * All rights reserved. * * This library is free software; you can redistribute it and/or * modify it under the terms of the BSD license * * This library is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE. * * You should have received a copy of the BSD License along with this * library. * * @author greg whalin <[email protected]> */ package com.meetup.memcached.test; import com.meetup.memcached.*; import org.apache.log4j.*; public class TestMemcached { public static void main(String[] args) { // memcached should be running on port 11211 but NOT on 11212 BasicConfigurator.configure(); String[] servers = { "localhost:11211"}; SockIOPool pool = SockIOPool.getInstance(); pool.setServers( servers ); pool.setFailover( true ); pool.setInitConn( 10 ); pool.setMinConn( 5 ); pool.setMaxConn( 250 ); pool.setMaintSleep( 30 ); //这是开启一个nagle 算法。改算法避免网络中充塞小封包,提高网络的利用率 pool.setNagle( false ); pool.setSocketTO( 3000 ); pool.setAliveCheck( true ); pool.initialize(); MemcachedClient mcc = new MemcachedClient(); // turn off most memcached client logging: com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN ); for ( int i = 0; i < 10; i++ ) { boolean success = mcc.set( "" + i, "Hello!" ); String result = (String)mcc.get( "" + i ); System.out.println( String.format( "set( %d ): %s", i, success ) ); System.out.println( String.format( "get( %d ): %s", i, result ) ); } }
其实对于我来说我很想明白的是连接池是如何配置的,关键在于pool.initialize();这个方法如何初始化的。
/** * Initializes the pool. */ public void initialize() { synchronized( this ) { // check to see if already initialized if ( initialized && ( buckets != null || consistentBuckets != null ) && ( availPool != null ) && ( busyPool != null ) ) { log.error( "++++ trying to initialize an already initialized pool" ); return; } // pools availPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn ); busyPool = new HashMap<String,Map<SockIO,Long>>( servers.length * initConn ); deadPool = new IdentityHashMap<SockIO,Integer>(); hostDeadDur = new HashMap<String,Long>(); hostDead = new HashMap<String,Date>(); maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier; // only create up to maxCreate connections at once if ( log.isDebugEnabled() ) { log.debug( "++++ initializing pool with following settings:" ); log.debug( "++++ initial size: " + initConn ); log.debug( "++++ min spare : " + minConn ); log.debug( "++++ max spare : " + maxConn ); } // if servers is not set, or it empty, then // throw a runtime exception if ( servers == null || servers.length <= 0 ) { log.error( "++++ trying to initialize with no servers" ); throw new IllegalStateException( "++++ trying to initialize with no servers" ); } // initalize our internal hashing structures if ( this.hashingAlg == CONSISTENT_HASH ) populateConsistentBuckets(); else populateBuckets(); // mark pool as initialized this.initialized = true; // start maint thread if ( this.maintSleep > 0 ) this.startMaintThread(); } }
如上代码流程如下:
1检测是否已经被初始化
2定义可用链接,繁忙链接池
3判断是否一致性hash算法还是普通的算法
4定义一个后台线程,来维护
好,首先来分析下一致性hash算法。
从如下代码来分析:
if ( this.hashingAlg == CONSISTENT_HASH ) populateConsistentBuckets();
/** 将server添加到一致性hash的2的32次 圆环 **/ private void populateConsistentBuckets() { if ( log.isDebugEnabled() ) log.debug( "++++ initializing internal hashing structure for consistent hashing" ); // store buckets in tree map this.consistentBuckets = new TreeMap<Long,String>(); MessageDigest md5 = MD5.get(); //得到总的权重 if ( this.totalWeight <= 0 && this.weights != null ) { for ( int i = 0; i < this.weights.length; i++ ) this.totalWeight += ( this.weights[i] == null ) ? 1 : this.weights[i]; } else if ( this.weights == null ) { this.totalWeight = this.servers.length; } for ( int i = 0; i < servers.length; i++ ) { //每台服务器的权重 int thisWeight = 1; if ( this.weights != null && this.weights[i] != null ) { thisWeight = this.weights[i]; } //有兴趣的朋友可以参考平衡Hash 算法的另一个指标是平衡性 (Balance) ,定义如下: 平衡性 平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用 //了解决这种情况, consistent hashing 引入了“虚拟节点”的概念,它可以如下定义: “虚拟节点”( virtual node )是实际节点在 hash 空间的复制品( replica ),一实际个节点对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以 hash 值排列。 double factor = Math.floor(((double)(40 * this.servers.length * thisWeight)) / (double)this.totalWeight); for ( long j = 0; j < factor; j++ ) { //加密规则类似 127.0.0.1_1 byte[] d = md5.digest( ( servers[i] + "-" + j ).getBytes() ); //转化成16位的字节数组 //16位二进制数组每4位为一组,每组第4个值左移24位,第三个值左移16位,第二个值左移8位,第一个值不移位。进行或运算,得到一个小于2的32 次方的long值 for ( int h = 0 ; h < 4; h++ ) { //因为是16位 Long k = //实际上每个字节进行了运算 ((long)(d[3+h*4]&0xFF) << 24) | ((long)(d[2+h*4]&0xFF) << 16) | ((long)(d[1+h*4]&0xFF) << 8) | ((long)(d[0+h*4]&0xFF)); consistentBuckets.put( k, servers[i] ); if ( log.isDebugEnabled() ) log.debug( "++++ added " + servers[i] + " to server bucket" ); } } // create initial connections if ( log.isDebugEnabled() ) log.debug( "+++ creating initial connections (" + initConn + ") for host: " + servers[i] ); //创建链接 for ( int j = 0; j < initConn; j++ ) { SockIO socket = createSocket( servers[i] ); if ( socket == null ) { log.error( "++++ failed to create connection to: " + servers[i] + " -- only " + j + " created." ); break; } //加入socket到连接池 这里慢慢谈 addSocketToPool( availPool, servers[i], socket ); if ( log.isDebugEnabled() ) log.debug( "++++ created and added socket: " + socket.toString() + " for host " + servers[i] ); } } }
好比如说我们调用了如下代码:
MemcachedClient mcc = new MemcachedClient(); mcc.set("6", 1);
这里key如何定位到一台server呢?我先把一致性hash算法的定位方法说下。
//得到定位server的Socket封装对象 SockIOPool.SockIO sock = pool.getSock( key, hashCode );
//计算出key对应的hash值(md5) ,然后 long bucket = getBucket( key, hashCode );
//得到大于hash的map,因为treemap已经排好序了。调用tailMap可以得到大于等于这个hash的对象 ,然后调用firstKey得到圆环上的hash值 SortedMap<Long,String> tmap = this.consistentBuckets.tailMap( hv ); return ( tmap.isEmpty() ) ? this.consistentBuckets.firstKey() : tmap.firstKey();
明天开始继续看代码,还是要坚持。
一致性hash的算法参考
http://xok.la/2010/06/memcache_consistent_hashing.html
相关推荐
郗瑞强 2020-08-16
85590296 2020-07-22
jkzyx 2020-06-29
luotuo 2020-06-26
LinuxJob 2020-06-26
ol0 2020-06-26
清溪算法君老号 2020-06-25
86251043 2020-06-13
CSDN0BLOG 2020-06-09
ol0 2020-05-26
andyhuabing 2020-05-22
程序员俱乐部 2020-05-06
83530391 2020-05-05
ol0 2020-05-02
83530391 2020-04-09
85590296 2020-03-25
carolAnn 2020-03-07
大脸猫脸大 2020-03-03
ol0 2020-02-18