当前位置: 移动技术网 > IT编程>数据库>Mysql > Mysql数据库监听binlog的开启步骤

Mysql数据库监听binlog的开启步骤

2019年09月06日  | 移动技术网IT编程  | 我要评论

前言

我们经常需要根据用户对自己数据的一些操作来做一些事情.

比如如果用户删除了自己的账号,我们就给他发短信骂他,去发短信求他回来.

类似于这种功能,当然可以在业务逻辑层实现,在收到用户的删除请求之后执行这一操作,但是数据库的binlog为我们提供了另外一种操作方法.

要监听binlog,需要两步,第一步当然是你的mysql需要开启这一个功能,第二个是要写程序来对日志进行读取.

mysql开启binlog.

首先mysql的binlog日常是不打开的,因此我们需要:

找到mysql的配置文件my.cnf,这个因操作系统不一样,位置也不一定一样,可以自己找一下,

在其中加入以下内容:

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = row

之后重启mysql.

/ ubuntu
service mysql restart
// mac
mysql.server restart

监测是否开启成功

进入mysql命令行,执行:

show variables like '%log_bin%' ;

如果结果如下图,则说明成功了:


查看正在写入的binlog状态:


代码读取binlog

引入依赖

我们使用开源的一些实现,这里因为一些奇怪的原因,我选用了mysql-binlog-connector-java这个包,(官方github仓库)[…]具体依赖如下:

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
 <dependency>
 <groupid>com.github.shyiko</groupid>
 <artifactid>mysql-binlog-connector-java</artifactid>
 <version>0.17.0</version>
 </dependency>

当然,对binlog的处理有很多开源实现,阿里的cancl就是一个,也可以使用它.

写个demo

根据官方仓库中readme里面,来简单的写个demo.

 public static void main(string[] args) {
 binarylogclient client = new binarylogclient("hostname", 3306, "username", "passwd");
 eventdeserializer eventdeserializer = new eventdeserializer();
 eventdeserializer.setcompatibilitymode(
 eventdeserializer.compatibilitymode.date_and_time_as_long,
 eventdeserializer.compatibilitymode.char_and_binary_as_byte_array
 );
 client.seteventdeserializer(eventdeserializer);
 client.registereventlistener(new binarylogclient.eventlistener() {

 @override
 public void onevent(event event) {
 // todo
 dosomething();
 logger.info(event.tostring());
 }
 });
 client.connect();
 }

这个完全是根据官方教程里面写的,在onevent里面可以写自己的业务逻辑,由于我只是测试,所以我在里面将每一个event都打印了出来.

之后我手动登录到mysql,分别进行了增加,修改,删除操作,监听到的log如下:

