mirror of
https://github.com/MariaDB/server.git
synced 2025-01-19 13:32:33 +01:00
Import myrocks_hotbackup from the upstream
- They have it in scripts/myrocks_hotbackup, so we don't get it during merges. this will be fixed - OTOH the testsuite is already being merged into storage/rocksdb/mysql-test/rocksdb (but isn't run due to using many upstream-only features)
This commit is contained in:
parent
7fc25cfbca
commit
e402d779c7
1 changed files with 686 additions and 0 deletions
686
storage/rocksdb/myrocks_hotbackup
Executable file
686
storage/rocksdb/myrocks_hotbackup
Executable file
|
@ -0,0 +1,686 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from __future__ import division
|
||||
from optparse import OptionParser
|
||||
import collections
|
||||
import signal
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import re
|
||||
import commands
|
||||
import subprocess
|
||||
import logging
|
||||
import logging.handlers
|
||||
import time
|
||||
import datetime
|
||||
import shutil
|
||||
import traceback
|
||||
import tempfile
|
||||
|
||||
import MySQLdb
|
||||
import MySQLdb.connections
|
||||
from MySQLdb import OperationalError, ProgrammingError
|
||||
|
||||
logger = None
|
||||
opts = None
|
||||
rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS']
|
||||
rocksdb_data_suffix = '.sst'
|
||||
rocksdb_wal_suffix = '.log'
|
||||
exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info',
|
||||
'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash']
|
||||
wdt_bin = 'wdt'
|
||||
|
||||
def is_manifest(fname):
|
||||
for m in rocksdb_files:
|
||||
if fname.startswith(m):
|
||||
return True
|
||||
return False
|
||||
|
||||
class Writer(object):
|
||||
a = None
|
||||
def __init__(self):
|
||||
a = None
|
||||
|
||||
class StreamWriter(Writer):
|
||||
stream_cmd= ''
|
||||
|
||||
def __init__(self, stream_option):
|
||||
super(StreamWriter, self).__init__()
|
||||
if stream_option == 'tar':
|
||||
self.stream_cmd= 'tar chf -'
|
||||
elif stream_option == 'xbstream':
|
||||
self.stream_cmd= 'xbstream -c'
|
||||
else:
|
||||
raise Exception("Only tar or xbstream is supported as streaming option.")
|
||||
|
||||
def write(self, file_name):
|
||||
rc= os.system(self.stream_cmd + " " + file_name)
|
||||
if (rc != 0):
|
||||
raise Exception("Got error on stream write: " + str(rc) + " " + file_name)
|
||||
|
||||
|
||||
class MiscFilesProcessor():
|
||||
datadir = None
|
||||
wildcard = r'.*\.[frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par]'
|
||||
regex = None
|
||||
start_backup_time = None
|
||||
skip_check_frm_timestamp = None
|
||||
|
||||
def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time):
|
||||
self.datadir = datadir
|
||||
self.regex = re.compile(self.wildcard)
|
||||
self.skip_check_frm_timestamp = skip_check_frm_timestamp
|
||||
self.start_backup_time = start_backup_time
|
||||
|
||||
def process_db(self, db):
|
||||
# do nothing
|
||||
pass
|
||||
|
||||
def process_file(self, path):
|
||||
# do nothing
|
||||
pass
|
||||
|
||||
def check_frm_timestamp(self, fname, path):
|
||||
if not self.skip_check_frm_timestamp and fname.endswith('.frm'):
|
||||
if os.path.getmtime(path) > self.start_backup_time:
|
||||
logger.error('FRM file %s was updated after starting backups. '
|
||||
'Schema could have changed and the resulting copy may '
|
||||
'not be valid. Aborting. '
|
||||
'(backup time: %s, file modifled time: %s)',
|
||||
path, datetime.datetime.fromtimestamp(self.start_backup_time).strftime('%Y-%m-%d %H:%M:%S'),
|
||||
datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime('%Y-%m-%d %H:%M:%S'))
|
||||
raise Exception("Inconsistent frm file timestamp");
|
||||
|
||||
def process(self):
|
||||
os.chdir(self.datadir)
|
||||
for db in self.get_databases():
|
||||
logger.info("Starting MySQL misc file traversal from database %s..", db)
|
||||
self.process_db(db)
|
||||
for f in self.get_files(db):
|
||||
if self.match(f):
|
||||
rel_path = os.path.join(db, f)
|
||||
self.check_frm_timestamp(f, rel_path)
|
||||
self.process_file(rel_path)
|
||||
logger.info("Traversing misc files from data directory..")
|
||||
for f in self.get_files(""):
|
||||
should_skip = False
|
||||
for e in exclude_files:
|
||||
if f.startswith(e) or f.endswith(e):
|
||||
logger.info("Skipping %s", f)
|
||||
should_skip = True
|
||||
break
|
||||
if not should_skip:
|
||||
self.process_file(f)
|
||||
|
||||
def match(self, filename):
|
||||
if self.regex.match(filename):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_databases(self):
|
||||
dbs = []
|
||||
dirs = [ d for d in os.listdir(self.datadir) \
|
||||
if not os.path.isfile(os.path.join(self.datadir,d))]
|
||||
for db in dirs:
|
||||
if not db.startswith('.') and not self._is_socket(db):
|
||||
dbs.append(db)
|
||||
return dbs
|
||||
|
||||
def get_files(self, db):
|
||||
dbdir = self.datadir + "/" + db
|
||||
return [ f for f in os.listdir(dbdir) \
|
||||
if os.path.isfile(os.path.join(dbdir,f))]
|
||||
|
||||
def _is_socket(self, item):
|
||||
mode = os.stat(os.path.join(self.datadir, item)).st_mode
|
||||
if stat.S_ISSOCK(mode):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class MySQLBackup(MiscFilesProcessor):
|
||||
writer = None
|
||||
|
||||
def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time):
|
||||
MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
|
||||
self.writer = writer
|
||||
|
||||
def process_file(self, fname): # overriding base class
|
||||
self.writer.write(fname)
|
||||
|
||||
|
||||
class MiscFilesLinkCreator(MiscFilesProcessor):
|
||||
snapshot_dir = None
|
||||
|
||||
def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time):
|
||||
MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
|
||||
self.snapshot_dir = snapshot_dir
|
||||
|
||||
def process_db(self, db):
|
||||
snapshot_sub_dir = os.path.join(self.snapshot_dir, db)
|
||||
os.makedirs(snapshot_sub_dir)
|
||||
|
||||
def process_file(self, path):
|
||||
dst_path = os.path.join(self.snapshot_dir, path)
|
||||
os.link(path, dst_path)
|
||||
|
||||
|
||||
# RocksDB backup
|
||||
class RocksDBBackup():
|
||||
source_dir = None
|
||||
writer = None
|
||||
# sst files sent in this backup round
|
||||
sent_sst = {}
|
||||
# target sst files in this backup round
|
||||
target_sst = {}
|
||||
# sst files sent in all backup rounds
|
||||
total_sent_sst= {}
|
||||
# sum of sst file size sent in this backup round
|
||||
sent_sst_size = 0
|
||||
# sum of target sst file size in this backup round
|
||||
# if sent_sst_size becomes equal to target_sst_size,
|
||||
# it means the backup round finished backing up all sst files
|
||||
target_sst_size = 0
|
||||
# sum of all sst file size sent all backup rounds
|
||||
total_sent_sst_size= 0
|
||||
# sum of all target sst file size from all backup rounds
|
||||
total_target_sst_size = 0
|
||||
show_progress_size_interval= 1073741824 # 1GB
|
||||
wal_files= []
|
||||
manifest_files= []
|
||||
finished= False
|
||||
|
||||
def __init__(self, source_dir, writer, prev):
|
||||
self.source_dir = source_dir
|
||||
self.writer = writer
|
||||
os.chdir(self.source_dir)
|
||||
self.init_target_files(prev)
|
||||
|
||||
def init_target_files(self, prev):
|
||||
sst = {}
|
||||
self.sent_sst = {}
|
||||
self.target_sst= {}
|
||||
self.total_sent_sst = {}
|
||||
self.sent_sst_size = 0
|
||||
self.target_sst_size = 0
|
||||
self.total_sent_sst_size= 0
|
||||
self.total_target_sst_size= 0
|
||||
self.wal_files= []
|
||||
self.manifest_files= []
|
||||
|
||||
for f in os.listdir(self.source_dir):
|
||||
if f.endswith(rocksdb_data_suffix):
|
||||
# exactly the same file (same size) was sent in previous backup rounds
|
||||
if prev is not None and f in prev.total_sent_sst and int(os.stat(f).st_size) == prev.total_sent_sst[f]:
|
||||
continue
|
||||
sst[f]= int(os.stat(f).st_size)
|
||||
self.target_sst_size = self.target_sst_size + os.stat(f).st_size
|
||||
elif is_manifest(f):
|
||||
self.manifest_files.append(f)
|
||||
elif f.endswith(rocksdb_wal_suffix):
|
||||
self.wal_files.append(f)
|
||||
self.target_sst= collections.OrderedDict(sorted(sst.items()))
|
||||
|
||||
if prev is not None:
|
||||
self.total_sent_sst = prev.total_sent_sst
|
||||
self.total_sent_sst_size = prev.total_sent_sst_size
|
||||
self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size
|
||||
else:
|
||||
self.total_target_sst_size = self.target_sst_size
|
||||
|
||||
def do_backup_single(self, fname):
|
||||
self.writer.write(fname)
|
||||
os.remove(fname)
|
||||
|
||||
def do_backup_sst(self, fname, size):
|
||||
self.do_backup_single(fname)
|
||||
self.sent_sst[fname]= size
|
||||
self.total_sent_sst[fname]= size
|
||||
self.sent_sst_size = self.sent_sst_size + size
|
||||
self.total_sent_sst_size = self.total_sent_sst_size + size
|
||||
|
||||
def do_backup_manifest(self):
|
||||
for f in self.manifest_files:
|
||||
self.do_backup_single(f)
|
||||
|
||||
def do_backup_wal(self):
|
||||
for f in self.wal_files:
|
||||
self.do_backup_single(f)
|
||||
|
||||
# this is the last snapshot round. backing up all the rest files
|
||||
def do_backup_final(self):
|
||||
logger.info("Backup WAL..")
|
||||
self.do_backup_wal()
|
||||
logger.info("Backup Manifest..")
|
||||
self.do_backup_manifest()
|
||||
self.do_cleanup()
|
||||
self.finished= True
|
||||
|
||||
def do_cleanup(self):
|
||||
shutil.rmtree(self.source_dir)
|
||||
logger.info("Cleaned up checkpoint from %s", self.source_dir)
|
||||
|
||||
def do_backup_until(self, time_limit):
|
||||
logger.info("Starting backup from snapshot: target files %d", len(self.target_sst))
|
||||
start_time= time.time()
|
||||
last_progress_time= start_time
|
||||
progress_size= 0
|
||||
for fname, size in self.target_sst.iteritems():
|
||||
self.do_backup_sst(fname, size)
|
||||
progress_size= progress_size + size
|
||||
elapsed_seconds = time.time() - start_time
|
||||
progress_seconds = time.time() - last_progress_time
|
||||
|
||||
if self.should_show_progress(size):
|
||||
self.show_progress(progress_size, progress_seconds)
|
||||
progress_size=0
|
||||
last_progress_time= time.time()
|
||||
|
||||
if elapsed_seconds > time_limit and self.has_sent_all_sst() is False:
|
||||
logger.info("Snapshot round finished. Elapsed Time: %5.2f. Remaining sst files: %d",
|
||||
elapsed_seconds, len(self.target_sst) - len(self.sent_sst))
|
||||
self.do_cleanup()
|
||||
break;
|
||||
if self.has_sent_all_sst():
|
||||
self.do_backup_final()
|
||||
|
||||
return self
|
||||
|
||||
def should_show_progress(self, size):
|
||||
if int(self.total_sent_sst_size/self.show_progress_size_interval) > int((self.total_sent_sst_size-size)/self.show_progress_size_interval):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def show_progress(self, size, seconds):
|
||||
logger.info("Backup Progress: %5.2f%% Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s",
|
||||
self.total_sent_sst_size*100/self.total_target_sst_size,
|
||||
self.total_sent_sst_size/1024/1024/1024,
|
||||
self.total_target_sst_size/1024/1024/1024,
|
||||
size/seconds/1024/1024)
|
||||
|
||||
def print_backup_report(self):
|
||||
logger.info("Sent %6.2f GB of sst files, %d files in total.",
|
||||
self.total_sent_sst_size/1024/1024/1024,
|
||||
len(self.total_sent_sst))
|
||||
|
||||
def has_sent_all_sst(self):
|
||||
if self.sent_sst_size == self.target_sst_size:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class MySQLUtil:
|
||||
@staticmethod
|
||||
def connect(user, password, port, socket=None):
|
||||
if socket:
|
||||
dbh = MySQLdb.Connect(user=user,
|
||||
passwd=password,
|
||||
unix_socket=socket)
|
||||
else:
|
||||
dbh = MySQLdb.Connect(user=user,
|
||||
passwd=password,
|
||||
port=port,
|
||||
host="127.0.0.1")
|
||||
return dbh
|
||||
|
||||
@staticmethod
|
||||
def create_checkpoint(dbh, checkpoint_dir):
|
||||
sql = ("SET GLOBAL rocksdb_create_checkpoint='{0}'"
|
||||
.format(checkpoint_dir))
|
||||
cur= dbh.cursor()
|
||||
cur.execute(sql)
|
||||
cur.close()
|
||||
|
||||
@staticmethod
|
||||
def get_datadir(dbh):
|
||||
sql = "SELECT @@datadir"
|
||||
cur = dbh.cursor()
|
||||
cur.execute(sql)
|
||||
row = cur.fetchone()
|
||||
return row[0]
|
||||
|
||||
|
||||
class BackupRunner:
|
||||
datadir = None
|
||||
start_backup_time = None
|
||||
|
||||
def __init__(self, datadir):
|
||||
self.datadir = datadir
|
||||
self.start_backup_time = time.time()
|
||||
|
||||
def start_backup_round(self, backup_round, prev_backup):
|
||||
def signal_handler(*args):
|
||||
logger.info("Got signal. Exit")
|
||||
if b is not None:
|
||||
logger.info("Cleaning up snapshot directory..")
|
||||
b.do_cleanup()
|
||||
sys.exit(1)
|
||||
|
||||
b = None
|
||||
try:
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
w = None
|
||||
if opts.output_stream:
|
||||
w = StreamWriter(opts.output_stream)
|
||||
else:
|
||||
raise Exception("Currently only streaming backup is supported.")
|
||||
|
||||
snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round)
|
||||
dbh = MySQLUtil.connect(opts.mysql_user,
|
||||
opts.mysql_password,
|
||||
opts.mysql_port,
|
||||
opts.mysql_socket)
|
||||
if not self.datadir:
|
||||
self.datadir = MySQLUtil.get_datadir(dbh)
|
||||
logger.info("Set datadir: %s", self.datadir)
|
||||
logger.info("Creating checkpoint at %s", snapshot_dir)
|
||||
MySQLUtil.create_checkpoint(dbh, snapshot_dir)
|
||||
logger.info("Created checkpoint at %s", snapshot_dir)
|
||||
b = RocksDBBackup(snapshot_dir, w, prev_backup)
|
||||
return b.do_backup_until(opts.checkpoint_interval)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(traceback.format_exc())
|
||||
if b is not None:
|
||||
logger.info("Cleaning up snapshot directory.")
|
||||
b.do_cleanup()
|
||||
sys.exit(1)
|
||||
|
||||
def backup_mysql(self):
|
||||
try:
|
||||
w = None
|
||||
if opts.output_stream:
|
||||
w = StreamWriter(opts.output_stream)
|
||||
else:
|
||||
raise Exception("Currently only streaming backup is supported.")
|
||||
b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp,
|
||||
self.start_backup_time)
|
||||
logger.info("Taking MySQL misc backups..")
|
||||
b.process()
|
||||
logger.info("MySQL misc backups done.")
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(traceback.format_exc())
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
class WDTBackup:
|
||||
datadir = None
|
||||
start_backup_time = None
|
||||
|
||||
def __init__(self, datadir):
|
||||
self.datadir = datadir
|
||||
self.start_backup_time = time.time()
|
||||
|
||||
def cleanup(self, snapshot_dir, server_log):
|
||||
if server_log:
|
||||
server_log.seek(0)
|
||||
logger.info("WDT server log:")
|
||||
logger.info(server_log.read())
|
||||
server_log.close()
|
||||
if snapshot_dir:
|
||||
logger.info("Cleaning up snapshot dir %s", snapshot_dir)
|
||||
shutil.rmtree(snapshot_dir)
|
||||
|
||||
def backup_with_timeout(self, backup_round):
|
||||
def signal_handler(*args):
|
||||
logger.info("Got signal. Exit")
|
||||
self.cleanup(snapshot_dir, server_log)
|
||||
sys.exit(1)
|
||||
|
||||
logger.info("Starting backup round %d", backup_round)
|
||||
snapshot_dir = None
|
||||
server_log = None
|
||||
try:
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
# create rocksdb snapshot
|
||||
snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round))
|
||||
dbh = MySQLUtil.connect(opts.mysql_user,
|
||||
opts.mysql_password,
|
||||
opts.mysql_port,
|
||||
opts.mysql_socket)
|
||||
logger.info("Creating checkpoint at %s", snapshot_dir)
|
||||
MySQLUtil.create_checkpoint(dbh, snapshot_dir)
|
||||
logger.info("Created checkpoint at %s", snapshot_dir)
|
||||
|
||||
# get datadir if not provided
|
||||
if not self.datadir:
|
||||
self.datadir = MySQLUtil.get_datadir(dbh)
|
||||
logger.info("Set datadir: %s", self.datadir)
|
||||
|
||||
# create links for misc files
|
||||
link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir,
|
||||
opts.skip_check_frm_timestamp,
|
||||
self.start_backup_time)
|
||||
link_creator.process()
|
||||
|
||||
current_path = os.path.join(opts.backupdir, "CURRENT")
|
||||
|
||||
# construct receiver cmd, using the data directory as recovery-id.
|
||||
# we delete the current file because it is not append-only, therefore not
|
||||
# resumable.
|
||||
remote_cmd = (
|
||||
"ssh {0} rm -f {1}; "
|
||||
"{2} -directory {3} -enable_download_resumption "
|
||||
"-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}"
|
||||
).format(opts.destination,
|
||||
current_path,
|
||||
wdt_bin,
|
||||
opts.backupdir,
|
||||
self.datadir,
|
||||
opts.checkpoint_interval,
|
||||
opts.extra_wdt_receiver_options)
|
||||
logger.info("WDT remote cmd %s", remote_cmd)
|
||||
server_log = tempfile.TemporaryFile()
|
||||
remote_process = subprocess.Popen(remote_cmd.split(),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=server_log)
|
||||
wdt_url = remote_process.stdout.readline().strip()
|
||||
if not wdt_url:
|
||||
raise Exception("Unable to get connection url from wdt receiver")
|
||||
sender_cmd = (
|
||||
"{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks "
|
||||
"-avg_mbytes_per_sec {3} "
|
||||
"-enable_download_resumption -abort_after_seconds {4} {5}"
|
||||
).format(wdt_bin,
|
||||
wdt_url,
|
||||
snapshot_dir,
|
||||
opts.avg_mbytes_per_sec,
|
||||
opts.checkpoint_interval,
|
||||
opts.extra_wdt_sender_options)
|
||||
sender_status = os.system(sender_cmd) >> 8
|
||||
remote_status = remote_process.wait()
|
||||
self.cleanup(snapshot_dir, server_log)
|
||||
# TODO: handle retryable and non-retyable errors differently
|
||||
return (sender_status == 0 and remote_status == 0)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
logger.error(traceback.format_exc())
|
||||
self.cleanup(snapshot_dir, server_log)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def backup_using_wdt():
|
||||
if not opts.destination:
|
||||
logger.error("Must provide remote destination when using WDT")
|
||||
sys.exit(1)
|
||||
|
||||
# TODO: detect whether WDT is installed
|
||||
logger.info("Backing up myrocks to %s using WDT", opts.destination)
|
||||
wdt_backup = WDTBackup(opts.datadir)
|
||||
finished = False
|
||||
backup_round = 1
|
||||
while not finished:
|
||||
start_time = time.time()
|
||||
finished = wdt_backup.backup_with_timeout(backup_round)
|
||||
end_time = time.time()
|
||||
duration_seconds = end_time - start_time
|
||||
if (not finished) and (duration_seconds < opts.checkpoint_interval):
|
||||
# round finished before timeout
|
||||
sleep_duration = (opts.checkpoint_interval - duration_seconds)
|
||||
logger.info("Sleeping for %f seconds", sleep_duration)
|
||||
time.sleep(sleep_duration)
|
||||
|
||||
backup_round = backup_round + 1
|
||||
logger.info("Finished myrocks backup using WDT")
|
||||
|
||||
|
||||
def init_logger():
|
||||
global logger
|
||||
logger = logging.getLogger('myrocks_hotbackup')
|
||||
logger.setLevel(logging.INFO)
|
||||
h1= logging.StreamHandler(sys.stderr)
|
||||
f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
|
||||
"%Y-%m-%d %H:%M:%S")
|
||||
h1.setFormatter(f)
|
||||
logger.addHandler(h1)
|
||||
|
||||
backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup "
|
||||
"--user=root --password=pw --stream=wdt "
|
||||
"--checkpoint_dir=<directory where temporary backup hard links "
|
||||
"are created> --destination=<remote host name> --backup_dir="
|
||||
"<remote directory name>. This has to be executed at the src "
|
||||
"host.")
|
||||
backup_usage= "Backup: set -o pipefail; myrocks_hotbackup --user=root --password=pw --port=3306 --checkpoint_dir=<directory where temporary backup hard links are created> | ssh -o NoneEnabled=yes remote_server 'tar -xi -C <directory on remote server where backups will be sent>' . You need to execute backup command on a server where you take backups."
|
||||
move_back_usage= "Move-Back: myrocks_hotbackup --move_back --datadir=<dest mysql datadir> --rocksdb_datadir=<dest rocksdb datadir> --rocksdb_waldir=<dest rocksdb wal dir> --backup_dir=<where backup files are stored> . You need to execute move-back command on a server where backup files are sent."
|
||||
|
||||
|
||||
def parse_options():
|
||||
global opts
|
||||
parser = OptionParser(usage = "\n\n" + backup_usage + "\n\n" + \
|
||||
backup_wdt_usage + "\n\n" + move_back_usage)
|
||||
parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval',
|
||||
default=300,
|
||||
help='Number of seconds to renew checkpoint')
|
||||
parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory',
|
||||
default='/data/mysql/backup/snapshot',
|
||||
help='Local directory name where checkpoints will be created.')
|
||||
parser.add_option('-d', '--datadir', type='string', dest='datadir',
|
||||
default=None,
|
||||
help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir')
|
||||
parser.add_option('-s', '--stream', type='string', dest='output_stream',
|
||||
default='tar',
|
||||
help='Setting streaming backup options. Currently tar, WDT '
|
||||
'and xbstream are supported. Default is tar')
|
||||
parser.add_option('--destination', type='string', dest='destination',
|
||||
default='',
|
||||
help='Remote server name. Only used for WDT mode so far.')
|
||||
parser.add_option('--avg_mbytes_per_sec', type='int',
|
||||
dest='avg_mbytes_per_sec',
|
||||
default=500,
|
||||
help='Average backup rate in MBytes/sec. WDT only.')
|
||||
parser.add_option('--extra_wdt_sender_options', type='string',
|
||||
dest='extra_wdt_sender_options',
|
||||
default='',
|
||||
help='Extra options for WDT sender')
|
||||
parser.add_option('--extra_wdt_receiver_options', type='string',
|
||||
dest='extra_wdt_receiver_options',
|
||||
default='',
|
||||
help='Extra options for WDT receiver')
|
||||
parser.add_option('-u', '--user', type='string', dest='mysql_user',
|
||||
default='root',
|
||||
help='MySQL user name')
|
||||
parser.add_option('-p', '--password', type='string', dest='mysql_password',
|
||||
default='',
|
||||
help='MySQL password name')
|
||||
parser.add_option('-P', '--port', type='int', dest='mysql_port',
|
||||
default=3306,
|
||||
help='MySQL port number')
|
||||
parser.add_option('-S', '--socket', type='string', dest='mysql_socket',
|
||||
default=None,
|
||||
help='MySQL socket path. Takes precedence over --port.')
|
||||
parser.add_option('-m', '--move_back', action='store_true', dest='move_back',
|
||||
default=False,
|
||||
help='Moving MyRocks backup files to proper locations.')
|
||||
parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir',
|
||||
default=None,
|
||||
help='RocksDB target data directory where backup data files will be moved. Must be empty.')
|
||||
parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir',
|
||||
default=None,
|
||||
help='RocksDB target data directory where backup wal files will be moved. Must be empty.')
|
||||
parser.add_option('-b', '--backup_dir', type='string', dest='backupdir',
|
||||
default=None,
|
||||
help='backup mode for WDT: Remote directory to store '
|
||||
'backup. move_back mode: Locations where backup '
|
||||
'files are stored.')
|
||||
parser.add_option('-f', '--skip_check_frm_timestamp',
|
||||
dest='skip_check_frm_timestamp',
|
||||
action='store_true', default=False,
|
||||
help='skipping to check if frm files are updated after starting backup.')
|
||||
parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file',
|
||||
default=None,
|
||||
help='debugging purpose: waiting until the specified file is created')
|
||||
|
||||
opts, args = parser.parse_args()
|
||||
|
||||
|
||||
def create_moveback_dir(directory):
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
else:
|
||||
for f in os.listdir(directory):
|
||||
logger.error("Directory %s has file or directory %s!", directory, f)
|
||||
raise
|
||||
|
||||
def print_move_back_usage():
|
||||
logger.warning(move_back_usage)
|
||||
|
||||
def move_back():
|
||||
if opts.rocksdb_datadir is None or opts.rocksdb_waldir is None or opts.backupdir is None or opts.datadir is None:
|
||||
print_move_back_usage()
|
||||
sys.exit()
|
||||
create_moveback_dir(opts.datadir)
|
||||
create_moveback_dir(opts.rocksdb_datadir)
|
||||
create_moveback_dir(opts.rocksdb_waldir)
|
||||
|
||||
os.chdir(opts.backupdir)
|
||||
for f in os.listdir(opts.backupdir):
|
||||
if os.path.isfile(os.path.join(opts.backupdir,f)):
|
||||
if f.endswith(rocksdb_wal_suffix):
|
||||
shutil.move(f, opts.rocksdb_waldir)
|
||||
elif f.endswith(rocksdb_data_suffix) or is_manifest(f):
|
||||
shutil.move(f, opts.rocksdb_datadir)
|
||||
else:
|
||||
shutil.move(f, opts.datadir)
|
||||
else: #directory
|
||||
if f.endswith('.rocksdb'):
|
||||
continue
|
||||
shutil.move(f, opts.datadir)
|
||||
|
||||
def start_backup():
|
||||
logger.info("Starting backup.")
|
||||
runner = BackupRunner(opts.datadir)
|
||||
b = None
|
||||
backup_round= 1
|
||||
while True:
|
||||
b = runner.start_backup_round(backup_round, b)
|
||||
backup_round = backup_round + 1
|
||||
if b.finished is True:
|
||||
b.print_backup_report()
|
||||
logger.info("RocksDB Backup Done.")
|
||||
break
|
||||
if opts.debug_signal_file:
|
||||
while not os.path.exists(opts.debug_signal_file):
|
||||
logger.info("Waiting until %s is created..", opts.debug_signal_file)
|
||||
time.sleep(1)
|
||||
runner.backup_mysql()
|
||||
logger.info("All Backups Done.")
|
||||
|
||||
|
||||
def main():
|
||||
parse_options()
|
||||
init_logger()
|
||||
|
||||
if opts.move_back is True:
|
||||
move_back()
|
||||
elif opts.output_stream == 'wdt':
|
||||
backup_using_wdt()
|
||||
else:
|
||||
start_backup()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue