当前位置: 移动技术网 > IT编程>脚本编程>Python > python制作mysql数据迁移脚本

python制作mysql数据迁移脚本

2019年01月07日  | 移动技术网IT编程  | 我要评论

伤感美文欣赏,中华戏曲网,歌曲好运来串词

用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,载导入到目标实例中。

#!/usr/bin/env python3
#-*- coding:utf8 -*-
#author:zhanbin.liu
#!!!!!db必须同版本
#python3环境  pip3 install pymysql paramiko

import os
#from pathlib import path
import sys
import pymysql
import paramiko

#每次只能迁移一个db下的表,到指定db
#grant select, create, reload, alter, lock tables on *.* to 'data_migration'@'192.168.%' identified by 'data_migration@123';
tables='sqlauto_cluster,sqlauto_user'    #以,分割的字符串,如a,b,c
tablelist = tables.split(',')
sourceip = '192.168.1.101'
sourcedatabase = '/data/mysql/3306/data'
sourcedbname = 'inception_web'
sourcedatadir = os.path.join(sourcedatabase,sourcedbname)
desip = '192.168.1.102'
desdatabase = '/data/mysql/3306/data'
desdbname = 'inception_web'
desdatadir = os.path.join(desdatabase,desdbname)

# for table in tablelist:
#   desfile = path("%s/%s.ibd" %(desdatadir,table))
#   print(desfile)
#   if desfile.is_file():
#     print("ok")
#   else:
#     print("no")

comuser = 'data_migration'
compwd = 'data_migration@123'
comport = 3306

client = paramiko.sshclient()
client.set_missing_host_key_policy(paramiko.autoaddpolicy())

def table_judge():
  print("table_judge")
  sourcetableexist = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  destableexist = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  sourcetables = []
  destables = []
  cursor_source = sourcetableexist.cursor()
  cursor_des = destableexist.cursor()

  for table in tablelist:
    #print(table)
    cursor_source.execute("select table_name from information_schema.tables where table_schema='%s' and table_name='%s';" % (sourcedbname,table))
    sourcetable_tmp = cursor_source.fetchall()
    cursor_des.execute("select table_name from information_schema.tables where table_schema='%s' and table_name='%s';" % (desdbname,table))
    destable_tmp = cursor_des.fetchall()
    #print(destable_tmp)
    if sourcetable_tmp is ():
      sourcetables.append(table)
    if destable_tmp is not ():
      destables.append(destable_tmp[0][0])
  sourcetableexist.close()
  destableexist.close()

  s=d=0
  if sourcetables != []:
    print('迁移源不存在将要迁移的表:',sourceip,sourcedbname, sourcetables,' 请检查')
    s=1
  if destables != []:
    print('目标库存在将要迁移的表:',desip,desdbname,destables,' 请移除')
    d=1
  if s == 1 or d == 1:
    sys.exit()

def data_sync():
  print('data_sync')
  db_source = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  db_des = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  cursor_db_source = db_source.cursor()
  cursor_db_des = db_des.cursor()

  for table in tablelist:
    print("正在同步表:",table)
    cursor_db_source.execute("show create table %s;" % (table))
    createtablesql = cursor_db_source.fetchall()[0][1]
    print(createtablesql)
    try:
      cursor_db_des.execute(createtablesql)
    except exception as error:
      print(error)
    cursor_db_source.execute("flush table %s with read lock;" % (table))
    cursor_db_des.execute("alter table %s discard tablespace;" % (table))

    client.connect(sourceip, 22, 'root')
    stdin1, stdout1, stderr1 = client.exec_command("scp %s %s:%s " % (sourcedatadir+"/"+table+".ibd", desip, desdatadir))
    stdin2, stdout2, stderr2 = client.exec_command("scp %s %s:%s " % (sourcedatadir+"/"+table+".cfg", desip, desdatadir))
    a_e_1 = stderr1.readlines()
    a_e_2 = stderr2.readlines()
    if a_e_1 != [] or a_e_2 != []:
      print(a_e_1,a_e_2)
      sys.exit()
    client.close()

    client.connect(desip, 22, 'root')
    stdin3, stdout3, stderr3 = client.exec_command("chown -r mysql.mysql %s*" % (desdatadir+"/"+table))
    a_e_3 = stderr3.readlines()
    if a_e_3 != []:
      print(a_e_1, a_e_2)
      sys.exit()
    client.close()
    #cursor_db_source.execute("select sleep(10);")
    cursor_db_source.execute("unlock tables;")
    cursor_db_des.execute("alter table %s import tablespace;" % (table))
    print("同步完成")

  cursor_db_source.close()
  cursor_db_des.close()

def data_checksum():
  print('data_checksum')
  db_source = pymysql.connect(sourceip,comuser,compwd,sourcedbname,comport,charset='utf8')
  db_des = pymysql.connect(desip,comuser,compwd,desdbname,comport,charset='utf8')
  cursor_db_source = db_source.cursor()
  cursor_db_des = db_des.cursor()

  for table in tablelist:
    print("正在校验表:", table)
    cursor_db_source.execute("checksum table %s;" % (table))
    ck_s = cursor_db_source.fetchall()[0][1]
    cursor_db_des.execute("checksum table %s;" % (table))
    ck_d = cursor_db_des.fetchall()[0][1]
    if ck_s != ck_d:
      print("表不一致:",table)
    else:
      print("表一致:",table)

  cursor_db_source.close()
  cursor_db_des.close()

if __name__ == "__main__":
  table_judge()
  data_sync()
  data_checksum()
  print('haha')

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网