00:23:13.331 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=0, eventtype=rotate, serverid=1, headerlength=19, datalength=28, nextposition=0, flags=32}, data=rotateeventdata{binlogfilename='mysql-bin.000001', binlogposition=886}}
00:23:13.334 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468403000, eventtype=format_description, serverid=1, headerlength=19, datalength=100, nextposition=0, flags=0}, data=formatdescriptioneventdata{binlogversion=4, serverversion='5.7.23-0ubuntu0.16.04.1-log', headerlength=19, datalength=95}}
00:23:23.715 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468603000, eventtype=anonymous_gtid, serverid=1, headerlength=19, datalength=46, nextposition=951, flags=0}, data=null}
00:23:23.716 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468603000, eventtype=query, serverid=1, headerlength=19, datalength=51, nextposition=1021, flags=8}, data=queryeventdata{threadid=4, executiontime=0, errorcode=0, database='pf', sql='begin'}}
00:23:23.721 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468603000, eventtype=table_map, serverid=1, headerlength=19, datalength=32, nextposition=1072, flags=0}, data=tablemapeventdata{tableid=108, database='pf', table='student', columntypes=15, 3, columnmetadata=135, 0, columnnullability={}}}
00:23:23.724 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468603000, eventtype=ext_write_rows, serverid=1, headerlength=19, datalength=23, nextposition=1114, flags=0}, data=writerowseventdata{tableid=108, includedcolumns={0, 1}, rows=[
    [[b@546a03af, 2]
]}}
00:23:23.725 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468603000, eventtype=xid, serverid=1, headerlength=19, datalength=12, nextposition=1145, flags=0}, data=xideventdata{xid=28}}
00:23:55.872 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468635000, eventtype=anonymous_gtid, serverid=1, headerlength=19, datalength=46, nextposition=1210, flags=0}, data=null}
00:23:55.872 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468635000, eventtype=query, serverid=1, headerlength=19, datalength=51, nextposition=1280, flags=8}, data=queryeventdata{threadid=4, executiontime=0, errorcode=0, database='pf', sql='begin'}}
00:23:55.873 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468635000, eventtype=table_map, serverid=1, headerlength=19, datalength=32, nextposition=1331, flags=0}, data=tablemapeventdata{tableid=108, database='pf', table='student', columntypes=15, 3, columnmetadata=135, 0, columnnullability={}}}
00:23:55.875 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468635000, eventtype=ext_update_rows, serverid=1, headerlength=19, datalength=31, nextposition=1381, flags=0}, data=updaterowseventdata{tableid=108, includedcolumnsbeforeupdate={0, 1}, includedcolumns={0, 1}, rows=[
    {before=[[b@6833ce2c, 1], after=[[b@725bef66, 3]}
]}}
00:23:55.875 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468635000, eventtype=xid, serverid=1, headerlength=19, datalength=12, nextposition=1412, flags=0}, data=xideventdata{xid=41}}
00:24:22.333 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468662000, eventtype=anonymous_gtid, serverid=1, headerlength=19, datalength=46, nextposition=1477, flags=0}, data=null}
00:24:22.334 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468662000, eventtype=query, serverid=1, headerlength=19, datalength=51, nextposition=1547, flags=8}, data=queryeventdata{threadid=4, executiontime=0, errorcode=0, database='pf', sql='begin'}}
00:24:22.334 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468662000, eventtype=table_map, serverid=1, headerlength=19, datalength=32, nextposition=1598, flags=0}, data=tablemapeventdata{tableid=108, database='pf', table='student', columntypes=15, 3, columnmetadata=135, 0, columnnullability={}}}
00:24:22.335 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468662000, eventtype=ext_delete_rows, serverid=1, headerlength=19, datalength=23, nextposition=1640, flags=0}, data=deleterowseventdata{tableid=108, includedcolumns={0, 1}, rows=[
    [[b@1888ff2c, 3]
]}}
00:24:22.335 [main] info util.mysqlbinlog - event{header=eventheaderv4{timestamp=1556468662000, eventtype=xid, serverid=1, headerlength=19, datalength=12, nextposition=1671, flags=0}, data=xideventdata{xid=42}}

根据自己的业务,封装一个更好使,更定制的工具类

开始的时候打算贴代码的,,,但是代码越写越多,索性传在github上了,这里只贴部分的实现.代码传送门

实现思路

  1. 支持对单个表的监听,因为我们不想真的对所有数据库中的所有数据表进行监听.
  2. 可以多线程消费.
  3. 把监听到的内容转换成我们喜闻乐见的形式(文中的数据结构不一定很好,我没想到更加合适的了).

所以实现思路大致如下:

  1. 封装个客户端,对外只提供获取方法,屏蔽掉初始化的细节代码.
  2. 提供注册监听器(伪)的方法,可以注册对某个表的监听(重新定义一个监听接口,所有注册的监听器实现这个就好).
  3. 真正的监听器只有客户端,他将此数据库实例上的所有操作,全部监听到并转换成我们想要的格式logitem放进阻塞队列里面.
  4. 启动多个线程,消费阻塞队列,对某一个logitem调用对应的数据表的监听器,做一些业务逻辑.

