当前位置: 移动技术网 > IT编程>数据库>Mysql > 使用canal将mysql数据同步到Redis的操作方法

使用canal将mysql数据同步到Redis的操作方法

2018年11月01日  | 移动技术网IT编程  | 我要评论

写这篇博文时,自己一定是含着误删的眼泪写完的,文中的后续部分会谈到这个“从删库到**”的悲惨故事,这个故事深刻地教训了我,我也想以此来警示大家,注意数据安全和数据备份。

1. 可行方案

回归正题:我们的标题为《使用canal将mysql数据同步到redis的操作方法》,那就先来说说我们的目的:mysql数据同步到redis,想达到读写分离,redis只做缓存,mysql做持久化。刚开始想这样干的时候就去网上收集资料,发现了n多做法:

先从redis读取数据,如果没有查询到;便从mysql查询数据,将查询到的内容放到redis中。对于写操作,先对mysql进行写,写成功对redis进行写。当然这是一种相对直观而且简单的方法,但是看起来有许多操作需要我们自己去做。

使用mysql的udf去做,大体的思想是通过数据库中的trigger调用自定义的函数库来触发对redis的相应操作,比较麻烦的一点是:自定义的函数库需要我们基于mysql的api进行开发(c++),想想自己的java程序要去调用这么一堆玩意,本人很不情愿。据了解,该方法也是阿里早起的解决方案,具体的步骤可参照:《【菜鸟玩linux开发】通过mysql自动同步刷新redis》

通过gearman去同步,但是通过了解发现,它一般使用在php的开发中。

接下来的两种方案都属于对mysql中的binlog进行解析的方法了。

使用open-replicator解析binlog,https://github.com/whitesock/open-replicator.

使用canal进行同步,当然是能够解放双手的工具。

通过大量的资料收集和调查,我使用了canal进行了mysql数据同步到redis。先简单谈谈canal:

canal主要是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,核心基本就是模拟mysql中slave节点请求。具体的原理在这里不进行介绍,可以移步《阿里巴巴开源项目: canal 基于mybinlog的增量订阅&消费》 进行学习。

2. mysql的配置

开启mysql的binlog模块

切换到mysql的安装路径,找到my.cnf(linux)/my.ini (windows),加入如下内容:

[mysqld]
log-bin=mysql-bin #添加这一行就ok
binlog-format=row #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveid重复

配置完成后,需要重启数据库。当重启数据库遇到问题时,耐心解决,但需要警告的是,千万别动data文件夹下的文件。当然如果你觉得你比较有“资本”,同时遇到了“mysql 1067 无法启动”的错误,你可以试着备份一下data文件夹下的内容,删除logfile文件,重启数据库即可,但本人极不推荐这样进行操作。就是由于本人之前的无知,根据一个无良博客,误删了ibdata1文件,使得本人造成了很大的损失,mysql下的所有数据库瞬间毁灭。

配置mysql数据库

创建canal用户,用来管理canal的访问权限。我们可以通过对canal用户访问权限的控制,进而控制canal能够获取的内容。

create user canal identified by 'canal';    
grant select, replication slave, replication client on 数据库名.表名 to 'canal'@'%';  
-- grant all privileges on 数据库名.表名 to 'canal'@'%' ;  
flush privileges;  

3. canal配置与部署

下载部署包

下载,解压,我使用的是最新版本1.0.22

配置canal

主要配置的文件有两处,canal/conf/example/instance.properties 和 canal/conf/canal.properties . 而canal.properties 文件我们一般保持默认配置,所以我们仅对instance.properties 进行修改。如果需要对canal进行复杂的配置,可以参考《canal adminguide》。

## mysql serverid
canal.instance.mysql.slaveid = 1234

# position info
canal.instance.master.address = ***.***.***.***:3306 #改成自己的数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbusername = canal #改成自己的数据库信息 
canal.instance.dbpassword = canal #改成自己的数据库信息 
canal.instance.defaultdatabasename =  #改成自己的数据库信息
canal.instance.connectioncharset = utf-8 #改成自己的数据库信息 

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex = 
启动canal
./canal/startup.sh
查看启动状态

我们可以通过查看logs/canal/canal.log 和logs/example/example.log日志来判断canal是否启动成功。

canal/logs/canal/canal.log

