mirror of
https://github.com/MariaDB/server.git
synced 2025-01-31 02:51:44 +01:00
445e518bc7
commit f8f364b47f2784f16b401f27658f1c16eaf348ec Author: Jay Edgar <jkedgar@fb.com> Date: Tue Oct 17 15:19:31 2017 -0700 Add a hashed, hierarchical, wheel timer implementation Summary: In order to implement idle timeouts on detached sessions we need something inside MySQL that is lightweight and can handle calling events in the future with very little cost for cancelling or resetting the event. A hashed, hi By default the timers are grouped into 10ms buckets (the 'hashed' part), though the size of the buckets is configurable at the creation of the timer. Each wheel (the 'wheel' part) maintains 256 buckets and cascades to the whe Reviewed By: djwatson Differential Revision: D6199806 fbshipit-source-id: 5e1590f
686 lines
24 KiB
Python
Executable file
686 lines
24 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
from __future__ import division
|
|
from optparse import OptionParser
|
|
import collections
|
|
import signal
|
|
import os
|
|
import stat
|
|
import sys
|
|
import re
|
|
import commands
|
|
import subprocess
|
|
import logging
|
|
import logging.handlers
|
|
import time
|
|
import datetime
|
|
import shutil
|
|
import traceback
|
|
import tempfile
|
|
|
|
import MySQLdb
|
|
import MySQLdb.connections
|
|
from MySQLdb import OperationalError, ProgrammingError
|
|
|
|
logger = None
|
|
opts = None
|
|
rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS']
|
|
rocksdb_data_suffix = '.sst'
|
|
rocksdb_wal_suffix = '.log'
|
|
exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info',
|
|
'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash']
|
|
wdt_bin = 'wdt'
|
|
|
|
def is_manifest(fname):
|
|
for m in rocksdb_files:
|
|
if fname.startswith(m):
|
|
return True
|
|
return False
|
|
|
|
class Writer(object):
|
|
a = None
|
|
def __init__(self):
|
|
a = None
|
|
|
|
class StreamWriter(Writer):
|
|
stream_cmd= ''
|
|
|
|
def __init__(self, stream_option):
|
|
super(StreamWriter, self).__init__()
|
|
if stream_option == 'tar':
|
|
self.stream_cmd= 'tar chf -'
|
|
elif stream_option == 'xbstream':
|
|
self.stream_cmd= 'xbstream -c'
|
|
else:
|
|
raise Exception("Only tar or xbstream is supported as streaming option.")
|
|
|
|
def write(self, file_name):
|
|
rc= os.system(self.stream_cmd + " " + file_name)
|
|
if (rc != 0):
|
|
raise Exception("Got error on stream write: " + str(rc) + " " + file_name)
|
|
|
|
|
|
class MiscFilesProcessor():
|
|
datadir = None
|
|
wildcard = r'.*\.[frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par]'
|
|
regex = None
|
|
start_backup_time = None
|
|
skip_check_frm_timestamp = None
|
|
|
|
def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time):
|
|
self.datadir = datadir
|
|
self.regex = re.compile(self.wildcard)
|
|
self.skip_check_frm_timestamp = skip_check_frm_timestamp
|
|
self.start_backup_time = start_backup_time
|
|
|
|
def process_db(self, db):
|
|
# do nothing
|
|
pass
|
|
|
|
def process_file(self, path):
|
|
# do nothing
|
|
pass
|
|
|
|
def check_frm_timestamp(self, fname, path):
|
|
if not self.skip_check_frm_timestamp and fname.endswith('.frm'):
|
|
if os.path.getmtime(path) > self.start_backup_time:
|
|
logger.error('FRM file %s was updated after starting backups. '
|
|
'Schema could have changed and the resulting copy may '
|
|
'not be valid. Aborting. '
|
|
'(backup time: %s, file modifled time: %s)',
|
|
path, datetime.datetime.fromtimestamp(self.start_backup_time).strftime('%Y-%m-%d %H:%M:%S'),
|
|
datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime('%Y-%m-%d %H:%M:%S'))
|
|
raise Exception("Inconsistent frm file timestamp");
|
|
|
|
def process(self):
|
|
os.chdir(self.datadir)
|
|
for db in self.get_databases():
|
|
logger.info("Starting MySQL misc file traversal from database %s..", db)
|
|
self.process_db(db)
|
|
for f in self.get_files(db):
|
|
if self.match(f):
|
|
rel_path = os.path.join(db, f)
|
|
self.check_frm_timestamp(f, rel_path)
|
|
self.process_file(rel_path)
|
|
logger.info("Traversing misc files from data directory..")
|
|
for f in self.get_files(""):
|
|
should_skip = False
|
|
for e in exclude_files:
|
|
if f.startswith(e) or f.endswith(e):
|
|
logger.info("Skipping %s", f)
|
|
should_skip = True
|
|
break
|
|
if not should_skip:
|
|
self.process_file(f)
|
|
|
|
def match(self, filename):
|
|
if self.regex.match(filename):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_databases(self):
|
|
dbs = []
|
|
dirs = [ d for d in os.listdir(self.datadir) \
|
|
if not os.path.isfile(os.path.join(self.datadir,d))]
|
|
for db in dirs:
|
|
if not db.startswith('.') and not self._is_socket(db) and not db == "#rocksdb":
|
|
dbs.append(db)
|
|
return dbs
|
|
|
|
def get_files(self, db):
|
|
dbdir = self.datadir + "/" + db
|
|
return [ f for f in os.listdir(dbdir) \
|
|
if os.path.isfile(os.path.join(dbdir,f))]
|
|
|
|
def _is_socket(self, item):
|
|
mode = os.stat(os.path.join(self.datadir, item)).st_mode
|
|
if stat.S_ISSOCK(mode):
|
|
return True
|
|
return False
|
|
|
|
|
|
class MySQLBackup(MiscFilesProcessor):
|
|
writer = None
|
|
|
|
def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time):
|
|
MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
|
|
self.writer = writer
|
|
|
|
def process_file(self, fname): # overriding base class
|
|
self.writer.write(fname)
|
|
|
|
|
|
class MiscFilesLinkCreator(MiscFilesProcessor):
|
|
snapshot_dir = None
|
|
|
|
def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time):
|
|
MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time)
|
|
self.snapshot_dir = snapshot_dir
|
|
|
|
def process_db(self, db):
|
|
snapshot_sub_dir = os.path.join(self.snapshot_dir, db)
|
|
os.makedirs(snapshot_sub_dir)
|
|
|
|
def process_file(self, path):
|
|
dst_path = os.path.join(self.snapshot_dir, path)
|
|
os.link(path, dst_path)
|
|
|
|
|
|
# RocksDB backup
|
|
class RocksDBBackup():
|
|
source_dir = None
|
|
writer = None
|
|
# sst files sent in this backup round
|
|
sent_sst = {}
|
|
# target sst files in this backup round
|
|
target_sst = {}
|
|
# sst files sent in all backup rounds
|
|
total_sent_sst= {}
|
|
# sum of sst file size sent in this backup round
|
|
sent_sst_size = 0
|
|
# sum of target sst file size in this backup round
|
|
# if sent_sst_size becomes equal to target_sst_size,
|
|
# it means the backup round finished backing up all sst files
|
|
target_sst_size = 0
|
|
# sum of all sst file size sent all backup rounds
|
|
total_sent_sst_size= 0
|
|
# sum of all target sst file size from all backup rounds
|
|
total_target_sst_size = 0
|
|
show_progress_size_interval= 1073741824 # 1GB
|
|
wal_files= []
|
|
manifest_files= []
|
|
finished= False
|
|
|
|
def __init__(self, source_dir, writer, prev):
|
|
self.source_dir = source_dir
|
|
self.writer = writer
|
|
os.chdir(self.source_dir)
|
|
self.init_target_files(prev)
|
|
|
|
def init_target_files(self, prev):
|
|
sst = {}
|
|
self.sent_sst = {}
|
|
self.target_sst= {}
|
|
self.total_sent_sst = {}
|
|
self.sent_sst_size = 0
|
|
self.target_sst_size = 0
|
|
self.total_sent_sst_size= 0
|
|
self.total_target_sst_size= 0
|
|
self.wal_files= []
|
|
self.manifest_files= []
|
|
|
|
for f in os.listdir(self.source_dir):
|
|
if f.endswith(rocksdb_data_suffix):
|
|
# exactly the same file (same size) was sent in previous backup rounds
|
|
if prev is not None and f in prev.total_sent_sst and int(os.stat(f).st_size) == prev.total_sent_sst[f]:
|
|
continue
|
|
sst[f]= int(os.stat(f).st_size)
|
|
self.target_sst_size = self.target_sst_size + os.stat(f).st_size
|
|
elif is_manifest(f):
|
|
self.manifest_files.append(f)
|
|
elif f.endswith(rocksdb_wal_suffix):
|
|
self.wal_files.append(f)
|
|
self.target_sst= collections.OrderedDict(sorted(sst.items()))
|
|
|
|
if prev is not None:
|
|
self.total_sent_sst = prev.total_sent_sst
|
|
self.total_sent_sst_size = prev.total_sent_sst_size
|
|
self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size
|
|
else:
|
|
self.total_target_sst_size = self.target_sst_size
|
|
|
|
def do_backup_single(self, fname):
|
|
self.writer.write(fname)
|
|
os.remove(fname)
|
|
|
|
def do_backup_sst(self, fname, size):
|
|
self.do_backup_single(fname)
|
|
self.sent_sst[fname]= size
|
|
self.total_sent_sst[fname]= size
|
|
self.sent_sst_size = self.sent_sst_size + size
|
|
self.total_sent_sst_size = self.total_sent_sst_size + size
|
|
|
|
def do_backup_manifest(self):
|
|
for f in self.manifest_files:
|
|
self.do_backup_single(f)
|
|
|
|
def do_backup_wal(self):
|
|
for f in self.wal_files:
|
|
self.do_backup_single(f)
|
|
|
|
# this is the last snapshot round. backing up all the rest files
|
|
def do_backup_final(self):
|
|
logger.info("Backup WAL..")
|
|
self.do_backup_wal()
|
|
logger.info("Backup Manifest..")
|
|
self.do_backup_manifest()
|
|
self.do_cleanup()
|
|
self.finished= True
|
|
|
|
def do_cleanup(self):
|
|
shutil.rmtree(self.source_dir)
|
|
logger.info("Cleaned up checkpoint from %s", self.source_dir)
|
|
|
|
def do_backup_until(self, time_limit):
|
|
logger.info("Starting backup from snapshot: target files %d", len(self.target_sst))
|
|
start_time= time.time()
|
|
last_progress_time= start_time
|
|
progress_size= 0
|
|
for fname, size in self.target_sst.iteritems():
|
|
self.do_backup_sst(fname, size)
|
|
progress_size= progress_size + size
|
|
elapsed_seconds = time.time() - start_time
|
|
progress_seconds = time.time() - last_progress_time
|
|
|
|
if self.should_show_progress(size):
|
|
self.show_progress(progress_size, progress_seconds)
|
|
progress_size=0
|
|
last_progress_time= time.time()
|
|
|
|
if elapsed_seconds > time_limit and self.has_sent_all_sst() is False:
|
|
logger.info("Snapshot round finished. Elapsed Time: %5.2f. Remaining sst files: %d",
|
|
elapsed_seconds, len(self.target_sst) - len(self.sent_sst))
|
|
self.do_cleanup()
|
|
break;
|
|
if self.has_sent_all_sst():
|
|
self.do_backup_final()
|
|
|
|
return self
|
|
|
|
def should_show_progress(self, size):
|
|
if int(self.total_sent_sst_size/self.show_progress_size_interval) > int((self.total_sent_sst_size-size)/self.show_progress_size_interval):
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def show_progress(self, size, seconds):
|
|
logger.info("Backup Progress: %5.2f%% Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s",
|
|
self.total_sent_sst_size*100/self.total_target_sst_size,
|
|
self.total_sent_sst_size/1024/1024/1024,
|
|
self.total_target_sst_size/1024/1024/1024,
|
|
size/seconds/1024/1024)
|
|
|
|
def print_backup_report(self):
|
|
logger.info("Sent %6.2f GB of sst files, %d files in total.",
|
|
self.total_sent_sst_size/1024/1024/1024,
|
|
len(self.total_sent_sst))
|
|
|
|
def has_sent_all_sst(self):
|
|
if self.sent_sst_size == self.target_sst_size:
|
|
return True
|
|
return False
|
|
|
|
|
|
class MySQLUtil:
|
|
@staticmethod
|
|
def connect(user, password, port, socket=None):
|
|
if socket:
|
|
dbh = MySQLdb.Connect(user=user,
|
|
passwd=password,
|
|
unix_socket=socket)
|
|
else:
|
|
dbh = MySQLdb.Connect(user=user,
|
|
passwd=password,
|
|
port=port,
|
|
host="127.0.0.1")
|
|
return dbh
|
|
|
|
@staticmethod
|
|
def create_checkpoint(dbh, checkpoint_dir):
|
|
sql = ("SET GLOBAL rocksdb_create_checkpoint='{0}'"
|
|
.format(checkpoint_dir))
|
|
cur= dbh.cursor()
|
|
cur.execute(sql)
|
|
cur.close()
|
|
|
|
@staticmethod
|
|
def get_datadir(dbh):
|
|
sql = "SELECT @@datadir"
|
|
cur = dbh.cursor()
|
|
cur.execute(sql)
|
|
row = cur.fetchone()
|
|
return row[0]
|
|
|
|
|
|
class BackupRunner:
|
|
datadir = None
|
|
start_backup_time = None
|
|
|
|
def __init__(self, datadir):
|
|
self.datadir = datadir
|
|
self.start_backup_time = time.time()
|
|
|
|
def start_backup_round(self, backup_round, prev_backup):
|
|
def signal_handler(*args):
|
|
logger.info("Got signal. Exit")
|
|
if b is not None:
|
|
logger.info("Cleaning up snapshot directory..")
|
|
b.do_cleanup()
|
|
sys.exit(1)
|
|
|
|
b = None
|
|
try:
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
w = None
|
|
if opts.output_stream:
|
|
w = StreamWriter(opts.output_stream)
|
|
else:
|
|
raise Exception("Currently only streaming backup is supported.")
|
|
|
|
snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round)
|
|
dbh = MySQLUtil.connect(opts.mysql_user,
|
|
opts.mysql_password,
|
|
opts.mysql_port,
|
|
opts.mysql_socket)
|
|
if not self.datadir:
|
|
self.datadir = MySQLUtil.get_datadir(dbh)
|
|
logger.info("Set datadir: %s", self.datadir)
|
|
logger.info("Creating checkpoint at %s", snapshot_dir)
|
|
MySQLUtil.create_checkpoint(dbh, snapshot_dir)
|
|
logger.info("Created checkpoint at %s", snapshot_dir)
|
|
b = RocksDBBackup(snapshot_dir, w, prev_backup)
|
|
return b.do_backup_until(opts.checkpoint_interval)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error(traceback.format_exc())
|
|
if b is not None:
|
|
logger.info("Cleaning up snapshot directory.")
|
|
b.do_cleanup()
|
|
sys.exit(1)
|
|
|
|
def backup_mysql(self):
|
|
try:
|
|
w = None
|
|
if opts.output_stream:
|
|
w = StreamWriter(opts.output_stream)
|
|
else:
|
|
raise Exception("Currently only streaming backup is supported.")
|
|
b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp,
|
|
self.start_backup_time)
|
|
logger.info("Taking MySQL misc backups..")
|
|
b.process()
|
|
logger.info("MySQL misc backups done.")
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error(traceback.format_exc())
|
|
sys.exit(1)
|
|
|
|
|
|
class WDTBackup:
|
|
datadir = None
|
|
start_backup_time = None
|
|
|
|
def __init__(self, datadir):
|
|
self.datadir = datadir
|
|
self.start_backup_time = time.time()
|
|
|
|
def cleanup(self, snapshot_dir, server_log):
|
|
if server_log:
|
|
server_log.seek(0)
|
|
logger.info("WDT server log:")
|
|
logger.info(server_log.read())
|
|
server_log.close()
|
|
if snapshot_dir:
|
|
logger.info("Cleaning up snapshot dir %s", snapshot_dir)
|
|
shutil.rmtree(snapshot_dir)
|
|
|
|
def backup_with_timeout(self, backup_round):
|
|
def signal_handler(*args):
|
|
logger.info("Got signal. Exit")
|
|
self.cleanup(snapshot_dir, server_log)
|
|
sys.exit(1)
|
|
|
|
logger.info("Starting backup round %d", backup_round)
|
|
snapshot_dir = None
|
|
server_log = None
|
|
try:
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
# create rocksdb snapshot
|
|
snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round))
|
|
dbh = MySQLUtil.connect(opts.mysql_user,
|
|
opts.mysql_password,
|
|
opts.mysql_port,
|
|
opts.mysql_socket)
|
|
logger.info("Creating checkpoint at %s", snapshot_dir)
|
|
MySQLUtil.create_checkpoint(dbh, snapshot_dir)
|
|
logger.info("Created checkpoint at %s", snapshot_dir)
|
|
|
|
# get datadir if not provided
|
|
if not self.datadir:
|
|
self.datadir = MySQLUtil.get_datadir(dbh)
|
|
logger.info("Set datadir: %s", self.datadir)
|
|
|
|
# create links for misc files
|
|
link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir,
|
|
opts.skip_check_frm_timestamp,
|
|
self.start_backup_time)
|
|
link_creator.process()
|
|
|
|
current_path = os.path.join(opts.backupdir, "CURRENT")
|
|
|
|
# construct receiver cmd, using the data directory as recovery-id.
|
|
# we delete the current file because it is not append-only, therefore not
|
|
# resumable.
|
|
remote_cmd = (
|
|
"ssh {0} rm -f {1}; "
|
|
"{2} -directory {3} -enable_download_resumption "
|
|
"-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}"
|
|
).format(opts.destination,
|
|
current_path,
|
|
wdt_bin,
|
|
opts.backupdir,
|
|
self.datadir,
|
|
opts.checkpoint_interval,
|
|
opts.extra_wdt_receiver_options)
|
|
logger.info("WDT remote cmd %s", remote_cmd)
|
|
server_log = tempfile.TemporaryFile()
|
|
remote_process = subprocess.Popen(remote_cmd.split(),
|
|
stdout=subprocess.PIPE,
|
|
stderr=server_log)
|
|
wdt_url = remote_process.stdout.readline().strip()
|
|
if not wdt_url:
|
|
raise Exception("Unable to get connection url from wdt receiver")
|
|
sender_cmd = (
|
|
"{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks "
|
|
"-avg_mbytes_per_sec {3} "
|
|
"-enable_download_resumption -abort_after_seconds {4} {5}"
|
|
).format(wdt_bin,
|
|
wdt_url,
|
|
snapshot_dir,
|
|
opts.avg_mbytes_per_sec,
|
|
opts.checkpoint_interval,
|
|
opts.extra_wdt_sender_options)
|
|
sender_status = os.system(sender_cmd) >> 8
|
|
remote_status = remote_process.wait()
|
|
self.cleanup(snapshot_dir, server_log)
|
|
# TODO: handle retryable and non-retyable errors differently
|
|
return (sender_status == 0 and remote_status == 0)
|
|
|
|
except Exception as e:
|
|
logger.error(e)
|
|
logger.error(traceback.format_exc())
|
|
self.cleanup(snapshot_dir, server_log)
|
|
sys.exit(1)
|
|
|
|
|
|
def backup_using_wdt():
|
|
if not opts.destination:
|
|
logger.error("Must provide remote destination when using WDT")
|
|
sys.exit(1)
|
|
|
|
# TODO: detect whether WDT is installed
|
|
logger.info("Backing up myrocks to %s using WDT", opts.destination)
|
|
wdt_backup = WDTBackup(opts.datadir)
|
|
finished = False
|
|
backup_round = 1
|
|
while not finished:
|
|
start_time = time.time()
|
|
finished = wdt_backup.backup_with_timeout(backup_round)
|
|
end_time = time.time()
|
|
duration_seconds = end_time - start_time
|
|
if (not finished) and (duration_seconds < opts.checkpoint_interval):
|
|
# round finished before timeout
|
|
sleep_duration = (opts.checkpoint_interval - duration_seconds)
|
|
logger.info("Sleeping for %f seconds", sleep_duration)
|
|
time.sleep(sleep_duration)
|
|
|
|
backup_round = backup_round + 1
|
|
logger.info("Finished myrocks backup using WDT")
|
|
|
|
|
|
def init_logger():
|
|
global logger
|
|
logger = logging.getLogger('myrocks_hotbackup')
|
|
logger.setLevel(logging.INFO)
|
|
h1= logging.StreamHandler(sys.stderr)
|
|
f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s",
|
|
"%Y-%m-%d %H:%M:%S")
|
|
h1.setFormatter(f)
|
|
logger.addHandler(h1)
|
|
|
|
backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup "
|
|
"--user=root --password=pw --stream=wdt "
|
|
"--checkpoint_dir=<directory where temporary backup hard links "
|
|
"are created> --destination=<remote host name> --backup_dir="
|
|
"<remote directory name>. This has to be executed at the src "
|
|
"host.")
|
|
backup_usage= "Backup: set -o pipefail; myrocks_hotbackup --user=root --password=pw --port=3306 --checkpoint_dir=<directory where temporary backup hard links are created> | ssh -o NoneEnabled=yes remote_server 'tar -xi -C <directory on remote server where backups will be sent>' . You need to execute backup command on a server where you take backups."
|
|
move_back_usage= "Move-Back: myrocks_hotbackup --move_back --datadir=<dest mysql datadir> --rocksdb_datadir=<dest rocksdb datadir> --rocksdb_waldir=<dest rocksdb wal dir> --backup_dir=<where backup files are stored> . You need to execute move-back command on a server where backup files are sent."
|
|
|
|
|
|
def parse_options():
|
|
global opts
|
|
parser = OptionParser(usage = "\n\n" + backup_usage + "\n\n" + \
|
|
backup_wdt_usage + "\n\n" + move_back_usage)
|
|
parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval',
|
|
default=300,
|
|
help='Number of seconds to renew checkpoint')
|
|
parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory',
|
|
default='/data/mysql/backup/snapshot',
|
|
help='Local directory name where checkpoints will be created.')
|
|
parser.add_option('-d', '--datadir', type='string', dest='datadir',
|
|
default=None,
|
|
help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir')
|
|
parser.add_option('-s', '--stream', type='string', dest='output_stream',
|
|
default='tar',
|
|
help='Setting streaming backup options. Currently tar, WDT '
|
|
'and xbstream are supported. Default is tar')
|
|
parser.add_option('--destination', type='string', dest='destination',
|
|
default='',
|
|
help='Remote server name. Only used for WDT mode so far.')
|
|
parser.add_option('--avg_mbytes_per_sec', type='int',
|
|
dest='avg_mbytes_per_sec',
|
|
default=500,
|
|
help='Average backup rate in MBytes/sec. WDT only.')
|
|
parser.add_option('--extra_wdt_sender_options', type='string',
|
|
dest='extra_wdt_sender_options',
|
|
default='',
|
|
help='Extra options for WDT sender')
|
|
parser.add_option('--extra_wdt_receiver_options', type='string',
|
|
dest='extra_wdt_receiver_options',
|
|
default='',
|
|
help='Extra options for WDT receiver')
|
|
parser.add_option('-u', '--user', type='string', dest='mysql_user',
|
|
default='root',
|
|
help='MySQL user name')
|
|
parser.add_option('-p', '--password', type='string', dest='mysql_password',
|
|
default='',
|
|
help='MySQL password name')
|
|
parser.add_option('-P', '--port', type='int', dest='mysql_port',
|
|
default=3306,
|
|
help='MySQL port number')
|
|
parser.add_option('-S', '--socket', type='string', dest='mysql_socket',
|
|
default=None,
|
|
help='MySQL socket path. Takes precedence over --port.')
|
|
parser.add_option('-m', '--move_back', action='store_true', dest='move_back',
|
|
default=False,
|
|
help='Moving MyRocks backup files to proper locations.')
|
|
parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir',
|
|
default=None,
|
|
help='RocksDB target data directory where backup data files will be moved. Must be empty.')
|
|
parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir',
|
|
default=None,
|
|
help='RocksDB target data directory where backup wal files will be moved. Must be empty.')
|
|
parser.add_option('-b', '--backup_dir', type='string', dest='backupdir',
|
|
default=None,
|
|
help='backup mode for WDT: Remote directory to store '
|
|
'backup. move_back mode: Locations where backup '
|
|
'files are stored.')
|
|
parser.add_option('-f', '--skip_check_frm_timestamp',
|
|
dest='skip_check_frm_timestamp',
|
|
action='store_true', default=False,
|
|
help='skipping to check if frm files are updated after starting backup.')
|
|
parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file',
|
|
default=None,
|
|
help='debugging purpose: waiting until the specified file is created')
|
|
|
|
opts, args = parser.parse_args()
|
|
|
|
|
|
def create_moveback_dir(directory):
|
|
if not os.path.exists(directory):
|
|
os.makedirs(directory)
|
|
else:
|
|
for f in os.listdir(directory):
|
|
logger.error("Directory %s has file or directory %s!", directory, f)
|
|
raise
|
|
|
|
def print_move_back_usage():
|
|
logger.warning(move_back_usage)
|
|
|
|
def move_back():
|
|
if opts.rocksdb_datadir is None or opts.rocksdb_waldir is None or opts.backupdir is None or opts.datadir is None:
|
|
print_move_back_usage()
|
|
sys.exit()
|
|
create_moveback_dir(opts.datadir)
|
|
create_moveback_dir(opts.rocksdb_datadir)
|
|
create_moveback_dir(opts.rocksdb_waldir)
|
|
|
|
os.chdir(opts.backupdir)
|
|
for f in os.listdir(opts.backupdir):
|
|
if os.path.isfile(os.path.join(opts.backupdir,f)):
|
|
if f.endswith(rocksdb_wal_suffix):
|
|
shutil.move(f, opts.rocksdb_waldir)
|
|
elif f.endswith(rocksdb_data_suffix) or is_manifest(f):
|
|
shutil.move(f, opts.rocksdb_datadir)
|
|
else:
|
|
shutil.move(f, opts.datadir)
|
|
else: #directory
|
|
if f.endswith('.rocksdb'):
|
|
continue
|
|
shutil.move(f, opts.datadir)
|
|
|
|
def start_backup():
|
|
logger.info("Starting backup.")
|
|
runner = BackupRunner(opts.datadir)
|
|
b = None
|
|
backup_round= 1
|
|
while True:
|
|
b = runner.start_backup_round(backup_round, b)
|
|
backup_round = backup_round + 1
|
|
if b.finished is True:
|
|
b.print_backup_report()
|
|
logger.info("RocksDB Backup Done.")
|
|
break
|
|
if opts.debug_signal_file:
|
|
while not os.path.exists(opts.debug_signal_file):
|
|
logger.info("Waiting until %s is created..", opts.debug_signal_file)
|
|
time.sleep(1)
|
|
runner.backup_mysql()
|
|
logger.info("All Backups Done.")
|
|
|
|
|
|
def main():
|
|
parse_options()
|
|
init_logger()
|
|
|
|
if opts.move_back is True:
|
|
move_back()
|
|
elif opts.output_stream == 'wdt':
|
|
backup_using_wdt()
|
|
else:
|
|
start_backup()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|