玩命加载中 . . .

Parsync


Overview

What is parsync?

Parsync is a script that tries to run multiple rsync command in parallel to speed up the sync progress.

Why not rsync?

  • rsync is a single process program, it cannot fully take the advantage of multi-cores CPU and high network band width. In short, it is slow.

  • rsync can be run with tools like find, parallel and xargs to run multiple rsync processes in parallel, e.g.

    find SOURCE_DIR -type f | parallel --will-cite -j 5 rsync -av {} DESTINATION
    

    or

    find SOURCE_DIR -type d | xargs -I {} rsync -a {} DESTINATION
    

    In the first example, parallel will run 5 rsync processes to sync files under SOURCE_DIR one by one. If those files are large files, it is fine. However, if those are small files like 4KB, rsync will still be very slow.

    In the second example, one rsync process will be started for each directory under SOURCE_DIR. Usually, all files are not evenly spread under these directories which means, at first, rsync does run in parallel. But, soon, most of rsync processes will finish their work and exit, only the 1 or 2 rsync process will still be running. Then, rsync will become slow again.

Usage

usage: parsync.py [-h] [--bwlimit BWLIMIT] [--id ID] [--parallel PARALLEL]
                  [--no-progress] [-n]
                  source destination ...

Run rsync in parallel. All parameters for parsync MUST be provided before
source as usage shows, any parameter after destination is directly passed to
rsync.