2016-12-29 14:03:00.956 [main] info  com.alibaba.otter.canal.deployer.canallauncher - ## start the canal server.
2016-12-29 14:03:01.071 [main] info  com.alibaba.otter.canal.deployer.canalcontroller - ## start the canal server[192.168.1.99:11111]
2016-12-29 14:03:01.628 [main] info  com.alibaba.otter.canal.deployer.canallauncher - ## the canal server is running now ......

canal/logs/example/example.log

2016-12-29 14:03:01.357 [main] info  c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [canal.properties]
2016-12-29 14:03:01.362 [main] info  c.a.o.c.i.spring.support.propertyplaceholderconfigurer - loading properties file from class path resource [example/instance.properties]
2016-12-29 14:03:01.535 [main] info  c.a.otter.canal.instance.spring.canalinstancewithspring - start cannalinstance for 1-example 
2016-12-29 14:03:01.555 [main] info  c.a.otter.canal.instance.core.abstractcanalinstance - start successful....

4. java连接canal执行同步操作

在maven项目中中加载canal和redis依赖包.

    
    redis.clients    
    jedis    
    2.4.2    
 
    
    com.alibaba.otter    
    canal.client    
    1.0.22    
   

建立canal客户端,从canal中获取数据,并将数据更新至redis.

import java.net.inetsocketaddress;  
import java.util.list;  

import com.alibaba.fastjson.jsonobject;
import com.alibaba.otter.canal.client.canalconnector;  
import com.alibaba.otter.canal.common.utils.addressutils;  
import com.alibaba.otter.canal.protocol.message;  
import com.alibaba.otter.canal.protocol.canalentry.column;  
import com.alibaba.otter.canal.protocol.canalentry.entry;  
import com.alibaba.otter.canal.protocol.canalentry.entrytype;  
import com.alibaba.otter.canal.protocol.canalentry.eventtype;  
import com.alibaba.otter.canal.protocol.canalentry.rowchange;  
import com.alibaba.otter.canal.protocol.canalentry.rowdata;  
import com.alibaba.otter.canal.client.*;  

public class canalclient{  
   public static void main(string args[]) {  
       canalconnector connector = canalconnectors.newsingleconnector(new inetsocketaddress(addressutils.gethostip(),  
               11111), "example", "", "");  
       int batchsize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               message message = connector.getwithoutack(batchsize); // 获取指定数量的数据  
               long batchid = message.getid();  
               int size = message.getentries().size();  
               if (batchid == -1 || size == 0) {  
                   try {  
                       thread.sleep(1000);  
                   } catch (interruptedexception e) {  
                       e.printstacktrace();  
                   }  
               } else {  
                   printentry(message.getentries());  
               }  
               connector.ack(batchid); // 提交确认  
               // connector.rollback(batchid); // 处理失败, 回滚数据  
           }  
       } finally {  
           connector.disconnect();  
       }  
   }  

   private static void printentry( list entrys) {  
       for (entry entry : entrys) {  
           if (entry.getentrytype() == entrytype.transactionbegin || entry.getentrytype() == entrytype.transactionend) {  
               continue;  
           }  
           rowchange rowchage = null;  
           try {  
               rowchage = rowchange.parsefrom(entry.getstorevalue());  
           } catch (exception e) {  
               throw new runtimeexception("error ## parser of eromanga-event has an error , data:" + entry.tostring(),  
                       e);  
           }  
           eventtype eventtype = rowchage.geteventtype();  
           system.out.println(string.format("================> binlog[%s:%s] , name[%s,%s] , eventtype : %s",  
                   entry.getheader().getlogfilename(), entry.getheader().getlogfileoffset(),  
                   entry.getheader().getschemaname(), entry.getheader().gettablename(),  
                   eventtype));  

           for (rowdata rowdata : rowchage.getrowdataslist()) {  
               if (eventtype == eventtype.delete) {  
                   redisdelete(rowdata.getbeforecolumnslist());  
               } else if (eventtype == eventtype.insert) {  
                   redisinsert(rowdata.getaftercolumnslist());  
               } else {  
                   system.out.println("-------> before");  
                   printcolumn(rowdata.getbeforecolumnslist());  
                   system.out.println("-------> after");  
                   redisupdate(rowdata.getaftercolumnslist());  
               }  
           }  
       }  
   }  

   private static void printcolumn( list columns) {  
       for (column column : columns) {  
           system.out.println(column.getname() + " : " + column.getvalue() + "    update=" + column.getupdated());  
       }  
   }  

  private static void redisinsert( list columns){
      jsonobject json=new jsonobject();
      for (column column : columns) {  
          json.put(column.getname(), column.getvalue());  
       }  
      if(columns.size()>0){
          redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring());
      }
   }

  private static  void redisupdate( list columns){
      jsonobject json=new jsonobject();
      for (column column : columns) {  
          json.put(column.getname(), column.getvalue());  
       }  
      if(columns.size()>0){
          redisutil.stringset("user:"+ columns.get(0).getvalue(),json.tojsonstring());
      }
  }

   private static  void redisdelete( list columns){
       jsonobject json=new jsonobject();
          for (column column : columns) {  
              json.put(column.getname(), column.getvalue());  
           }  
          if(columns.size()>0){
              redisutil.delkey("user:"+ columns.get(0).getvalue());
          }
   }   
}  