初始化代码:

 public mysqlbinloglistener(conf conf) {
 binarylogclient client = new binarylogclient(conf.host, conf.port, conf.username, conf.passwd);
 eventdeserializer eventdeserializer = new eventdeserializer();
 eventdeserializer.setcompatibilitymode(
 eventdeserializer.compatibilitymode.date_and_time_as_long,
 eventdeserializer.compatibilitymode.char_and_binary_as_byte_array
 );
 client.seteventdeserializer(eventdeserializer);
 this.parseclient = client;
 this.queue = new arrayblockingqueue<>(1024);
 this.conf = conf;
 listeners = new concurrenthashmap<>();
 dbtablecols = new concurrenthashmap<>();
 this.consumer = executors.newfixedthreadpool(consumerthreads);
 }

注册代码:

 public void reglistener(string db, string table, binloglistener listener) throws exception {
 string dbtable = getdbtable(db, table);
 class.forname("com.mysql.jdbc.driver");
 // 保存当前注册的表的colum信息
 connection connection = drivermanager.getconnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
 map<string, colum> cols = getcolmap(connection, db, table);
 dbtablecols.put(dbtable, cols);

 // 保存当前注册的listener
 list<binloglistener> list = listeners.getordefault(dbtable, new arraylist<>());
 list.add(listener);
 listeners.put(dbtable, list);
 }

在这个步骤中,我们在注册监听者的同时,获得了该表的schema信息,并保存到map里面去,方便后续对数据进行处理.

监听代码:

 @override
 public void onevent(event event) {
 eventtype eventtype = event.getheader().geteventtype();

 if (eventtype == eventtype.table_map) {
 tablemapeventdata tabledata = event.getdata();
 string db = tabledata.getdatabase();
 string table = tabledata.gettable();
 dbtable = getdbtable(db, table);
 }

 // 只处理添加删除更新三种操作
 if (iswrite(eventtype) || isupdate(eventtype) || isdelete(eventtype)) {
 if (iswrite(eventtype)) {
 writerowseventdata data = event.getdata();
 for (serializable[] row : data.getrows()) {
  if (dbtablecols.containskey(dbtable)) {
  logitem e = logitem.itemfrominsert(row, dbtablecols.get(dbtable));
  e.setdbtable(dbtable);
  queue.add(e);
  }
 }
 }
 }
 }

我偷懒了,,,这里面只实现了对添加操作的处理,其他操作没有写.

消费代码:

 public void parse() throws ioexception {
 parseclient.registereventlistener(this);

 for (int i = 0; i < consumerthreads; i++) {
 consumer.submit(() -> {
 while (true) {
  if (queue.size() > 0) {
  try {
  logitem item = queue.take();
  string dbtable = item.getdbtable();
  listeners.get(dbtable).foreach(l -> {
  l.onevent(item);
  });

  } catch (interruptedexception e) {
  e.printstacktrace();
  }
  }
  thread.sleep(1000);
 }
 });
 }
 parseclient.connect();
 }

消费时,从队列中获取item,之后获取对应的一个或者多个监听者,分别消费这个item.

测试代码:

 public static void main(string[] args) throws exception {
 conf conf = new conf();
 conf.host = "hostname";
 conf.port = 3306;
 conf.username = conf.passwd = "hhsgsb";

 mysqlbinloglistener mysqlbinloglistener = new mysqlbinloglistener(conf);
 mysqlbinloglistener.parseargsandrun(args);
 mysqlbinloglistener.reglistener("pf", "student", item -> {
 system.out.println(new string((byte[])item.getafter().get("name")));
 logger.info("insert into {}, value = {}", item.getdbtable(), item.getafter());
 });
 mysqlbinloglistener.reglistener("pf", "teacher", item -> system.out.println("teacher ===="));

 mysqlbinloglistener.parse();
 }

在这段很少的代码里,注册了两个监听者,分别监听student和teacher表,并分别进行打印处理,经测试,在teacher表插入数据时,可以独立的运行定义的业务逻辑.

注意:这里的工具类并不能直接投入使用,因为里面有许多的异常处理没有做,且功能仅监听了插入语句,可以用来做实现的参考.

参考文章

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对移动技术网的支持。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网