positional arguments:
  source               source of rsync, cannot be empty. DO NOT use syntax
                       like XXX/* for source, use XXX/ or "XXX/*" instead
  destination          destination of rsync, cannot be empty
  args                 additional arguments that will directly be send to
                       rsync

optional arguments:
  -h, --help           show this help message and exit
  --bwlimit BWLIMIT    limit I/O bandwidth; KBytes per second. Default: 0 KB/s
  --id ID              ID of backup task. It is the pid of current process by
                       default
  --parallel PARALLEL  run how many rsync processes in parallel
  --no-progress        do not show progress bar
  -n, --dry-run        perform a trial run with no changes made

P.S. If you encounter encoding error, add LC_ALL=en_US.UTF-8 before parsync, e.g.

LC_ALL=en_US.UTF-8 python parsync.py SOURCE DEST

Example Usage

Sync SOURCE to DEST

parsync.py SOURCE DEST

Sync files and dirs under SOURCE to DEST

parsync.py SOURCE/ DEST

or

parsync.py "SOURCE/*" DEST

The reason adding " around SOURCE/* is to avoid SOURCE/* being expanded into SOURCE/a SOURCE/b … by shell.

Run 100 rsync in parallel

parsync.py --parallel=100 SOURCE DEST

Run parsync with bandwidth limit 4000KB/s with 100 rsync in parallel

parsync.py --parallel=100 --bwlimit=4000 SOURCE DEST

or

parsync.py --parallel=100 SOURCE DEST --bwlimit=40

In the first example, the bwlimit is passed to parsync and parsync will calculate the bwlimit for each rsync process, in this case, 40 KB/s. In the second example, the bwlimit is directly passed to rsync without any check or change.

Test

Run python test.py to run unittest for parsync. If you encouter encoding error, add LC_ALL=en_US.UTF-8 in the beginning of the command, for example: LC_ALL=en_US.UTF-8 python test.py

Script content

parsync.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function, division, absolute_import

import os
import sys
from time import sleep
import argparse
from itertools import chain
from subprocess import check_output, CalledProcessError, check_call

try:
    from os import scandir
except ImportError:
    from scandir import scandir

from utils import walk, Pool, lisfile


total_paths = 0
total_tasks = 0
show_progress = True
GROUP_CNT = 10
PATHS = '/tmp/parsync_paths.{}'
paths_file = None
TASKS = '/tmp/parsync_tasks.{}'
tasks_file = None

# this value is used to calculate how long should a progress bar be.
# screen width in IPMI is 100, this value should be smaller than 100,
# so that the progress bar will not be longer than one line
SCREEN_WIDTH = 100


def set_screen_width():
    try:
        output = check_output(['stty', 'size'])
    except CalledProcessError:
        pass    # use default value 30
    else:
        global SCREEN_WIDTH
        try:
            SCREEN_WIDTH = int(output.split()[1])
        except:
            pass


def generate_paths(source='.'):
    if show_progress:
        dot_num = 0
        path_count = 0
        max_dots = SCREEN_WIDTH - len('Generate Paths ')
        sys.stdout.write('Generate Paths ' + ' ' * max_dots)

    with open(paths_file, 'wb', buffering=4194304) as writer:
        if lisfile(source):
            writer.write(source + '\n')
        else:
            for root, dirs, files in walk(source):
                if root != os.path.curdir:  # skip '.'
                    writer.write(root + os.path.sep + '\n')
                for f in files:
                    # DO NOT use str.format() here, it may raise encoding error
                    writer.write(os.path.join(root, f) + '\n')

                if show_progress:
                    path_count += 1
                    if path_count == 1000:
                        if dot_num == max_dots:
                            dot_num = 0
                        sys.stdout.write('\b' * max_dots)
                        dot_num += 1
                        sys.stdout.write('.' * dot_num + ' ' * (max_dots - dot_num))
                        sys.stdout.flush()
                        path_count = 0
    if show_progress:
        print('')


def generate_groups():
    """
    I've tried count line number of a file with 13169031 lines
    1. read file line by line, until the end                            avg 2.3067 s
    2. read N MB file into buffer, count number of '\n', until the end  avg 1.8454 s
    3. open file and load it into mmap(memory map), count line by line  avg 3.0107 s
    4. use "wc -l" command                                              avg 1.0206 s

    "wc -l" is the fastest way to count line number of a file..
    """
    try:
        output = check_output(['wc', '-l', '{}'.format(paths_file)])
    except CalledProcessError as e:
        print('wc failed to get line count, error:\n{}'.format(e))
        raise

    global total_paths
    total_paths = int(output.split()[0])
    line_per_group = (total_paths + GROUP_CNT - 1) // GROUP_CNT

    for group in range(GROUP_CNT):
        begin = line_per_group * group
        end = begin + line_per_group
        yield begin, end


def _generate_tasks_with_progress(group_paths, task_done=None):
    tasks = []
    top_dir = None
    for path in group_paths:
        # remove tailing '\n', do not use str.strip(), because path may end with spaces
        path = path[:-1]
        if path.endswith(os.path.sep):  # is a dir
            # Because rsync is used with '-R', so it's unnecessary to remove tailing '/'
            # path = path[:-1]    # remove tailing os.path.sep
            if top_dir and path.startswith(top_dir):
                task_done.value += 1
                continue

            if group_paths[-1].startswith(path):
                # group may not contain path's all subfolder and files
                if scandir(path):   # path is not an empty folder
                    task_done.value += 1
                    # it will be synced when syncing folders and files it contains
                    continue
                else:               # path is an empty folder
                    task_done.value += 1
                    tasks.append(path)
            else:
                task_done.value += 1
                # all path's subfolders and files are in group
                top_dir = path
                tasks.append(path)
        else:   # not a dir
            if top_dir and path.startswith(top_dir):
                task_done.value += 1
                # path is under top_dir, ignore it
                # it will be synced, when top_dir is synced
                continue
            else:
                task_done.value += 1
                tasks.append(path)
    return tasks


def _generate_tasks_without_progress(group_paths, task_done=None):
    tasks = []
    top_dir = None
    for path in group_paths:
        # remove tailing '\n', do not use str.strip(), because path may end with spaces
        path = path[:-1]
        if path.endswith(os.path.sep):  # is a dir
            # Because rsync is used with '-R', so it's unnecessary to remove tailing '/'
            # path = path[:-1]    # remove tailing '/'
            if top_dir and path.startswith(top_dir):
                continue

            if group_paths[-1].startswith(path):
                # group may not contain path's all subfolder and files
                if scandir(path):   # path is not an empty folder
                    # it will be synced when syncing folders and files it contains
                    continue
                else:               # path is an empty folder
                    tasks.append(path)
            else:
                # all path's subfolders and files are in group
                top_dir = path
                tasks.append(path)
        else:   # not a dir
            if top_dir and path.startswith(top_dir):
                # path is under top_dir, ignore it
                # it will be synced, when top_dir is synced
                continue
            else:
                tasks.append(path)
    return tasks


def generate_tasks(*args, **kwargs):
    if show_progress:
        return _generate_tasks_with_progress(*args, **kwargs)
    else:
        return _generate_tasks_without_progress(*args, **kwargs)


def do_rsync(source, destination, args=None, bwlimit=None, task_done=None):
    if source:
        cmd = ['rsync', '-avqsR', '{}'.format(source), '{}'.format(destination)]
        cmd.extend(args)
        if bwlimit:
            cmd.append('--bwlimit={}'.format(bwlimit))

        try:
            check_call(cmd)
        except CalledProcessError as e:
            print('rsync failed, error:\n{}'.format(e))

        # this task is done, no matter if it is successful or not
        if show_progress:
            task_done.value += 1


def main():
    parser = argparse.ArgumentParser(
        description='Run rsync in parallel. '
                    'All parameters for parsync MUST be provided before source as usage shows, '
                    'any parameter after destination is directly passed to rsync.'
    )
    parser.add_argument('source',
                        help='source of rsync, cannot be empty. DO NOT use syntax '
                             'like XXX/* for source, use XXX/ or "XXX/*" instead')
    parser.add_argument('destination', help='destination of rsync, cannot be empty')
    parser.add_argument('--bwlimit', type=int, default=0,
                        help='limit I/O bandwidth; KBytes per second. Default: 0 KB/s')
    parser.add_argument('--id', default=str(os.getpid()),
                        help='ID of backup task. It is the pid of current process by default')
    parser.add_argument('--parallel', type=int, default=4,
                        help='run how many rsync processes in parallel')
    parser.add_argument('--no-progress', action='store_true', help='do not show progress bar')
    parser.add_argument('-n', '--dry-run', action='store_true',
                        help='perform a trial run with no changes made')
    parser.add_argument('args', nargs=argparse.REMAINDER,
                        help="additional arguments that will directly be send to rsync")

    options = parser.parse_args()

    # do not use strip(), source and destination may contain leading or tailing spaces
    source = options.source
    destination = os.path.abspath(options.destination)
    tid = options.id
    assert options.parallel > 0, 'parallel parameter must be greater than 0'
    # bwlimit parameter for rsync must be an integer
    bwlimit = int(options.bwlimit / options.parallel) or None
    args = options.args

    global show_progress, paths_file, tasks_file, GROUP_CNT
    show_progress = not options.no_progress
    paths_file = PATHS.format(tid)
    tasks_file = TASKS.format(tid)
    if options.parallel < 10:
        GROUP_CNT = 10
    elif options.parallel > 100:
        GROUP_CNT = 100
    else:
        GROUP_CNT = options.parallel

    if show_progress:
        set_screen_width()

    # remove tailing '*'
    source = source[:-1] if source.endswith('*') else source

    # sync root folder of source to destination or not
    sync_root = not source.endswith(os.path.sep)

    # remove tailing '/', e.g. 'XXX/YYY/' ==> 'XXX/YYY'
    source = os.path.normpath(source)
    # AAA/BBB/CCC ==> AAA/BBB, CCC
    base_dir, source = os.path.split(source)
    if base_dir:
        os.chdir(base_dir)

    if not sync_root:
        os.chdir(source)
        source = os.path.curdir

    if not os.path.exists(paths_file):
        generate_paths(source)

    pool = Pool(processes=options.parallel, show_progress=show_progress)
    if not os.path.exists(tasks_file):
        with open(paths_file, 'rb') as reader:
            paths = reader.readlines()

        result = []
        for start, end in generate_groups():
            result.append(pool.apply_async(generate_tasks, (paths[start:end],)))

        if show_progress:
            max_dots = SCREEN_WIDTH - len('Generating Tasks ') - len(' 100.0%')
            while not all([r.ready() for r in result]):
                try:
                    current_progress = pool.task_done / total_paths
                except ZeroDivisionError:
                    break
                if current_progress > 1:
                    current_progress = 1
                dot_num = int(current_progress * max_dots)
                percentage = '{:>6.1f}%'.format(current_progress * 100)
                progress_bar = 'Generating Tasks ' + '.' * dot_num + ' ' * (max_dots - dot_num) + percentage
                sys.stdout.write('\b' * SCREEN_WIDTH)
                sys.stdout.write(progress_bar)
                sys.stdout.flush()
                sleep(0.1)
            else:
                progress_bar = 'Generating Tasks ' + '.' * max_dots + ' 100.0%'
                sys.stdout.write('\b' * SCREEN_WIDTH)
                sys.stdout.write(progress_bar)
                sys.stdout.flush()

            print('')

        tasks = chain.from_iterable([r.get() for r in result])
        global total_tasks
        with open(tasks_file, 'wb', buffering=4194304) as writer:
            for t in tasks:
                writer.write(t + '\n')
                total_tasks += 1

    if not options.dry_run:
        pool = Pool(processes=options.parallel, show_progress=show_progress)
        try:
            results = []
            with open(tasks_file, 'rb') as reader:
                for task in reader:
                    # task[:-1] is to remove tailing '\n'
                    result = pool.apply_async(do_rsync, (task[:-1], destination, args, bwlimit))
                    results.append(result)

            if show_progress:
                max_dots = SCREEN_WIDTH - len('Syncing ') - len(' 100.0%')
                while results:
                    current_progress = pool.task_done / total_tasks
                    if current_progress > 1:
                        current_progress = 1
                    dot_num = int(current_progress * max_dots)
                    percentage = '{:>6.1f}%'.format(current_progress * 100)
                    progress_bar = 'Syncing ' + '.' * dot_num + ' ' * (max_dots - dot_num) + percentage
                    sys.stdout.write('\b' * SCREEN_WIDTH)
                    sys.stdout.write(progress_bar)
                    sys.stdout.flush()
                    sleep(0.1)

                    results = filter(lambda x: not x.ready(), results)
                else:
                    progress_bar = 'Syncing ' + '.' * max_dots + ' 100.0%'
                    sys.stdout.write('\b' * SCREEN_WIDTH)
                    sys.stdout.write(progress_bar)
                    sys.stdout.flush()
                print('')

        except KeyboardInterrupt:
            pool.terminate()
        else:
            pool.close()
        finally:
            pool.join()

    return 0


if __name__ == '__main__':
    main()

utils.py

# -*- coding: utf-8 -*-

import os
import six
import stat
from multiprocessing import Value
from multiprocessing.pool import Pool
from multiprocessing.util import debug

try:
    from os import scandir
except ImportError:
    from scandir import scandir


__all__ = ['walk', 'Pool', 'lisfile']


def ensure_unicode(obj):
    return obj if isinstance(obj, six.text_type) else obj.decode('utf8')


def ensure_bytes(obj):
    if isinstance(obj, six.text_type):
        return obj.encode('utf8')
    elif isinstance(obj, bytearray):
        return bytes(obj)
    else:
        return obj


def walk(top, onerror=None):
    """A simplified version of scandir.walk().
    It is equals to scandir.walk() with topdown=True, followlinks=False
    """
    dirs = []
    nondirs = []

    try:
        scandir_it = scandir(top)
    except OSError as error:
        if onerror is not None:
            onerror(error)
        return

    while True:
        try:
            try:
                entry = next(scandir_it)
            except StopIteration:
                break
        except OSError as error:
            if onerror is not None:
                onerror(error)
            return

        try:
            is_dir = entry.is_dir(False)
        except OSError:
            is_dir = False

        if is_dir:
            dirs.append(entry.name)
        else:
            nondirs.append(entry.name)

    # Yield before recursion if going top down
    yield top, dirs, nondirs

    # Recurse into sub-directories
    for name in dirs:
        new_path = os.path.join(top, name)
        for entry in walk(new_path, onerror):
            yield entry


def lisfile(path):
    # check if a given path is file or not(do not follow symlinks)
    mode = os.stat(path).st_mode
    if stat.S_ISLNK(mode):
        return True
    else:
        return not stat.S_ISDIR(mode)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, task_done=None):
    assert maxtasks is None or (type(maxtasks) in (six.integer_types,) and maxtasks > 0)
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, IOError):
            debug('worker got EOFError or IOError -- exiting')
            break

        if task is None:
            debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, task_done=task_done, **kwds))
        except Exception as e:
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            wrapped = MaybeEncodingError(e, result[1])
            debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        task = job = result = func = args = kwds = None
        completed += 1
    debug('worker exiting after %d tasks' % completed)


class PoolWithProgress(Pool):
    def __init__(self, *args, **kwargs):
        self.task_summary = []
        self.task_done_of_exited_workers = 0
        super(PoolWithProgress, self).__init__(*args, **kwargs)

    @property
    def task_done(self):
        return self.task_done_of_exited_workers + sum((obj.value for obj in self.task_summary))

    def _join_exited_workers(self):
        """Cleanup after any worker processes which have exited due to reaching
        their specified lifetime.  Returns True if any workers were cleaned up.
        """
        cleaned = False
        for i in reversed(range(len(self._pool))):
            worker = self._pool[i]
            if worker.exitcode is not None:
                # worker exited
                debug('cleaning up worker %d' % i)
                worker.join()
                cleaned = True
                del self._pool[i]
                self.task_done_of_exited_workers += self.task_summary[i].value
                del self.task_summary[i]
        return cleaned

    def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            task_done = Value('L', 0)
            w = self.Process(target=worker,
                             args=(self._inqueue, self._outqueue,
                                   self._initializer, self._initargs,
                                   self._maxtasksperchild, task_done)
                             )
            self.task_summary.append(task_done)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            debug('added worker')


class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket."""

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super(MaybeEncodingError, self).__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<MaybeEncodingError: %s>" % str(self)


def Pool(show_progress=True, *args, **kwargs):
    if show_progress:
        return PoolWithProgress(*args, **kwargs)
    else:
        return Pool(*args, **kwargs)

test.py

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function

import os
import six
import shutil
import unittest
import subprocess

try:
    from itertools import izip_longest as zip_longest
except ImportError:
    from itertools import zip_longest
from functools import wraps


TEST = os.path.normpath(os.path.join(__file__, os.path.pardir, 'test'))
SOURCE = os.path.join(TEST, 'source')
PS_DEST = os.path.join(TEST, 'parsync_dest')
RS_DEST = os.path.join(TEST, 'rsync_dest')

names = {
    'file': {
        'normal': 'file',
        'chinese': '文本文件',
        'space': ' regular file with spaces ',
        '*': 'file*',
        '|': 'file|',
        '=': 'file=',
        '?': 'file?',
        '"': '"file"',
        'unicode': 'fileåß∂ƒ©∆¬',
        'spanish': 'fileáéíóú¿¡üñ',
    },
    'dir': {
        'normal': 'dir',
        'chinese': '文件夹',
        'space': ' dir with spaces ',
        '*': 'dir*',
        '|': 'dir|',
        '=': 'dir=',
        '?': 'dir?',
        '"': '"dir"',
        'unicode': 'diråß∂ƒ©∆¬',
        'spanish': 'diráéíóú¿¡üñ',
    }
}


def to_bytes(obj):
    if six.PY3:
        return obj

    if isinstance(obj, six.text_type):
        return obj.encode('utf8')
    elif isinstance(obj, bytearray):
        return bytes(obj)
    else:
        return obj


def ensure_bytes(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        new_args = (to_bytes(arg) for arg in args)
        new_kwargs = {k: to_bytes(v) for k, v in kwargs.items()}
        return func(*new_args, **new_kwargs)
    return wrapper


@ensure_bytes
def touch(name, parent=os.path.curdir):
    path = os.path.join(parent, name)
    try:
        open(path, 'w').close()
    except Exception as e:
        print('Create file [{}] failed, error:\n{}'.format(path, e))


@ensure_bytes
def create_files(parent=os.path.curdir):
    for name in names['file'].values():
        touch(name, parent=parent)


@ensure_bytes
def create_dirs(parent=os.path.curdir):
    for name in names['dir'].values():
        path = os.path.join(parent, name)
        os.mkdir(path)
        touch('example', path)


def create_soft_link():
    cwd = os.getcwd()
    os.chdir(os.path.join(os.path.dirname(__file__), SOURCE))
    os.symlink('dir', 'link_to_dir')
    os.symlink('file', 'link_to_file')

    touch('source')
    os.symlink('source', 'broken_link')
    os.remove('source')

    os.chdir(cwd)


class TestSyncSource(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        if os.path.exists(TEST):
            shutil.rmtree(TEST)

        # create souce and destination dirs for testing
        os.mkdir(TEST)
        os.mkdir(SOURCE)

        # create source files and dirs
        create_files(SOURCE)
        create_dirs(SOURCE)
        create_soft_link()

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(TEST)

    def setUp(self):
        os.mkdir(PS_DEST)
        os.mkdir(RS_DEST)

    def tearDown(self):
        shutil.rmtree(RS_DEST)
        shutil.rmtree(PS_DEST)

    def base_test(self, source, parallel=None, bwlimit=None, no_progress=True):
        if six.PY2:
            cmd = ['python2', 'parsync.py', source, PS_DEST]
        else:
            cmd = ['python3', 'parsync.py', source, PS_DEST]
        if no_progress:
            cmd.insert(2, '--no-progress')
        if parallel:
            cmd.insert(2, '--parallel')
            cmd.insert(3, str(parallel))
        if bwlimit:
            cmd.insert(2, '--bwlimit')
            cmd.insert(3, str(bwlimit))

        subprocess.check_call(cmd)
        subprocess.check_call(['rsync', '-aqs', source, RS_DEST])

        parsync_dir_tree = os.walk(PS_DEST)
        rsync_dir_tree = os.walk(RS_DEST)

        for ps, rs in zip_longest(parsync_dir_tree, rsync_dir_tree):
            if ps and rs:
                _, dirs1, files1 = ps
                _, dirs2, files2 = rs
                self.assertEqual(sorted(dirs1), sorted(dirs2))
                self.assertEqual(sorted(files1), sorted(files2))
            else:
                self.assertEqual(ps, rs)
                break

    def test_sync_source_dir_with_progress(self):
        self.base_test(SOURCE, no_progress=False)

    def test_sync_files_under_source_dir(self):
        self.base_test('{}/'.format(SOURCE))

    def test_parsync_with_parallel(self):
        self.base_test(SOURCE, parallel=8)

    def test_parsync_with_parallel_1(self):
        self.base_test(SOURCE, parallel=1)

    def test_parsync_with_bwlimit(self):
        self.base_test(SOURCE, bwlimit=10)

    def test_sync_single_file(self):
        self.base_test(os.path.join(SOURCE, names['file']['normal']))

    def test_sync_file_with_Chinese_name(self):
        self.base_test(os.path.join(SOURCE, names['file']['chinese']))

    @unittest.expectedFailure
    def test_source_not_exist(self):
        self.base_test('not_exist')


if __name__ == '__main__':
    suite = unittest.TestLoader().loadTestsFromTestCase(TestSyncSource)
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    exit(not result.wasSuccessful())


文章作者: Gavin Wang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Gavin Wang !
  目录