Overview
What is parsync?
Parsync is a script that runs multiple rsync commands in parallel to speed up the sync process.
Why not rsync?
- rsync is a single-process program; it cannot take full advantage of multi-core CPUs and high network bandwidth. In short, it is slow.
- rsync can be combined with tools like find, parallel and xargs to run multiple rsync processes in parallel, e.g.
find SOURCE_DIR -type f | parallel --will-cite -j 5 rsync -av {} DESTINATION
or
find SOURCE_DIR -type d | xargs -I {} rsync -a {} DESTINATION
In the first example, parallel runs 5 rsync processes that sync the files under SOURCE_DIR one by one. If those files are large, that is fine. However, if they are small files of around 4KB, rsync will still be very slow. In the second example, one rsync process is started for each directory under SOURCE_DIR. Usually the files are not spread evenly across these directories, which means rsync does run in parallel at first, but soon most of the rsync processes finish their work and exit, and only one or two rsync processes are still running. Then the sync becomes slow again.
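Parsync avoids this collapse by walking the source tree once to build a flat list of paths, splitting that list into groups, and keeping a fixed-size pool of rsync processes busy until the list is exhausted. Below is a minimal sketch of that idea, for illustration only; the worker count and rsync flags are placeholders, and the real implementation is parsync.py in Script content below.

#!/usr/bin/env python
# Minimal sketch: walk the tree once, then let a fixed-size pool of
# workers call rsync on one path at a time until the list is empty.
import os
import subprocess
from multiprocessing.pool import ThreadPool

def sync_one(path, destination):
    # -a archive mode, -R relative paths so the tree is rebuilt at DEST
    subprocess.call(['rsync', '-aR', path, destination])

def parallel_sync(source, destination, workers=4):
    paths = [os.path.join(root, name)
             for root, _, files in os.walk(source)
             for name in files]
    pool = ThreadPool(workers)
    for path in paths:
        pool.apply_async(sync_one, (path, destination))
    pool.close()
    pool.join()

if __name__ == '__main__':
    parallel_sync('SOURCE', 'DEST')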
Usage
usage: parsync.py [-h] [--bwlimit BWLIMIT] [--id ID] [--parallel PARALLEL]
                  [--no-progress] [-n]
                  source destination ...

Run rsync in parallel. All parameters for parsync MUST be provided before
source as usage shows, any parameter after destination is directly passed to
rsync.

positional arguments:
  source               source of rsync, cannot be empty. DO NOT use syntax
                       like XXX/* for source, use XXX/ or "XXX/*" instead
  destination          destination of rsync, cannot be empty
  args                 additional arguments that will be sent directly to
                       rsync

optional arguments:
  -h, --help           show this help message and exit
  --bwlimit BWLIMIT    limit I/O bandwidth; KBytes per second. Default: 0 KB/s
  --id ID              ID of backup task. It is the pid of current process by
                       default
  --parallel PARALLEL  run how many rsync processes in parallel
  --no-progress        do not show progress bar
  -n, --dry-run        perform a trial run with no changes made
P.S. If you encounter an encoding error, add LC_ALL=en_US.UTF-8
before parsync, e.g.
LC_ALL=en_US.UTF-8 python parsync.py SOURCE DEST
Example Usage
Sync SOURCE to DEST
parsync.py SOURCE DEST
Sync files and dirs under SOURCE to DEST
parsync.py SOURCE/ DEST
or
parsync.py "SOURCE/*" DEST
The quotes around SOURCE/* prevent the shell from expanding SOURCE/* into SOURCE/a SOURCE/b … before parsync sees it.
Run 100 rsync processes in parallel
parsync.py --parallel=100 SOURCE DEST
Run parsync with a bandwidth limit of 4000 KB/s and 100 rsync processes in parallel
parsync.py --parallel=100 --bwlimit=4000 SOURCE DEST
or
parsync.py --parallel=100 SOURCE DEST --bwlimit=40
In the first example, the bwlimit is passed to parsync, and parsync calculates the bwlimit for each rsync process: in this case, 4000 / 100 = 40 KB/s. In the second example, the bwlimit is passed directly to rsync without any check or change.
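Pass extra options straight to rsync
parsync.py SOURCE DEST --exclude="*.tmp"
Anything placed after destination is handed to each rsync process unchanged; --exclude is an ordinary rsync option, used here only as an illustration.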
Test
Run python test.py to run the unit tests for parsync. If you encounter an encoding error, add LC_ALL=en_US.UTF-8 at the beginning of the command, for example: LC_ALL=en_US.UTF-8 python test.py
Script content
parsync.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function, division, absolute_import
import os
import sys
from time import sleep
import argparse
from itertools import chain
from subprocess import check_output, CalledProcessError, check_call
try:
from os import scandir
except ImportError:
from scandir import scandir
from utils import walk, Pool, lisfile
total_paths = 0
total_tasks = 0
show_progress = True
GROUP_CNT = 10
PATHS = '/tmp/parsync_paths.{}'
paths_file = None
TASKS = '/tmp/parsync_tasks.{}'
tasks_file = None
# this value is used to calculate how long a progress bar should be.
# the screen width in IPMI is 100, so this value should not exceed 100
# and the progress bar will not be longer than one line
SCREEN_WIDTH = 100
def set_screen_width():
try:
output = check_output(['stty', 'size'])
except CalledProcessError:
        pass  # keep the default value of SCREEN_WIDTH
else:
global SCREEN_WIDTH
try:
SCREEN_WIDTH = int(output.split()[1])
        except (IndexError, ValueError):
pass
def generate_paths(source='.'):
if show_progress:
dot_num = 0
path_count = 0
max_dots = SCREEN_WIDTH - len('Generate Paths ')
sys.stdout.write('Generate Paths ' + ' ' * max_dots)
with open(paths_file, 'wb', buffering=4194304) as writer:
if lisfile(source):
writer.write(source + '\n')
else:
for root, dirs, files in walk(source):
if root != os.path.curdir: # skip '.'
writer.write(root + os.path.sep + '\n')
for f in files:
                    # DO NOT use str.format() here; it may raise an encoding error
writer.write(os.path.join(root, f) + '\n')
if show_progress:
path_count += 1
if path_count == 1000:
if dot_num == max_dots:
dot_num = 0
sys.stdout.write('\b' * max_dots)
dot_num += 1
sys.stdout.write('.' * dot_num + ' ' * (max_dots - dot_num))
sys.stdout.flush()
path_count = 0
if show_progress:
print('')
def generate_groups():
"""
    I've tried counting the lines of a file with 13169031 lines:
    1. read the file line by line, until the end                          avg 2.3067 s
    2. read N MB of the file into a buffer, count '\n', until the end     avg 1.8454 s
    3. open the file, load it into mmap (memory map), count line by line  avg 3.0107 s
    4. use the "wc -l" command                                            avg 1.0206 s
    "wc -l" is the fastest way to count the lines of a file.
"""
try:
output = check_output(['wc', '-l', '{}'.format(paths_file)])
except CalledProcessError as e:
print('wc failed to get line count, error:\n{}'.format(e))
raise
global total_paths
total_paths = int(output.split()[0])
line_per_group = (total_paths + GROUP_CNT - 1) // GROUP_CNT
for group in range(GROUP_CNT):
begin = line_per_group * group
end = begin + line_per_group
yield begin, end
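# The two task generators below collapse a group of the flat path list into
# rsync tasks. Paths arrive in walk order, directories carrying a trailing
# os.path.sep. When a directory's whole subtree fits inside the group, the
# directory itself becomes a single rsync task (tracked via top_dir) and all
# paths beneath it are skipped; otherwise its contents are synced one by one.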
def _generate_tasks_with_progress(group_paths, task_done=None):
tasks = []
top_dir = None
for path in group_paths:
        # remove the trailing '\n'; do not use str.strip(), because the path may end with spaces
path = path[:-1]
if path.endswith(os.path.sep): # is a dir
            # rsync is used with '-R', so there is no need to remove the trailing '/'
            # path = path[:-1]  # remove the trailing os.path.sep
if top_dir and path.startswith(top_dir):
task_done.value += 1
continue
            if group_paths[-1].startswith(path):
                # the group may not contain all of this path's subfolders and files
                if any(scandir(path)):  # path is not an empty folder
task_done.value += 1
# it will be synced when syncing folders and files it contains
continue
else: # path is an empty folder
task_done.value += 1
tasks.append(path)
else:
task_done.value += 1
# all path's subfolders and files are in group
top_dir = path
tasks.append(path)
else: # not a dir
if top_dir and path.startswith(top_dir):
task_done.value += 1
# path is under top_dir, ignore it
# it will be synced, when top_dir is synced
continue
else:
task_done.value += 1
tasks.append(path)
return tasks
def _generate_tasks_without_progress(group_paths, task_done=None):
tasks = []
top_dir = None
for path in group_paths:
        # remove the trailing '\n'; do not use str.strip(), because the path may end with spaces
path = path[:-1]
if path.endswith(os.path.sep): # is a dir
            # rsync is used with '-R', so there is no need to remove the trailing '/'
            # path = path[:-1]  # remove the trailing '/'
if top_dir and path.startswith(top_dir):
continue
            if group_paths[-1].startswith(path):
                # the group may not contain all of this path's subfolders and files
                if any(scandir(path)):  # path is not an empty folder
# it will be synced when syncing folders and files it contains
continue
else: # path is an empty folder
tasks.append(path)
else:
# all path's subfolders and files are in group
top_dir = path
tasks.append(path)
else: # not a dir
if top_dir and path.startswith(top_dir):
# path is under top_dir, ignore it
# it will be synced, when top_dir is synced
continue
else:
tasks.append(path)
return tasks
def generate_tasks(*args, **kwargs):
if show_progress:
return _generate_tasks_with_progress(*args, **kwargs)
else:
return _generate_tasks_without_progress(*args, **kwargs)
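# Flags passed to every rsync invocation: -a archive mode, -v verbose,
# -q quiet, -s protect args (no space-splitting of file names),
# -R use relative path names so the source tree is recreated at destination.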
def do_rsync(source, destination, args=None, bwlimit=None, task_done=None):
if source:
cmd = ['rsync', '-avqsR', '{}'.format(source), '{}'.format(destination)]
        if args:
            cmd.extend(args)
if bwlimit:
cmd.append('--bwlimit={}'.format(bwlimit))
try:
check_call(cmd)
except CalledProcessError as e:
print('rsync failed, error:\n{}'.format(e))
    # this task counts as done whether rsync succeeded or not
if show_progress:
task_done.value += 1
def main():
parser = argparse.ArgumentParser(
description='Run rsync in parallel. '
'All parameters for parsync MUST be provided before source as usage shows, '
'any parameter after destination is directly passed to rsync.'
)
parser.add_argument('source',
help='source of rsync, cannot be empty. DO NOT use syntax '
'like XXX/* for source, use XXX/ or "XXX/*" instead')
parser.add_argument('destination', help='destination of rsync, cannot be empty')
parser.add_argument('--bwlimit', type=int, default=0,
help='limit I/O bandwidth; KBytes per second. Default: 0 KB/s')
parser.add_argument('--id', default=str(os.getpid()),
help='ID of backup task. It is the pid of current process by default')
parser.add_argument('--parallel', type=int, default=4,
help='run how many rsync processes in parallel')
parser.add_argument('--no-progress', action='store_true', help='do not show progress bar')
parser.add_argument('-n', '--dry-run', action='store_true',
help='perform a trial run with no changes made')
parser.add_argument('args', nargs=argparse.REMAINDER,
help="additional arguments that will directly be send to rsync")
options = parser.parse_args()
    # do not use strip(): source and destination may contain leading or trailing spaces
source = options.source
destination = os.path.abspath(options.destination)
tid = options.id
assert options.parallel > 0, 'parallel parameter must be greater than 0'
# bwlimit parameter for rsync must be an integer
bwlimit = int(options.bwlimit / options.parallel) or None
args = options.args
global show_progress, paths_file, tasks_file, GROUP_CNT
show_progress = not options.no_progress
paths_file = PATHS.format(tid)
tasks_file = TASKS.format(tid)
if options.parallel < 10:
GROUP_CNT = 10
elif options.parallel > 100:
GROUP_CNT = 100
else:
GROUP_CNT = options.parallel
if show_progress:
set_screen_width()
    # remove the trailing '*'
    source = source[:-1] if source.endswith('*') else source
    # sync the root folder of source to destination or not
    sync_root = not source.endswith(os.path.sep)
    # remove the trailing '/', e.g. 'XXX/YYY/' ==> 'XXX/YYY'
source = os.path.normpath(source)
# AAA/BBB/CCC ==> AAA/BBB, CCC
base_dir, source = os.path.split(source)
if base_dir:
os.chdir(base_dir)
if not sync_root:
os.chdir(source)
source = os.path.curdir
if not os.path.exists(paths_file):
generate_paths(source)
pool = Pool(processes=options.parallel, show_progress=show_progress)
if not os.path.exists(tasks_file):
with open(paths_file, 'rb') as reader:
paths = reader.readlines()
result = []
for start, end in generate_groups():
result.append(pool.apply_async(generate_tasks, (paths[start:end],)))
if show_progress:
max_dots = SCREEN_WIDTH - len('Generating Tasks ') - len(' 100.0%')
while not all([r.ready() for r in result]):
try:
current_progress = pool.task_done / total_paths
except ZeroDivisionError:
break
if current_progress > 1:
current_progress = 1
dot_num = int(current_progress * max_dots)
percentage = '{:>6.1f}%'.format(current_progress * 100)
progress_bar = 'Generating Tasks ' + '.' * dot_num + ' ' * (max_dots - dot_num) + percentage
sys.stdout.write('\b' * SCREEN_WIDTH)
sys.stdout.write(progress_bar)
sys.stdout.flush()
sleep(0.1)
else:
progress_bar = 'Generating Tasks ' + '.' * max_dots + ' 100.0%'
sys.stdout.write('\b' * SCREEN_WIDTH)
sys.stdout.write(progress_bar)
sys.stdout.flush()
print('')
tasks = chain.from_iterable([r.get() for r in result])
global total_tasks
with open(tasks_file, 'wb', buffering=4194304) as writer:
for t in tasks:
writer.write(t + '\n')
total_tasks += 1
if not options.dry_run:
pool = Pool(processes=options.parallel, show_progress=show_progress)
try:
results = []
with open(tasks_file, 'rb') as reader:
for task in reader:
                    # task[:-1] removes the trailing '\n'
result = pool.apply_async(do_rsync, (task[:-1], destination, args, bwlimit))
results.append(result)
if show_progress:
max_dots = SCREEN_WIDTH - len('Syncing ') - len(' 100.0%')
while results:
current_progress = pool.task_done / total_tasks
if current_progress > 1:
current_progress = 1
dot_num = int(current_progress * max_dots)
percentage = '{:>6.1f}%'.format(current_progress * 100)
progress_bar = 'Syncing ' + '.' * dot_num + ' ' * (max_dots - dot_num) + percentage
sys.stdout.write('\b' * SCREEN_WIDTH)
sys.stdout.write(progress_bar)
sys.stdout.flush()
sleep(0.1)
                    # keep a list: filter() would return a lazy iterator on Python 3
                    results = [r for r in results if not r.ready()]
else:
progress_bar = 'Syncing ' + '.' * max_dots + ' 100.0%'
sys.stdout.write('\b' * SCREEN_WIDTH)
sys.stdout.write(progress_bar)
sys.stdout.flush()
print('')
except KeyboardInterrupt:
pool.terminate()
else:
pool.close()
finally:
pool.join()
return 0
if __name__ == '__main__':
main()
utils.py
# -*- coding: utf-8 -*-
import os
import six
import stat
from multiprocessing import Value
from multiprocessing.pool import Pool as _Pool
from multiprocessing.util import debug
try:
from os import scandir
except ImportError:
from scandir import scandir
__all__ = ['walk', 'Pool', 'lisfile']
def ensure_unicode(obj):
return obj if isinstance(obj, six.text_type) else obj.decode('utf8')
def ensure_bytes(obj):
if isinstance(obj, six.text_type):
return obj.encode('utf8')
elif isinstance(obj, bytearray):
return bytes(obj)
else:
return obj
def walk(top, onerror=None):
"""A simplified version of scandir.walk().
    It is equal to scandir.walk() with topdown=True, followlinks=False
"""
dirs = []
nondirs = []
try:
scandir_it = scandir(top)
except OSError as error:
if onerror is not None:
onerror(error)
return
while True:
try:
try:
entry = next(scandir_it)
except StopIteration:
break
except OSError as error:
if onerror is not None:
onerror(error)
return
try:
            is_dir = entry.is_dir(follow_symlinks=False)  # keyword-only in os.scandir
except OSError:
is_dir = False
if is_dir:
dirs.append(entry.name)
else:
nondirs.append(entry.name)
# Yield before recursion if going top down
yield top, dirs, nondirs
# Recurse into sub-directories
for name in dirs:
new_path = os.path.join(top, name)
for entry in walk(new_path, onerror):
yield entry
def lisfile(path):
    # check whether the given path is a file, without following symlinks
    mode = os.lstat(path).st_mode  # os.stat() would follow the link
if stat.S_ISLNK(mode):
return True
else:
return not stat.S_ISDIR(mode)
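# A lightly patched copy of the worker loop from multiprocessing.pool:
# identical to the stock implementation except that it passes the shared
# task_done counter to every task so the parent can report progress.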
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, task_done=None):
    assert maxtasks is None or (isinstance(maxtasks, six.integer_types) and maxtasks > 0)
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
inqueue._writer.close()
outqueue._reader.close()
if initializer is not None:
initializer(*initargs)
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
if task is None:
debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, task_done=task_done, **kwds))
except Exception as e:
result = (False, e)
try:
put((job, i, result))
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
task = job = result = func = args = kwds = None
completed += 1
debug('worker exiting after %d tasks' % completed)
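# A Pool subclass that hands each worker process a shared task_done counter
# and remembers the totals of workers that have exited, so the task_done
# property stays accurate across worker replacement.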
class PoolWithProgress(_Pool):
def __init__(self, *args, **kwargs):
self.task_summary = []
self.task_done_of_exited_workers = 0
super(PoolWithProgress, self).__init__(*args, **kwargs)
@property
def task_done(self):
return self.task_done_of_exited_workers + sum((obj.value for obj in self.task_summary))
def _join_exited_workers(self):
"""Cleanup after any worker processes which have exited due to reaching
their specified lifetime. Returns True if any workers were cleaned up.
"""
cleaned = False
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
self.task_done_of_exited_workers += self.task_summary[i].value
del self.task_summary[i]
return cleaned
def _repopulate_pool(self):
"""Bring the number of pool processes up to the specified number,
for use after reaping workers which have exited.
"""
for i in range(self._processes - len(self._pool)):
task_done = Value('L', 0)
w = self.Process(target=worker,
args=(self._inqueue, self._outqueue,
self._initializer, self._initargs,
self._maxtasksperchild, task_done)
)
self.task_summary.append(task_done)
self._pool.append(w)
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
debug('added worker')
class MaybeEncodingError(Exception):
"""Wraps possible unpickleable errors, so they can be
safely sent through the socket."""
def __init__(self, exc, value):
self.exc = repr(exc)
self.value = repr(value)
super(MaybeEncodingError, self).__init__(self.exc, self.value)
def __str__(self):
return "Error sending result: '%s'. Reason: '%s'" % (self.value,
self.exc)
def __repr__(self):
return "<MaybeEncodingError: %s>" % str(self)
def Pool(show_progress=True, *args, **kwargs):
    if show_progress:
        return PoolWithProgress(*args, **kwargs)
    else:
        # the name Pool now refers to this factory, so use the aliased import
        return _Pool(*args, **kwargs)
test.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import os
import six
import shutil
import unittest
import subprocess
try:
from itertools import izip_longest as zip_longest
except ImportError:
from itertools import zip_longest
from functools import wraps
TEST = os.path.normpath(os.path.join(__file__, os.path.pardir, 'test'))
SOURCE = os.path.join(TEST, 'source')
PS_DEST = os.path.join(TEST, 'parsync_dest')
RS_DEST = os.path.join(TEST, 'rsync_dest')
names = {
'file': {
'normal': 'file',
'chinese': '文本文件',
'space': ' regular file with spaces ',
'*': 'file*',
'|': 'file|',
'=': 'file=',
'?': 'file?',
'"': '"file"',
'unicode': 'fileåß∂ƒ©∆¬',
'spanish': 'fileáéíóú¿¡üñ',
},
'dir': {
'normal': 'dir',
'chinese': '文件夹',
'space': ' dir with spaces ',
'*': 'dir*',
'|': 'dir|',
'=': 'dir=',
'?': 'dir?',
'"': '"dir"',
'unicode': 'diråß∂ƒ©∆¬',
'spanish': 'diráéíóú¿¡üñ',
}
}
def to_bytes(obj):
if six.PY3:
return obj
if isinstance(obj, six.text_type):
return obj.encode('utf8')
elif isinstance(obj, bytearray):
return bytes(obj)
else:
return obj
def ensure_bytes(func):
@wraps(func)
def wrapper(*args, **kwargs):
new_args = (to_bytes(arg) for arg in args)
new_kwargs = {k: to_bytes(v) for k, v in kwargs.items()}
return func(*new_args, **new_kwargs)
return wrapper
@ensure_bytes
def touch(name, parent=os.path.curdir):
path = os.path.join(parent, name)
try:
open(path, 'w').close()
except Exception as e:
print('Create file [{}] failed, error:\n{}'.format(path, e))
@ensure_bytes
def create_files(parent=os.path.curdir):
for name in names['file'].values():
touch(name, parent=parent)
@ensure_bytes
def create_dirs(parent=os.path.curdir):
for name in names['dir'].values():
path = os.path.join(parent, name)
os.mkdir(path)
touch('example', path)
def create_soft_link():
cwd = os.getcwd()
os.chdir(os.path.join(os.path.dirname(__file__), SOURCE))
os.symlink('dir', 'link_to_dir')
os.symlink('file', 'link_to_file')
touch('source')
os.symlink('source', 'broken_link')
os.remove('source')
os.chdir(cwd)
class TestSyncSource(unittest.TestCase):
@classmethod
def setUpClass(cls):
if os.path.exists(TEST):
shutil.rmtree(TEST)
        # create source and destination dirs for testing
os.mkdir(TEST)
os.mkdir(SOURCE)
# create source files and dirs
create_files(SOURCE)
create_dirs(SOURCE)
create_soft_link()
@classmethod
def tearDownClass(cls):
shutil.rmtree(TEST)
def setUp(self):
os.mkdir(PS_DEST)
os.mkdir(RS_DEST)
def tearDown(self):
shutil.rmtree(RS_DEST)
shutil.rmtree(PS_DEST)
def base_test(self, source, parallel=None, bwlimit=None, no_progress=True):
if six.PY2:
cmd = ['python2', 'parsync.py', source, PS_DEST]
else:
cmd = ['python3', 'parsync.py', source, PS_DEST]
if no_progress:
cmd.insert(2, '--no-progress')
if parallel:
cmd.insert(2, '--parallel')
cmd.insert(3, str(parallel))
if bwlimit:
cmd.insert(2, '--bwlimit')
cmd.insert(3, str(bwlimit))
subprocess.check_call(cmd)
subprocess.check_call(['rsync', '-aqs', source, RS_DEST])
parsync_dir_tree = os.walk(PS_DEST)
rsync_dir_tree = os.walk(RS_DEST)
for ps, rs in zip_longest(parsync_dir_tree, rsync_dir_tree):
if ps and rs:
_, dirs1, files1 = ps
_, dirs2, files2 = rs
self.assertEqual(sorted(dirs1), sorted(dirs2))
self.assertEqual(sorted(files1), sorted(files2))
else:
self.assertEqual(ps, rs)
break
def test_sync_source_dir_with_progress(self):
self.base_test(SOURCE, no_progress=False)
def test_sync_files_under_source_dir(self):
self.base_test('{}/'.format(SOURCE))
def test_parsync_with_parallel(self):
self.base_test(SOURCE, parallel=8)
def test_parsync_with_parallel_1(self):
self.base_test(SOURCE, parallel=1)
def test_parsync_with_bwlimit(self):
self.base_test(SOURCE, bwlimit=10)
def test_sync_single_file(self):
self.base_test(os.path.join(SOURCE, names['file']['normal']))
def test_sync_file_with_Chinese_name(self):
self.base_test(os.path.join(SOURCE, names['file']['chinese']))
@unittest.expectedFailure
def test_source_not_exist(self):
self.base_test('not_exist')
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(TestSyncSource)
result = unittest.TextTestRunner(verbosity=2).run(suite)
exit(not result.wasSuccessful())