redisutil 工具类

import redis.clients.jedis.jedis;
import redis.clients.jedis.jedispool;
import redis.clients.jedis.jedispoolconfig;

public class redisutil {

    // redis服务器ip
    private static string addr = "0.0.0.0";

    // redis的端口号
    private static int port = 6379;

    // 访问密码
    //private static string auth = "admin";

    // 可用连接实例的最大数目,默认值为8;
    // 如果赋值为-1,则表示不限制;如果pool已经分配了maxactive个jedis实例,则此时pool的状态为exhausted(耗尽)。
    private static int max_active = 1024;

    // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
    private static int max_idle = 200;

    // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出jedisconnectionexception;
    private static int max_wait = 10000;

    // 过期时间
    protected static int  expiretime = 60 * 60 *24;

    // 连接池
    protected static jedispool pool;

    /**
     * 静态代码,只在初次调用一次
     */
    static {
        jedispoolconfig config = new jedispoolconfig();
        //最大连接数
        config.setmaxtotal(max_active);
        //最多空闲实例
        config.setmaxidle(max_idle);
        //超时时间
        config.setmaxwaitmillis(max_wait);
        //
        config.settestonborrow(false);
        pool = new jedispool(config, addr, port, 1000);
    }

    /**
     * 获取jedis实例
     */
    protected static synchronized jedis getjedis() {
        jedis jedis = null;
        try {
            jedis = pool.getresource();
        } catch (exception e) {
            e.printstacktrace();
            if (jedis != null) {
                pool.returnbrokenresource(jedis);
            }
        }
        return jedis;
    }

    /**
     * 释放jedis资源
     * @param jedis
     * @param isbroken
     */
    protected static void closeresource(jedis jedis, boolean isbroken) {
        try {
            if (isbroken) {
                pool.returnbrokenresource(jedis);
            } else {
                pool.returnresource(jedis);
            }
        } catch (exception e) {

        }
    }

    /**
     * 是否存在key
     * @param key
     */
    public static boolean existkey(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        try {
            jedis = getjedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return false;
    }

    /**
     * 删除key
     * @param key
     */
    public static void delkey(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        try {
            jedis = getjedis();
            jedis.select(0);
            jedis.del(key);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
    }

    /**
     * 取得key的值
     * @param key
     */
    public static string stringget(string key) {
        jedis jedis = null;
        boolean isbroken = false;
        string lastval = null;
        try {
            jedis = getjedis();
            jedis.select(0);
            lastval = jedis.get(key);
            jedis.expire(key, expiretime);
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return lastval;
    }

    /**
     * 添加string数据
     * @param key
     * @param value
     */
    public static string stringset(string key, string value) {
        jedis jedis = null;
        boolean isbroken = false;
        string lastval = null;
        try {
            jedis = getjedis();
            jedis.select(0);
            lastval = jedis.set(key, value);
            jedis.expire(key, expiretime);
        } catch (exception e) {
            e.printstacktrace();
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
        return lastval;
    }

    /**
     *  添加hash数据
     * @param key
     * @param field
     * @param value
     */
    public static void hashset(string key, string field, string value) {
        boolean isbroken = false;
        jedis jedis = null;
        try {
            jedis = getjedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expiretime);
            }
        } catch (exception e) {
            isbroken = true;
        } finally {
            closeresource(jedis, isbroken);
        }
    }
}

至此,我们利用canal进行了mysql数据同步到redis的任务,可以根据不同的需求将代码进行修改置于需要的位置。

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网