Twitter's distributed unique ID algorithm Snowflake (Java version)

Overview

In a distributed system there are scenarios that require globally unique IDs. To avoid collisions you can use a 36-character UUID, but UUIDs have drawbacks: they are relatively long, and they are usually unordered.

Sometimes we would rather have a simpler ID that is also generated roughly in time order.

Twitter's Snowflake meets exactly this need. Twitter originally built this globally unique ID generation service when it migrated its storage from MySQL to Cassandra, because Cassandra has no built-in sequential ID generation mechanism.

 

Structure

The structure of a Snowflake ID is as follows (the parts are separated by -):

0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000

The first bit is unused. The next 41 bits hold a millisecond-precision timestamp (41 bits are enough for about 69 years). They are followed by a 5-bit datacenterId and a 5-bit workerId (10 bits in total, allowing up to 1024 nodes). The last 12 bits are a counter within the millisecond (a 12-bit sequence lets each node generate 4096 IDs per millisecond).

Altogether that is exactly 64 bits, i.e. one Java long. (As a decimal string the ID is at most 19 characters long.)

IDs generated by Snowflake are ordered by time overall, no collisions occur anywhere in the distributed system (datacenterId and workerId distinguish the generators), and generation is fast: in tests Snowflake produced about 260,000 IDs per second.

Source code

/**
 * Twitter_Snowflake<br>
 * The structure of a Snowflake ID is as follows (the parts are separated by -):<br>
 * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>
 * 1 sign bit: Java's long is signed and the highest bit is the sign bit (0 for positive, 1 for negative); IDs are always positive, so the highest bit is 0.<br>
 * 41 timestamp bits (milliseconds): note that they do not store the current timestamp itself but the difference (current timestamp - start timestamp),
 * where the start timestamp is simply the epoch chosen when the generator is put into service, set in our own code (the twepoch field of the SnowflakeIdWorker class below). 41 bits last about 69 years: (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69.<br>
 * 10 machine bits, allowing up to 1024 nodes, split into a 5-bit datacenterId and a 5-bit workerId.<br>
 * 12 sequence bits: a counter within the millisecond; 12 bits let each node (same machine, same millisecond) generate 4096 distinct IDs.<br>
 * Altogether exactly 64 bits, i.e. one long.<br>
 * The advantages of Snowflake: IDs are ordered by time overall, no collisions occur in the whole distributed system (the datacenter ID and worker ID tell generators apart), and it is fast - in tests it generated around 260,000 IDs per second.
 */
public class SnowflakeIdWorker {

    // ==============================Fields===========================================
    /** Start epoch (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /** Number of bits for the worker id */
    private final long workerIdBits = 5L;

    /** Number of bits for the datacenter id */
    private final long datacenterIdBits = 5L;

    /** Maximum worker id, which is 31 (this shift trick quickly computes the largest number representable in the given number of bits) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** Maximum datacenter id, which is 31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** Number of bits for the sequence */
    private final long sequenceBits = 12L;

    /** The worker id is shifted left by 12 bits */
    private final long workerIdShift = sequenceBits;

    /** The datacenter id is shifted left by 17 bits (12 + 5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** The timestamp is shifted left by 22 bits (12 + 5 + 5) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** Mask for the sequence, 4095 here (0b111111111111 = 0xfff = 4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** Worker id (0~31) */
    private long workerId;

    /** Datacenter id (0~31) */
    private long datacenterId;

    /** Sequence within the current millisecond (0~4095) */
    private long sequence = 0L;

    /** Timestamp of the last generated ID */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================
    /**
     * Constructor
     * @param workerId worker id (0~31)
     * @param datacenterId datacenter id (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ==============================Methods==========================================
    /**
     * Returns the next ID (this method is thread-safe).
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        //If the current timestamp is earlier than the timestamp of the last generated ID, the system clock has moved backwards and an exception should be thrown
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //If this millisecond is the same as the last one, advance the sequence within the millisecond
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            //The sequence within this millisecond overflowed
            if (sequence == 0) {
                //Block until the next millisecond to obtain a new timestamp
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //The timestamp changed, so reset the sequence
        else {
            sequence = 0L;
        }

        //Remember the timestamp used for this ID
        lastTimestamp = timestamp;

        //Shift each part into place and OR them together into a 64-bit ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * Blocks until the next millisecond, i.e. until a new timestamp is obtained.
     * @param lastTimestamp the timestamp of the last generated ID
     * @return the current timestamp
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * Returns the current time in milliseconds.
     * @return the current time (in milliseconds)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    //==============================Test=============================================
    /** Test */
    public static void main(String[] args) {
        SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
        for (int i = 0; i < 1000; i++) {
            long id = idWorker.nextId();
            System.out.println(Long.toBinaryString(id));
            System.out.println(id);
        }
    }
}
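
For completeness, here is a small decoding sketch that splits a generated ID back into its fields, using the same shift widths and start epoch as the class above (the SnowflakeIdDecoder class is only an illustration added here, not part of the original implementation):

public class SnowflakeIdDecoder {
    public static void main(String[] args) {
        long id = new SnowflakeIdWorker(3, 1).nextId();

        long sequence = id & 0xFFF;                   // lowest 12 bits
        long workerId = (id >> 12) & 0x1F;            // next 5 bits
        long datacenterId = (id >> 17) & 0x1F;        // next 5 bits
        long timestamp = (id >> 22) + 1420041600000L; // 41 timestamp bits plus twepoch

        System.out.printf("time=%d, datacenter=%d, worker=%d, seq=%d%n",
                timestamp, datacenterId, workerId, sequence);
    }
}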

 

Decrypting a WeChat user's unionid

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import org.apache.commons.lang3.StringUtils;

// the encrypted data, the IV and the session key are Base64-encoded strings supplied by the client
String data = StringUtils.trimToEmpty(request.getParameter("data"));
String iv = StringUtils.trimToEmpty(request.getParameter("iv"));
String skey = StringUtils.trimToEmpty(request.getParameter("key"));

IvParameterSpec ivSpec = new IvParameterSpec(Base64.getDecoder().decode(iv));
SecretKeySpec keySpec = new SecretKeySpec(Base64.getDecoder().decode(skey), "AES");

// AES-CBC decryption; PKCS5Padding in the JCE handles the padding WeChat applies
Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, keySpec, ivSpec);
final byte[] original = cipher.doFinal(Base64.getDecoder().decode(data));
System.out.println(new String(original, StandardCharsets.UTF_8));

Three points to pay attention to when designing an API

Login authentication: asymmetric encryption. Generate a public/private key pair, give the public key to the client to encrypt its data, and decrypt on the server with the private key.

Tamper protection: MD5 digest. Compute an MD5 digest of the submitted form data and send the digest along with the form; the server recomputes the digest with the same algorithm and checks that the two values match.

Data leakage: token mechanism. After login the server returns a token, and every subsequent request must carry that token, similar to OAuth 2.0.
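
A minimal sketch of the second point (the field names and the shared secret below are illustrative assumptions, not part of the note above): client and server both compute a hex MD5 digest over the form fields plus a shared secret and compare the results.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

public class FormSigner {
    // hypothetical shared secret agreed between client and server
    private static final String SECRET = "change-me";

    /** Concatenates the form fields in a fixed (sorted) order, appends the secret and returns the hex MD5 digest. */
    public static String sign(Map<String, String> form) throws NoSuchAlgorithmException {
        StringBuilder sb = new StringBuilder();
        new TreeMap<>(form).forEach((k, v) -> sb.append(k).append('=').append(v).append('&'));
        sb.append(SECRET);

        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        Map<String, String> form = new TreeMap<>();
        form.put("orderId", "42");
        form.put("amount", "100");

        String clientSign = sign(form);
        // the server recomputes the digest and rejects the request if the values differ
        System.out.println(clientSign.equals(sign(form))); // true unless the form was modified in transit
    }
}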

Setting up a Kafka cluster

  1. Prepare three servers; the server IPs I used are 172.16.11.196, 172.16.11.197 and 172.16.11.198
  2. Download the Kafka package from Apache and extract it
    curl -O http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
    tar -xzf kafka_2.11-1.0.0.tgz
    cd kafka_2.11-1.0.0
  3. Edit the Kafka configuration file server.properties
    vim config/server.properties
    # the main parameters to change; broker.id must be unique per broker and listeners must use that broker's own IP
    broker.id=1
    listeners=PLAINTEXT://172.16.11.196:9092
    log.dirs=/opt/kafka/data
    zookeeper.connect=172.16.11.196:2181,172.16.11.197:2181,172.16.11.198:2181
  4. Start the cluster; make sure the ZooKeeper service is already running
    bin/kafka-server-start.sh -daemon config/server.properties 
    
  5. Create a topic (a Java producer smoke test follows after this list)
    # 5 partitions and 2 replicas
    bin/kafka-topics.sh --create --topic test --replication-factor 2 --partitions 5 --zookeeper 172.16.11.196:2181
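
To check that the cluster accepts messages end to end, here is a minimal producer sketch against the test topic created above (it assumes the kafka-clients 1.0.0 dependency is on the classpath):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClusterSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the three brokers configured above
        props.put("bootstrap.servers", "172.16.11.196:9092,172.16.11.197:9092,172.16.11.198:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // wait for all in-sync replicas to acknowledge, reasonable with replication-factor 2
        props.put("acks", "all");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test", "key-" + i, "hello-" + i));
            }
        }
    }
}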
    

     

Bloom filter (BloomFilter)

Bloom Filter: concept and principle

Bloom Filter explained in detail

 

A Bloom filter is a bit-vector data structure proposed by Burton Howard Bloom in 1970. It has very good space and time efficiency and is used to test whether an element is a member of a set. If the answer is yes, the element is not necessarily in the set; if the answer is no, the element is definitely not in the set, so a Bloom filter has 100% recall. Every lookup therefore returns one of two results: "possibly in the set (may be wrong)" or "definitely not in the set". In other words, a Bloom filter trades a small amount of correctness for a large saving in space.

It can be used, for example, to mitigate the cache penetration problem.

A Java implementation backed by a Redis bitmap

import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;

public class BloomFilter<E> {
    // name of the Redis key that holds the filter's bitmap
    private static final String CRAWLER_BLOOMFILTER = "bloomfilter";
    @Autowired
    private RedisTemplate<String, E> redisTemplate;

    @Value("${bloomfilter.expireDays}")
    private long expireDays;

    // total length of the Bloom filter
    private int sizeOfBloomFilter;
    // expected (maximum) number of elements to be added
    private int expectedNumberOfFilterElements;
    // number of hash functions
    private int numberOfHashFunctions;
    // encoding used for storing hash values as strings
    private final Charset charset = Charset.forName("UTF-8");
    // MD5 gives good enough accuracy in most circumstances. Change to SHA1 if it's needed
    private static final String hashName = "MD5";
    private static final MessageDigest digestFunction;

    // The digest method is reused between instances
    static {
        MessageDigest tmp;
        try {
            tmp = java.security.MessageDigest.getInstance(hashName);
        } catch (NoSuchAlgorithmException e) {
            tmp = null;
        }
        digestFunction = tmp;
    }

    public BloomFilter() {
        this(0.0001, 600000);
    }

    /**
     * Constructs an empty Bloom filter.
     * @param m
     *         is the total length of the Bloom filter.
     * @param n
     *         is the expected number of elements the filter will contain.
     * @param k
     *         is the number of hash functions used.
     */
    public BloomFilter(int m, int n, int k) {
        this.sizeOfBloomFilter = m;
        this.expectedNumberOfFilterElements = n;
        this.numberOfHashFunctions = k;
    }

    /**
     * Constructs an empty Bloom filter with a given false positive probability.
     * The size of bloom filter and the number of hash functions is estimated
     * to match the false positive probability.
     * @param falsePositiveProbability
     *         is the desired false positive probability.
     * @param expectedNumberOfElements
     *         is the expected number of elements in the Bloom filter.
     */
    public BloomFilter(double falsePositiveProbability, int expectedNumberOfElements) {
        this((int) Math.ceil((int) Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) * expectedNumberOfElements / Math.log(2)), // m = ceil(kn/ln2)
                expectedNumberOfElements,
                (int) Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2)))); // k = ceil(-ln(f)/ln2)
    }

    /**
     * Adds an object to the Bloom filter. The output from the object's
     * toString() method is used as input to the hash functions.
     * @param element
     *         is an element to register in the Bloom filter.
     */
    public void add(E element) {
        add(element.toString().getBytes(charset));
    }

    /**
     * Adds an array of bytes to the Bloom filter.
     * @param bytes
     *         array of bytes to add to the Bloom filter.
     */
    public void add(byte[] bytes) {
        if (redisTemplate.opsForValue().get(CRAWLER_BLOOMFILTER) == null) {
            redisTemplate.opsForValue().setBit(CRAWLER_BLOOMFILTER, 0, false);
            redisTemplate.expire(CRAWLER_BLOOMFILTER, expireDays, TimeUnit.DAYS);
        }

        int[] hashes = createHashes(bytes, numberOfHashFunctions);
        for (int hash : hashes) {
            redisTemplate.opsForValue().setBit(CRAWLER_BLOOMFILTER, Math.abs(hash % sizeOfBloomFilter), true);
        }
    }

    /**
     * Adds all elements from a Collection to the Bloom filter.
     * @param c
     *         Collection of elements.
     */
    public void addAll(Collection<? extends E> c) {
        for (E element : c) {
            add(element);
        }
    }

    /**
     * Returns true if the element could have been inserted into the Bloom filter.
     * Use getFalsePositiveProbability() to calculate the probability of this
     * being correct.
     * @param element
     *         element to check.
     * @return true if the element could have been inserted into the Bloom filter.
     */
    public boolean contains(E element) {
        return contains(element.toString().getBytes(charset));
    }

    /**
     * Returns true if the array of bytes could have been inserted into the Bloom filter.
     * Use getFalsePositiveProbability() to calculate the probability of this
     * being correct.
     * @param bytes
     *         array of bytes to check.
     * @return true if the array could have been inserted into the Bloom filter.
     */
    public boolean contains(byte[] bytes) {
        int[] hashes = createHashes(bytes, numberOfHashFunctions);
        for (int hash : hashes) {
            if (!redisTemplate.opsForValue().getBit(CRAWLER_BLOOMFILTER, Math.abs(hash % sizeOfBloomFilter))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns true if all the elements of a Collection could have been inserted
     * into the Bloom filter. Use getFalsePositiveProbability() to calculate the
     * probability of this being correct.
     * @param c
     *         elements to check.
     * @return true if all the elements in c could have been inserted into the Bloom filter.
     */
    public boolean containsAll(Collection<? extends E> c) {
        for (E element : c) {
            if (!contains(element)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Generates digests based on the contents of an array of bytes and splits the result into 4-byte int's and store them in an array. The
     * digest function is called until the required number of int's are produced. For each call to digest a salt
     * is prepended to the data. The salt is increased by 1 for each call.
     * @param data
     *         specifies input data.
     * @param hashes
     *         number of hashes/int's to produce.
     * @return array of int-sized hashes
     */
    public static int[] createHashes(byte[] data, int hashes) {
        int[] result = new int[hashes];

        int k = 0;
        byte salt = 0;
        while (k < hashes) {
            byte[] digest;
            synchronized (digestFunction) {
                digestFunction.update(salt);
                salt++;
                digest = digestFunction.digest(data);
            }

            for (int i = 0; i < digest.length / 4 && k < hashes; i++) {
                int h = 0;
                for (int j = (i * 4); j < (i * 4) + 4; j++) {
                    h <<= 8;
                    h |= ((int) digest[j]) & 0xFF;
                }
                result[k] = h;
                k++;
            }
        }
        return result;
    }

    public int getSizeOfBloomFilter() {
        return this.sizeOfBloomFilter;
    }

    public int getExpectedNumberOfElements() {
        return this.expectedNumberOfFilterElements;
    }

    public int getNumberOfHashFunctions() {
        return this.numberOfHashFunctions;
    }

    /**
     * Compares the contents of two instances to see if they are equal.
     * @param obj
     *         is the object to compare to.
     * @return True if the contents of the objects are equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final BloomFilter<E> other = (BloomFilter<E>) obj;
        if (this.sizeOfBloomFilter != other.sizeOfBloomFilter) {
            return false;
        }
        if (this.expectedNumberOfFilterElements != other.expectedNumberOfFilterElements) {
            return false;
        }
        if (this.numberOfHashFunctions != other.numberOfHashFunctions) {
            return false;
        }
        return true;
    }

    /**
     * Calculates a hash code for this class.
     * @return hash code representing the contents of an instance of this class.
     */
    @Override
    public int hashCode() {
        int hash = 7;
        hash = 61 * hash + this.sizeOfBloomFilter;
        hash = 61 * hash + this.expectedNumberOfFilterElements;
        hash = 61 * hash + this.numberOfHashFunctions;
        return hash;
    }

    public static void main(String[] args) {
        BloomFilter<String> bloomFilter = new BloomFilter<>(0.0001, 600000);
        System.out.println(bloomFilter.getSizeOfBloomFilter());
        System.out.println(bloomFilter.getNumberOfHashFunctions());
    }
}
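
As a usage sketch for the cache-penetration case mentioned above, the filter can sit in front of the cache and the database; every stored key is registered in the filter, and lookups for keys the filter has never seen are rejected immediately. The CacheGuard class below, with its in-memory cache and db maps, is a hypothetical illustration only (the BloomFilter above still needs its RedisTemplate wired in by Spring):

import java.util.HashMap;
import java.util.Map;

public class CacheGuard {
    private final BloomFilter<String> bloomFilter;
    private final Map<String, String> cache = new HashMap<>(); // stand-in for a real cache
    private final Map<String, String> db = new HashMap<>();    // stand-in for a real database

    public CacheGuard(BloomFilter<String> bloomFilter) {
        this.bloomFilter = bloomFilter;
    }

    public void save(String key, String value) {
        db.put(key, value);
        bloomFilter.add(key); // every stored key is registered in the filter
    }

    public String get(String key) {
        if (!bloomFilter.contains(key)) {
            return null; // "definitely not in the set": skip both the cache and the database
        }
        String value = cache.get(key);
        if (value == null) {
            value = db.get(key); // possible false positive: the database may still return nothing
            if (value != null) {
                cache.put(key, value);
            }
        }
        return value;
    }
}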


Setting up a Hadoop cluster

  1. Prepare three machines that can reach each other, and configure /etc/hosts
    192.168.0.1    master
    192.168.0.2    slave1
    192.168.0.3    slave2
  2. Install and configure the Java environment
  3. Create a hadoop user
    sudo addgroup hadoop
    sudo adduser --ingroup hadoop hduser
  4. Set up SSH
    su - hduser
    ssh-keygen -t rsa -P ""
    cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
    ssh localhost
  5. Download the Hadoop tarball: http://hadoop.apache.org/releases.html
  6. Install it
    cd /usr/local
    sudo tar xzf hadoop-2.8.2.tar.gz
    sudo mv hadoop-2.8.2 hadoop
    sudo chown -R hduser:hadoop hadoop
  7. Update $HOME/.bashrc; you may need to install lzop (apt-get install lzop)
    export HADOOP_HOME=/usr/local/hadoop
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    unalias fs &> /dev/null
    alias fs="hadoop fs"
    unalias hls &> /dev/null
    alias hls="fs -ls"
    lzohead () {
        hadoop fs -cat $1 | lzop -dc | head -1000 | less
    }
    export PATH=$PATH:$HADOOP_HOME/bin
  8. Configure /usr/local/hadoop/etc/hadoop/hadoop-env.sh
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
  9. Create the data directory
    sudo mkdir -p /app/hadoop/tmp
    sudo chown hduser:hadoop /app/hadoop/tmp
    sudo chmod 750 /app/hadoop/tmp
  10. Configure /usr/local/hadoop/etc/hadoop/core-site.xml
    <property>
      <name>hadoop.tmp.dir</name>
      <value>/app/hadoop/tmp</value>
      <description>A base for other temporary directories.</description>
    </property>
    
    <property>
      <name>fs.default.name</name>
      <value>hdfs://master:54310</value>
      <description>The name of the default file system.  A URI whose
      scheme and authority determine the FileSystem implementation.  The
      uri's scheme determines the config property (fs.SCHEME.impl) naming
      the FileSystem implementation class.  The uri's authority is used to
      determine the host, port, etc. for a filesystem.</description>
    </property>
  11. Configure /usr/local/hadoop/etc/hadoop/mapred-site.xml
    <property>
      <name>mapred.job.tracker</name>
      <value>master:54311</value>
      <description>The host and port that the MapReduce job tracker runs
      at.  If "local", then jobs are run in-process as a single map
      and reduce task.
      </description>
    </property>
  12. Configure /usr/local/hadoop/etc/hadoop/hdfs-site.xml
    <property>
      <name>dfs.replication</name>
      <value>3</value>
      <description>Default block replication.
      The actual number of replications can be specified when the file is created.
      The default is used if replication is not specified in create time.
      </description>
    </property>
  13. Set up the SSH connections from the master to the slaves (master only)
    ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave1
    ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave2
  14. Test the connections
    ssh master
    ssh slave1
    ssh slave2
    
  15. Add a masters file containing master (master only)
    cd /usr/local/hadoop/etc/hadoop/
    echo master > masters
  16. Edit the slaves file (master only)
    master
    slave1
    slave2
  17. Format the file system (master only)
    hadoop namenode -format
  18. Start the daemons
    cd /usr/local/hadoop/sbin
    ./start-dfs.sh
    ./start-yarn.sh
  19. Check the running state in the web UI at 192.168.0.1:50070
    jps
  20. Stop
    ./stop-yarn.sh
    ./stop-dfs.sh
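
As a quick smoke test from Java (assuming the hadoop-client 2.8.2 dependency and the fs.default.name of hdfs://master:54310 configured above), the following sketch writes a small file to HDFS and reads it back:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the same file system URI as fs.default.name in core-site.xml above
        conf.set("fs.defaultFS", "hdfs://master:54310");

        try (FileSystem fs = FileSystem.get(conf)) {
            Path path = new Path("/tmp/hello.txt");

            // write a small file (overwriting it if it already exists)
            try (FSDataOutputStream out = fs.create(path, true)) {
                out.write("hello hadoop".getBytes(StandardCharsets.UTF_8));
            }

            // read it back
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                System.out.println(in.readLine());
            }
        }
    }
}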

References:

Deploying a single node

Deploying a cluster

A Java implementation of consistent hashing

import java.util.Collection;
import java.util.SortedMap;
import java.util.TreeMap;

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;

public class ConsistentHash<T> {
    private final HashFunction hashFunction;
    // number of virtual nodes placed on the ring for each physical node
    private final int numberOfReplicas;
    // the hash ring: hash value -> node, kept sorted so the next node clockwise can be found quickly
    private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();

    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas, Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;

        for (T node : nodes) {
            add(node);
        }
    }

    private HashCode getHashCode(T node, int i) {
        return hashFunction.hashUnencodedChars(String.format("%s%s", node.toString(), i));
    }

    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(getHashCode(node, i).asInt(), node);
        }
    }

    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(getHashCode(node, i).asInt());
        }
    }

    public T get(Object key) {
        if (circle.isEmpty()) {
            return null;
        }
        int hash = hashFunction.hashUnencodedChars(key.toString()).asInt();
        if (!circle.containsKey(hash)) {
            // no node sits exactly at this position: take the first node clockwise, wrapping around to the start of the ring if necessary
            SortedMap<Integer, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
        }
        return circle.get(hash);
    }

}
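
A short usage sketch (it assumes Guava is on the classpath; the node names are arbitrary examples):

import java.util.Arrays;

import com.google.common.hash.Hashing;

public class ConsistentHashDemo {
    public static void main(String[] args) {
        ConsistentHash<String> ring = new ConsistentHash<String>(
                Hashing.murmur3_32(),                        // hash function for nodes and keys
                100,                                         // 100 virtual nodes per physical node
                Arrays.asList("redis-1", "redis-2", "redis-3"));

        System.out.println(ring.get("user:42"));             // always maps to the same node
        ring.remove("redis-2");                              // only keys that lived on redis-2 are remapped
        System.out.println(ring.get("user:42"));
    }
}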

 

Installing Redis and configuring a cluster

I. Installing Redis

  1. Download the Redis tarball from the official site http://redis.io/download; the stable 4.0.2 release is fine.
  2. Download, extract and compile. If the build fails with "error: jemalloc/jemalloc.h: No such file or directory", run make MALLOC=libc instead.
    wget http://download.redis.io/releases/redis-4.0.2.tar.gz
    tar xzf redis-4.0.2.tar.gz
    cd redis-4.0.2
    make
  3. After the build succeeds, enter the src directory and run make install to install Redis.

II. Deploying Redis

  1. For easier management, move the redis.conf configuration file and the commonly used binaries into one directory
    mkdir -p /usr/local/redis/bin
    mv redis-server redis-sentinel redis-cli redis-benchmark redis-check-rdb redis-check-aof /usr/local/redis/bin/
  2. Add the Redis binaries to the PATH
    vim ~/.bashrc
    # at the end of the file add:
    export PATH=/usr/local/redis/bin:$PATH
    # then reload it:
    source ~/.bashrc
  3. Running redis-server directly starts Redis in the foreground, so if the Linux session is closed the Redis service stops with it. Normally Redis should be started in the background with an explicit configuration file.
  4. Copy redis.conf into the directory and edit it
    daemonize yes
    bind 172.16.11.2
    pidfile /opt/redis/redis4/redis_6379.pid
    logfile "redis.log"
    # in the configuration of each slave, point slaveof at the master's IP and port
    slaveof 172.16.11.1 6379
  5. Start the Redis service: redis-server redis.conf
  6. Connect with the client: redis-cli -h 172.16.11.1 -p 6379
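
With the master at 172.16.11.1 and a slave pointing at it as configured above, a minimal read/write-splitting sketch using Jedis might look like this (the Jedis dependency and the slave address 172.16.11.2 are assumptions of this example):

import redis.clients.jedis.Jedis;

public class MasterSlaveDemo {
    public static void main(String[] args) {
        // writes go to the master; reads can be served by the slave
        try (Jedis master = new Jedis("172.16.11.1", 6379);
             Jedis slave = new Jedis("172.16.11.2", 6379)) {
            master.set("greeting", "hello");
            // replication is asynchronous, so a freshly written key may take a moment to appear on the slave
            System.out.println(slave.get("greeting"));
        }
    }
}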

III. High-availability Sentinel configuration

 

IV. Cluster configuration

  1. Edit redis.conf
    cluster-enabled yes
    cluster-config-file nodes-6379.conf
    cluster-node-timeout 5000
  2. Start each instance
    redis-server redis.conf
  3. Create the cluster; run this on one of the machines
    apt-get install ruby
    gem install redis # the gem version does not matter
    redis-trib.rb create --replicas 1 172.16.11.1:6379 172.16.11.1:6380 172.16.11.2:6379 172.16.11.2:6380 172.16.11.3:6379 172.16.11.3:6380
  4. Connect with the client
    redis-cli -h 172.16.11.1 -p 6379 -c
  5. Jedis client code
    @Test
    public void testCluster() throws IOException {
        Set<HostAndPort> set = new HashSet<HostAndPort>();
        set.add(new HostAndPort("172.16.11.1", 6379));
        set.add(new HostAndPort("172.16.11.1", 6380));
        set.add(new HostAndPort("172.16.11.2", 6379));
        set.add(new HostAndPort("172.16.11.2", 6380));
        set.add(new HostAndPort("172.16.11.3", 6379));
        set.add(new HostAndPort("172.16.11.3", 6380));
        JedisCluster cluster = new JedisCluster(set);
        cluster.set("name","hello");
        System.out.println(cluster.get("name"));
        cluster.close();
    }

V. Notes

  1. Slaves do not accept writes; they only serve reads.
  2. While a slave is synchronizing with the master, it can block.

Running MySQL master-slave replication and read/write splitting with Docker

  1. Pull the mysql image from store.docker.com
    docker pull mysql
  2. Create the MySQL directories
    mkdir /opt/mysql
    mkdir /opt/mysql/data
    mkdir /opt/mysql/conf
  3. Write the master and slave configuration, setting server-id to 1 and 2 respectively
    vim /opt/mysql/conf/m.conf
    
    [mysqld]
    log-bin=mysql-bin
    server-id=1
  4. Create a docker-compose.yml file in the /opt/mysql directory
    version: '2'
    
    services:
    
      mysql:
        image: mysql
        volumes:
            - /opt/mysql/data:/var/lib/mysql
            - /opt/mysql/conf:/etc/mysql/conf.d
        environment:
          MYSQL_ROOT_PASSWORD: 123456
        ports:
          - 3306:3306     
      
      adminer:
        image: adminer
        ports:
          - 8099:8080
  5. Start the services
    docker-compose -f docker-compose.yml up -d
  6. Open 172.16.11.1:8099, connect to the master and create a user for replication
    GRANT REPLICATION SLAVE ON *.* to 'backup'@'%' identified by '123456';
  7. Check the master status and note the File and Position values, e.g. mysql-bin.000004 and 312
    show master status;
  8. Connect to the slave and point it at the master
    change master to master_host='172.16.11.1',master_user='backup',master_password='123456',master_log_file='mysql-bin.000004',master_log_pos=312,master_port=3306;
  9. Start replication
    start slave;
  10. Check the replication status; if you see "Waiting for master to send event", replication is working and changes made on the master are now copied to the slave. A read/write-splitting sketch on the application side follows this list.
    show slave status;
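
On the application side, a very simple form of read/write splitting is to keep two connections and send writes to the master and reads to the slave. A simple JDBC sketch (the slave's host, the test database and user table, and the credentials are placeholders for this example; the MySQL JDBC driver must be on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ReadWriteSplitDemo {
    // placeholder JDBC URLs: the master container started above and a slave on another host
    private static final String MASTER_URL = "jdbc:mysql://172.16.11.1:3306/test";
    private static final String SLAVE_URL = "jdbc:mysql://172.16.11.2:3306/test";

    public static void main(String[] args) throws Exception {
        // writes always go to the master, so they are written to the binlog and replicated
        try (Connection master = DriverManager.getConnection(MASTER_URL, "root", "123456");
             PreparedStatement insert = master.prepareStatement("INSERT INTO user(name) VALUES (?)")) {
            insert.setString(1, "alice");
            insert.executeUpdate();
        }

        // reads can be served by the slave; remember that replication is asynchronous
        try (Connection slave = DriverManager.getConnection(SLAVE_URL, "root", "123456");
             PreparedStatement query = slave.prepareStatement("SELECT name FROM user");
             ResultSet rs = query.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        }
    }
}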

Ebook download sites

Name         URL                                               Notes
脚本之家     http://www.jb51.net/books/                        Mostly scanned PDFs of Chinese books
CSDN         http://download.csdn.net/
Fox ebook    http://www.foxebook.net/                          English books in text form; downloadable but somewhat cumbersome
Avax         https://avxhm.se/ebooks/programming_development   English books in text form
Salttiger    https://salttiger.com/                            English books in text form, easy to download