玩命加载中 . . .

Demo for trasferring large file from break point


Overview

Daemon of python script tools to transfer larger of S3 Objects

File list

  • bigtera_download_file.py for downloading file from S3 from breakpoint

  • bigtera_multipart_upload.py for uploading file to S3 from breakpoint

  • obsync.py for RRS to S3 from breakpoint

Usage

Setup environment

  1. Install Python2.7

  2. Install library

Run the following command in command line

pip install boto
pip install filechunkio
  1. Create S3 account and bucket in cluster

Configuration

Change configuration in the head part of bigtera_download_file.py and bigtera_multipart_upload.py

Test downloading file from breakpoint

  1. Run python bigtera_download_file.py in command line

  2. CTRL+C to stop downloading process

  3. Run python bigtera_download_file.py again

  4. Check downloading status

Test uploading file from breakpoint

  1. Run python bigtera_multipart_upload.py in command line

  2. CTRL+C to stop uploading process

  3. Run python bigtera_multipart_upload.py again

  4. Check uploading status

Content of Scripts

bigtera_download_file.py

#!/usr/bin/env python

from boto.s3.connection import S3Connection
from boto.s3.connection import OrdinaryCallingFormat
import os
from StringIO import StringIO
import configparser


HOST = '172.17.59.72'
S3_AK = 'JF06KCDJIAMO8Q3OJQAS'
S3_AS = 'gHnrKj1Vlb6s9IQZRrMDywhTLeNBL2UUMCGeetsf'
S3_BUCKET = 'andy_bucket'
S3_KEY = 'ubuntu-16.04-server-amd64.iso'
LOCAL_FILE = 'E:\\code\\s3_multipart_upload\\localfile.iso'

# Use a chunk size of 10 MiB (feel free to change this, >=5MB)
CHUNK_SIZE = 1024 * 1024 * 10


def create_s3_connection():
    c = S3Connection(calling_format=OrdinaryCallingFormat(),
                     host=HOST, is_secure=False,
                     aws_access_key_id=S3_AK,
                     aws_secret_access_key=S3_AS,
                     validate_certs=False)
    return c


def get_obj_io_ctx(s3_key_obj=None, offset=None, length=None):
    if s3_key_obj is None:
        return None
    io_ctx = StringIO()
    headers = None
    if offset is not None and length is not None:
        headers = {
            "Range": "bytes={}-{}".format(offset, offset+length-1)
        }
    s3_key_obj.get_contents_to_file(io_ctx, headers)
    io_ctx.seek(0)
    return io_ctx


def main():
    c = create_s3_connection()
    b = c.get_bucket(S3_BUCKET)

    key = S3_KEY
    config_file = '.' + os.path.basename(LOCAL_FILE) + '.conf'

    s3_key_obj = b.get_key(key)
    if s3_key_obj is None:
        print(key + " does't exists!")
        exit(1)

    total_file_size = s3_key_obj.size
    remaining = total_file_size

    part_size = CHUNK_SIZE
    part_num = 0

    config = configparser.ConfigParser()
    config.read(config_file)

    stored_total_file_size = config.getint("bigtera_config", "total_file_size", fallback=0)
    stored_chunk_size = config.getint("bigtera_config", "chunk_size", fallback=0)
    stored_downloaded_size = config.getint("bigtera_config", "downloaded_size", fallback=0)

    seek_first = False
    if stored_total_file_size == total_file_size and stored_chunk_size == CHUNK_SIZE:
        remaining = stored_total_file_size - stored_downloaded_size
        part_num = stored_downloaded_size / stored_chunk_size
        open_option = "r+b"
        seek_first = True
    else:
        open_option = "wb"

    with open(LOCAL_FILE, open_option) as f:
        while remaining > 0:
            offset = part_num * part_size
            length = min(remaining, part_size)
            s3_obj_io_ctx = get_obj_io_ctx(s3_key_obj, offset, length)
            if seek_first:
                f.seek(offset)
                seek_first = False
            f.write(s3_obj_io_ctx.read())
            remaining = remaining - length

            downloaded_size = offset + length
            percentage = int(downloaded_size * 100 / total_file_size)
            if percentage > 100:
                percentage = 100
            print(str(downloaded_size) + '/' + str(total_file_size) + '  ' + str(percentage) + '%')

            part_num += 1
            with open(config_file, 'w') as configfile:
                config['bigtera_config'] = {}
                config['bigtera_config']['total_file_size'] = str(total_file_size)
                config['bigtera_config']['chunk_size'] = str(CHUNK_SIZE)
                config['bigtera_config']['downloaded_size'] = str(downloaded_size)

                config.write(configfile)

            if total_file_size == downloaded_size:
                os.remove(config_file)


if __name__ == '__main__':
    main()

bigtera_multipart_upload.py

#!/usr/bin/env python

from boto.s3.connection import S3Connection
from boto.s3.connection import OrdinaryCallingFormat
from boto.s3.multipart import MultiPartUpload
import math
import os
from filechunkio import FileChunkIO
import hashlib
# import time


HOST = '172.17.59.72'
S3_AK = 'JF06KCDJIAMO8Q3OJQAS'
S3_AS = 'gHnrKj1Vlb6s9IQZRrMDywhTLeNBL2UUMCGeetsf'
S3_BUCKET = 'andy_bucket'

LARGE_FILE = 'E:\\code\\s3_multipart_upload\\ubuntu-16.04-server-amd64.iso'

# Use a chunk size of 10 MiB (feel free to change this, >=5MB)
CHUNK_SIZE = 1024 * 1024 * 10


def create_s3_connection():
    c = S3Connection(calling_format=OrdinaryCallingFormat(),
                     host=HOST, is_secure=False,
                     aws_access_key_id=S3_AK,
                     aws_secret_access_key=S3_AS,
                     validate_certs=False)
    return c


def query_pending_parts(b, key):
    response_all_multipart_upload = []

    # print('Current pending parts in S3:')
    multipart_uploads_filter = {'key_marker': key}
    all_multipart_uploads = b.get_all_multipart_uploads(**multipart_uploads_filter)
    for one_multipart_upload in all_multipart_uploads:
        response_multipart_upload = {'key': one_multipart_upload.key_name,
                                     'upload_id': one_multipart_upload.id,
                                     'parts': []
                                     }
        mp = MultiPartUpload(b)
        mp.key_name = one_multipart_upload.key_name
        mp.bucket_name = b.name
        mp.id = one_multipart_upload.id
        all_parts = mp.get_all_parts()
        one_multipart_upload_len = len(all_parts)
        print(' upload_id:' + one_multipart_upload.id + '   key:' + one_multipart_upload.key_name +
              '        parts:' + str(one_multipart_upload_len))
        # print('one_multipart_upload length:' + str(one_multipart_upload_len))
        for one_parts in all_parts:
            response_part = {
                'part_number': one_parts.part_number,
                'etag': one_parts.etag.replace('"', ''),
                'size': one_parts.size
            }
            response_multipart_upload['parts'].append(response_part)
        response_all_multipart_upload.append(response_multipart_upload)

    return response_all_multipart_upload


def upload_left(b, full_file_path, key, pending_multipart_upload, chunk_size):

    source_size = os.stat(full_file_path).st_size

    pending_upload_id = pending_multipart_upload['upload_id']
    # pending_key = pending_multipart_upload['key']

    mp = MultiPartUpload(b)
    mp.key_name = key
    mp.bucket_name = b.name
    mp.id = pending_upload_id

    # Use a chunk size of 10 MiB (feel free to change this)
    # chunk_size = 1024 * 1024 * 10
    # chunk_size = CHUNK_SIZE
    chunk_count = int(math.ceil(source_size / float(chunk_size)))

    # Send the file parts, using FileChunkIO to create a file-like object
    # that points to a certain byte range within the original file. We
    # set bytes to never exceed the original file size.
    for i in range(chunk_count):
        part_num = i + 1
        offset = chunk_size * i
        upload_bytes = min(chunk_size, source_size - offset)

        with FileChunkIO(full_file_path, 'r', offset=offset, bytes=upload_bytes) as fp:
            m = hashlib.md5()
            m.update(fp.readall())
            chunk_md5 = unicode(m.hexdigest())
            found_part = False
            for one_part in pending_multipart_upload['parts']:
                if part_num == one_part['part_number'] and \
                        upload_bytes == one_part['size'] and \
                        chunk_md5 == one_part['etag']:
                    # print(one_part)
                    found_part = True
                    break
            if found_part is True:
                # pass
                continue
            fp.seek(0)
            mp.upload_part_from_file(fp, part_num=part_num)
            print('%d/%d uploaded' % (part_num, chunk_count))

    try:
        # Finish the upload
        return mp.complete_upload()
    except Exception as e:
        mp.cancel_upload()
        raise e


def upload_large_file(b, full_file_path, key, chunk_size):
    # 100MB
    large_file_threshold = 100 * 1024 * 1024

    if chunk_size < 5 * 1024 * 1024:
        chunk_size = 5 * 1024 * 1024

    all_pending_multipart_uploads = query_pending_parts(b, key)

    # Get file info
    source_size = os.stat(full_file_path).st_size

    pending_multipart_upload = None
    for one_pending_multipart_upload in all_pending_multipart_uploads:
        # In real product,we need check file-key-upload_id relation
        if key == one_pending_multipart_upload['key']:
            pending_multipart_upload = one_pending_multipart_upload
            break

    if source_size >= large_file_threshold and pending_multipart_upload is not None:
        print('keep uploading!')
        return upload_left(b, full_file_path, key, pending_multipart_upload, chunk_size)
    else:
        print('No pending upload exits! Upload_from_beginning!')
        return upload_from_beginning(b, full_file_path, key, chunk_size)


def upload_from_beginning(b, full_file_path, key, chunk_size):

    source_size = os.stat(full_file_path).st_size

    # Create a multipart upload request
    mp = b.initiate_multipart_upload(key)
    # keep file-key-upload_id relation for production use
    print('Uploading file:' + full_file_path)
    print('Key in S3:' + key)
    print('upload_id:' + mp.id)
    print('--------------------')

    # Use a chunk size of 10 MiB (feel free to change this)
    # chunk_size = 1024 * 1024 * 10
    # chunk_size = CHUNK_SIZE
    chunk_count = int(math.ceil(source_size / float(chunk_size)))

    # Send the file parts, using FileChunkIO to create a file-like object
    # that points to a certain byte range within the original file. We
    # set bytes to never exceed the original file size.
    for i in range(chunk_count):
        part_num = i + 1
        offset = chunk_size * i
        upload_bytes = min(chunk_size, source_size - offset)
        with FileChunkIO(full_file_path, 'rb', offset=offset, bytes=upload_bytes) as fp:
            mp.upload_part_from_file(fp, part_num=part_num)

        print('%d/%d uploaded' % (part_num, chunk_count))
        # print('sleeping 1 second ...')
        # time.sleep(1)
    return mp.complete_upload()


def main():
    c = create_s3_connection()
    b = c.get_bucket(S3_BUCKET)

    full_file_path = LARGE_FILE
    key = os.path.basename(full_file_path)

    upload_large_file(b, full_file_path, key, CHUNK_SIZE)


if __name__ == '__main__':
    main()

obsync.py

"""
obsync.py: common library for ezobsync and s3backup
"""
from boto.exception import S3ResponseError
from boto.s3.connection import OrdinaryCallingFormat, SubdomainCallingFormat
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.s3.multipart import MultiPartUpload
from lxml import etree
from ezs3.log import EZLog
import boto
import errno
import hashlib
import os
from StringIO import StringIO
import tempfile
import time
import traceback
import xattr
import shutil

try:
    import cloudfiles
except ImportError:
    cloudfiles = None

logger = EZLog.get_logger(__name__)

ACL_XATTR = "rados.acl"
META_XATTR_PREFIX = "rados.meta."
BREAK_POINT_META = "break_point"
CONTENT_TYPE_XATTR = "rados.content_type"
# for FileStore xattr
SRC_MULTIPART_ETAG_XATTR = "rados.meta.srcetag"
MD5_XATTR = "rados.md5"
MTIME_XATTR = "rados.mtime"

# It is also in-memory chunk size; don't set it too large!
MULTIPART_THRESH = 10485760
# For S3 backup from breakpoint,100MB
BREAKPOINT_THRESHOLD = 100 * 1024 * 1024


class ObsyncException(Exception):
    def __init__(self, ty, e):
        if (isinstance(e, str)):
            # from a string
            self.tb = "".join(traceback.format_stack())
            self.comment = e
        else:
            # from another exception
            self.tb = traceback.format_exc(100000)
            self.comment = None
        self.ty = ty


class ObsyncTemporaryException(ObsyncException):
    """
    A temporary obsync exception.
    The user may want to retry the operation that failed.
    We can create one of these from a string or from another exception.
    """
    def __init__(self, e):
        ObsyncException.__init__(self, "temporary", e)


class ObsyncPermanentException(ObsyncException):
    """
    A permanent obsync exception.
    We can create one of these from a string or from another exception.
    """
    def __init__(self, e):
        ObsyncException.__init__(self, "permanent", e)


def test_xattr_support(path):
    test_file = path + "/$TEST"
    f = open(test_file, 'w')
    f.close
    try:
        xattr.set(test_file, "test", "123", namespace=xattr.NS_USER)
        if xattr.get(test_file, "test", namespace=xattr.NS_USER) != "123":
            raise ObsyncPermanentException(
                "test_xattr_support: failed to set an xattr and "
                "read it back.")
    except IOError:
        exc = "You do not appear to have xattr support at {}".format(path)
        logger.error(exc)
        raise ObsyncPermanentException(exc)
    finally:
        os.unlink(test_file)


def xattr_is_metadata(k):
    # miscellaneous user-defined metadata
    if (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
        return True
    # content-type
    elif (k == CONTENT_TYPE_XATTR):
        return True
    return False


def mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError, exc:
        if exc.errno != errno.EEXIST:
            raise ObsyncTemporaryException(exc)
        if not os.path.isdir(path):
            raise ObsyncTemporaryException(exc)


def bytes_to_str(b):
    return ''.join(["%02x" % ord(x) for x in b]).strip()


def get_md5(f, block_size=2**20):
    mtime = os.stat(f.name).st_mtime
    try:
        stored_md5 = xattr.get(f.name, MD5_XATTR, namespace=xattr.NS_USER)
        stored_mtime = xattr.get(f.name, MTIME_XATTR, namespace=xattr.NS_USER)
        if str(mtime) == stored_mtime:
            return stored_md5
    except IOError as e:
        if e.errno != errno.ENODATA:
            raise

    md5 = hashlib.md5()
    while True:
        data = f.read(block_size)
        if not data:
            break
        md5.update(data)
    md5sum = md5.hexdigest()
    file_set_md5(f.name, md5sum, mtime)

    return md5sum


def file_set_md5(path, md5, mtime=None):
    if not mtime:
        mtime = str(os.stat(path).st_mtime)
    xattr.set(path, MD5_XATTR, md5, namespace=xattr.NS_USER)
    xattr.set(path, MTIME_XATTR, str(mtime), namespace=xattr.NS_USER)


def strip_prefix(prefix, s):
    if not (s[0:len(prefix)] == prefix):
        return None
    return s[len(prefix):]


def etag_to_md5(etag):
    if (etag[:1] == '"'):
        start = 1
    else:
        start = 0
    if (etag[-1:] == '"'):
        end = -1
    else:
        end = None
    return etag[start:end]


# Escaping functions.
#
# Valid names for local files are a little different than valid object
# names for S3. So these functions are needed to translate.
#
# Basically, in local names, every sequence starting with a dollar sign is
# reserved as a special escape sequence. If you want to create an S3 object
# with a dollar sign in the name, the local file should have a double dollar
# sign ($$).
#
# TODO: translate local files' control characters into escape sequences.
# Most S3 clients (boto included) cannot handle control characters in S3 object
# names.
# TODO: check for invalid utf-8 in local file names. Ideally, escape it, but
# if not, just reject the local file name. S3 object names must be valid
# utf-8.
#
# ----------        -----------
# In S3             Locally
# ----------        -----------
# foo/              foo$slash
#
# $money            $$money
#
# obj-with-acl      obj-with-acl
#                  .obj-with-acl$acl
def s3_name_to_local_name(s3_name):
    # Now we treat an object which names end with slash as a local folder
    return s3_name

def local_name_to_s3_name(local_name):
    # Now we treat a local folder as a prefix object
    return local_name

###### ACLs #######
READ = 1
WRITE = 2
READ_ACP = 4
WRITE_ACP = 8
FULL_CONTROL = READ | WRITE | READ_ACP | WRITE_ACP

PERM_MAP = {
    "FULL_CONTROL": FULL_CONTROL,
    "READ": READ,
    "WRITE": WRITE,
    "READ_ACP": READ_ACP,
    "WRITE_ACP": WRITE_ACP
}


def perm_to_str_list(perm):
    perms = []
    if perm == FULL_CONTROL:
        perms.append("FULL_CONTROL")
    elif (perm & READ):
        perms.append("READ")
    elif (perm & WRITE):
        perms.append("WRITE")
    elif (perm & READ_ACP):
        perms.append("READ_ACP")
    elif (perm & WRITE_ACP):
        perms.append("WRITE_ACP")
    return perms


ACL_TYPE_CANON_USER = "canon:"
ACL_TYPE_EMAIL_USER = "email:"
ACL_TYPE_GROUP = "group:"
ALL_ACL_TYPES = [ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP]

S3_GROUP_AUTH_USERS = ACL_TYPE_GROUP + "AuthenticatedUsers"
S3_GROUP_ALL_USERS = ACL_TYPE_GROUP + "AllUsers"
S3_GROUP_LOG_DELIVERY = ACL_TYPE_GROUP + "LogDelivery"

NS = "http://s3.amazonaws.com/doc/2006-03-01/"
NS2 = "http://www.w3.org/2001/XMLSchema-instance"


def get_user_type(utype):
    for ut in [ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP]:
        if utype[:len(ut)] == ut:
            return ut
    raise ObsyncPermanentException("unknown user type for user %s" % utype)


def strip_user_type(utype):
    for ut in [ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP]:
        if utype[:len(ut)] == ut:
            return utype[len(ut):]
    raise ObsyncPermanentException("unknown user type for user %s" % utype)


def grantee_attribute_to_user_type(utype):
    if (utype == "Canonical User"):
        return ACL_TYPE_CANON_USER
    elif (utype == "CanonicalUser"):
        return ACL_TYPE_CANON_USER
    elif (utype == "Group"):
        return ACL_TYPE_GROUP
    elif (utype == "Email User"):
        return ACL_TYPE_EMAIL_USER
    elif (utype == "EmailUser"):
        return ACL_TYPE_EMAIL_USER
    else:
        raise ObsyncPermanentException("unknown user type for user %s" % utype)


def user_type_to_attr(t):
    if (t == ACL_TYPE_CANON_USER):
        return "CanonicalUser"
    elif (t == ACL_TYPE_GROUP):
        return "Group"
    elif (t == ACL_TYPE_EMAIL_USER):
        return "EmailUser"
    else:
        raise ObsyncPermanentException("unknown user type %s" % t)


def add_user_type(user):
    """
    All users that are not specifically marked as something else
    are treated as canonical users
    """
    for atype in ALL_ACL_TYPES:
        if (user[:len(atype)] == atype):
            return user
    return ACL_TYPE_CANON_USER + user


class AclGrant(object):
    def __init__(self, user_id, display_name, permission):
        self.user_id = user_id
        self.display_name = display_name
        self.permission = permission

    def translate_users(self, xusers):
        # Keep in mind that xusers contains user_ids of the form "type:value"
        # So typical contents might be like { canon:XYZ => canon.123 }
        if self.user_id in xusers:
            self.user_id = xusers[self.user_id]
            # It's not clear what the new pretty-name should be, so just leave it blank.
            self.display_name = None

    def equals(self, rhs):
        if self.user_id != rhs.user_id:
            return False
        if self.permission != rhs.permission:
            return False
        # ignore display_name
        return True


class AclPolicy(object):
    def __init__(self, owner_id, owner_display_name, grants):
        self.owner_id = owner_id
        self.owner_display_name = owner_display_name
        self.grants = grants  # dict of { string -> ACLGrant }

    @staticmethod
    def create_default(owner_id):
        grants = {}
        grants[ACL_TYPE_CANON_USER + owner_id] = \
            AclGrant(ACL_TYPE_CANON_USER + owner_id, None, FULL_CONTROL)
        return AclPolicy(owner_id, None, grants)

    @staticmethod
    def from_xml(s):
        root = etree.parse(StringIO(s))

        owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS, NS))
        owner_id = owner_id_node.text
        owner_display_name = root.find("{%s}Owner/{%s}DisplayName" % (NS, NS))
        if owner_display_name is not None:
            owner_display_name = owner_display_name.text

        grantlist = root.findall("{%s}AccessControlList/{%s}Grant" % (NS, NS))
        grants = {}

        for g in grantlist:
            grantee = g.find("{%s}Grantee" % NS)
            if "type" in grantee.attrib:
                grantee_type = grantee.attrib["type"]
            else:
                grantee_type = grantee.attrib["{%s}type" % NS2]

            user_type = grantee_attribute_to_user_type(grantee_type)

            if user_type == ACL_TYPE_CANON_USER:
                user_id = grantee.find("{%s}ID" % NS).text
                display_name = grantee.find("{%s}DisplayName" % NS)
                if display_name is not None:
                    display_name = display_name.text
            elif user_type == ACL_TYPE_EMAIL_USER:
                user_id = grantee.find("{%s}EmailAddress" % NS).text
                display_name = None
            elif user_type == ACL_TYPE_GROUP:
                user_id = grantee.find("{%s}URI" % NS).text
                display_name = None
            else:
                raise ObsyncPermanentException("unknown ACL grantee type")

            permission = g.find("{%s}Permission" % NS).text
            perm = PERM_MAP[permission]
            grant_user_id = user_type + user_id
            if grant_user_id not in grants:
                grants[grant_user_id] = AclGrant(grant_user_id, display_name, perm)
            else:
                grants[grant_user_id].permission |= perm

        return AclPolicy(owner_id, owner_display_name, grants)

    def to_xml(self):
        root = etree.Element("AccessControlPolicy", nsmap={None: NS})
        owner = etree.SubElement(root, "Owner")
        id_elem = etree.SubElement(owner, "ID")
        id_elem.text = self.owner_id
        if self.owner_display_name:
            display_name_elem = etree.SubElement(owner, "DisplayName")
            display_name_elem.text = self.owner_display_name

        access_control_list = etree.SubElement(root, "AccessControlList")
        for k, g in self.grants.items():
            perms = perm_to_str_list(g.permission)
            for perm in perms:
                grant_elem = etree.SubElement(access_control_list, "Grant")
                grantee_elem = etree.SubElement(
                    grant_elem, "{%s}Grantee" % NS, nsmap={None: NS, "xsi": NS2})

                user_type = get_user_type(g.user_id)
                grantee_elem.set("{%s}type" % NS2, user_type_to_attr(user_type))

                if user_type == ACL_TYPE_CANON_USER:
                    user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS)
                    user_id_elem.text = strip_user_type(g.user_id)
                    if g.display_name:
                        display_name_elem = etree.SubElement(
                            grantee_elem, "{%s}DisplayName" % NS)
                        display_name_elem.text = g.display_name
                elif user_type == ACL_TYPE_EMAIL_USER:
                    email_elem = etree.SubElement(
                        grantee_elem, "{%s}EmailAddress" % NS)
                    email_elem.text = strip_user_type(g.user_id)
                elif user_type == ACL_TYPE_GROUP:
                    group_elem = etree.SubElement(grantee_elem, "{%s}URI" % NS)
                    group_elem.text = strip_user_type(g.user_id)
                else:
                    raise ObsyncPermanentException("unknown ACL grantee type")

                permission_elem = etree.SubElement(grant_elem, "{%s}Permission" % NS)
                permission_elem.text = perm
        return etree.tostring(root, encoding="UTF-8")

    def translate_users(self, xusers):
        # Owner ids are always expressed in terms of canonical user id
        user = ACL_TYPE_CANON_USER + self.owner_id
        if (user in xusers):
            self.owner_id = strip_user_type(xusers[user])
            self.owner_display_name = ""

        for k, g in self.grants.items():
            g.translate_users(xusers)

    def get_all_users(self):
        """ Get a list of all user ids referenced in this ACL """
        users = {}
        users[ACL_TYPE_CANON_USER + self.owner_id] = 1
        for k, g in self.grants.items():
            users[k] = 1
        return users.keys()

    def set_owner(self, owner_id):
        self.owner_id = owner_id
        self.owner_display_name = ""

    def equals(self, rhs):
        if (self.owner_id != rhs.owner_id):
            return False
        for k, g in self.grants.items():
            if k not in rhs.grants:
                return False
            if not g.equals(rhs.grants[k]):
                return False
        for l, r in rhs.grants.items():
            if l not in self.grants:
                return False
            if not r.equals(self.grants[l]):
                return False
        return True


class Object(object):

    def __init__(self, name, md5, size, meta):
        if isinstance(name, unicode):
            self.name = name
        else:
            self.name = name.decode('UTF-8')
        self.md5 = md5
        self.size = int(size)
        if SRC_MULTIPART_ETAG_XATTR in meta:
            self.md5 = meta[SRC_MULTIPART_ETAG_XATTR]
            del meta[SRC_MULTIPART_ETAG_XATTR]
        self.meta = meta
        self.is_folder = False
        if name.endswith("/"):
            self.is_folder = True

    def __str__(self):
        return u"(name: {}, size: {}: md5: {}, meta: {}, is_folder: {})".format(self.name, self.size, self.md5, self.meta, self.is_folder)

    def should_check_etags(self, rhs, check_etags):
        if check_etags == "never":
            return False
        if check_etags == "relaxed":
            logger.debug(
                "ETAGS: --check-etags={0}, self.md5 = {1}, rhs.md5 = {2}"
                .format(check_etags, self.md5, rhs.md5)
            )
            return not ('-' in self.md5 or '-' in rhs.md5)
        return True

    def equals(self, rhs, ignore_empty_s3_meta=False, cmp_etag="always"):
        if self.is_folder and rhs.is_folder:
            if (self.name != rhs.name):
                logger.debug("EQUALS: self.name = %s, rhs.name = %s" % (self.name, rhs.name))
                return False
            return True
        else:
            if (self.name != rhs.name):
                logger.debug("EQUALS: self.name = %s, rhs.name = %s" % (self.name, rhs.name))
                return False
            if (self.should_check_etags(rhs, cmp_etag) and self.md5 != rhs.md5):
                logger.debug("EQUALS: self.md5 = %s, rhs.md5 = %s" % (self.md5, rhs.md5))
                return False
            if (self.size != rhs.size):
                logger.debug("EQUALS: self.size = %d, rhs.size = %d" % (self.size, rhs.size))
                return False
            for k, v in self.meta.items():
                if k not in rhs.meta:
                    logger.debug("EQUALS: rhs.meta lacks key %s" % k)
                    return False
                if (rhs.meta[k] != v):
                    logger.debug(
                        "EQUALS: self.meta[%s] = %s, rhs.meta[%s] = %s" % (k, v, k, rhs.meta[k])
                    )
                    return False
            if self.meta or not ignore_empty_s3_meta:
                for k, v in rhs.meta.items():
                    if k not in self.meta:
                        logger.debug("EQUALS: self.meta lacks key %s" % k)
                        return False
            logger.debug("EQUALS: the objects are equal.")
            return True

    def local_name(self):
        local_name = s3_name_to_local_name(self.name)
        if not isinstance(local_name, unicode):
            local_name = local_name.decode('UTF-8')
        return local_name

    def local_path(self, base):
        if not isinstance(base, unicode):
            base = base.decode('UTF-8')
        if self.is_folder:
            full_path = os.path.join(base, self.name)
        else:
            full_path = os.path.join(base, self.local_name())
        return full_path.encode('UTF-8')

    @staticmethod
    def from_file(obj_name, path):
        f = open(path, 'r')
        try:
            md5 = get_md5(f)
        finally:
            f.close()
        size = os.path.getsize(path)
        meta = {}
        try:
            xlist = xattr.get_all(path, namespace=xattr.NS_USER)
        except IOError, e:
            if e.errno == 2:
                return meta
            else:
                raise ObsyncTemporaryException(e)
        for k, v in xlist:
            if xattr_is_metadata(k):
                meta[k] = v
        #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size)
        return Object(obj_name, md5, size, meta)

    @staticmethod
    def from_folder(obj_name, path):
        return Object(obj_name, 'd41d8cd98f00b204e9800998ecf8427e', 0, {})


class Store(object):
    @staticmethod
    def make_store(type, is_dst, create, akey, skey, dry_run, **kwargs):
        if (type == 's3'):
            try:
                store = S3Store(kwargs['host'], kwargs['bucket'],
                                kwargs['prefix'], create, akey, skey, dry_run,
                                is_secure=True)
                logger.info("Connect to S3 store via HTTPS successfully.")
            except Exception as e:
                store = S3Store(kwargs['host'], kwargs['bucket'],
                                kwargs['prefix'], create, akey, skey, dry_run,
                                is_secure=False)
                logger.info("Connect to S3 store via HTTP successfully.")
            return store
        elif (type == 'swift'):
            return SwiftStore(kwargs['host'], kwargs['bucket'],
                              kwargs['prefix'], create, akey, skey, dry_run,
                              False)
        elif (type == 'file'):
            return FileStore(kwargs['path'], create, dry_run,
                             kwargs.get('follow_symlinks', False))
        raise ObsyncPermanentException('Unknown store type: "%s"' % type)


class LocalCopy(object):

    def __init__(self, obj_name, path, path_is_temp):
        self.obj_name = obj_name
        self.path = path
        self.path_is_temp = path_is_temp

    def remove(self):
        if self.path_is_temp and (self.path is not None):
            os.unlink(self.path)
        self.path = None
        self.path_is_temp = False

    def __del__(self):
        self.remove()


class LocalAcl(object):

    @staticmethod
    def from_xml(obj_name, xml):
        acl_policy = AclPolicy.from_xml(xml)
        return LocalAcl(obj_name, acl_policy)

    @staticmethod
    def get_empty(obj_name):
        return LocalAcl(obj_name, None)

    def __init__(self, obj_name, acl_policy):
        self.obj_name = obj_name
        self.acl_policy = acl_policy

    def equals(self, rhs, ignore_empty_s3_acl=False):
        """ Compare two LocalAcls """
        if self.acl_policy is None:
            return (ignore_empty_s3_acl or rhs.acl_policy is None)
        if rhs.acl_policy is None:
            return (self.acl_policy is None)
        return self.acl_policy.equals(rhs.acl_policy)

    def translate_users(self, xusers):
        """ Translate the users in this ACL """
        if (self.acl_policy is None):
            return
        self.acl_policy.translate_users(xusers)

    def set_owner(self, owner_id):
        if (self.acl_policy is None):
            return
        self.acl_policy.set_owner(owner_id)

    def write_to_xattr(self, file_name):
        """ Write this ACL to an extended attribute """
        if (self.acl_policy is None):
            return
        xml = self.acl_policy.to_xml()
        xattr.set(file_name, ACL_XATTR, xml, namespace=xattr.NS_USER)


###### S3 store #######
def s3_key_to_meta(k):
    meta = {}
    if (k.__dict__.has_key("content_type")):
        meta[CONTENT_TYPE_XATTR] = k.content_type
    for k, v in k.metadata.items():
        meta[META_XATTR_PREFIX + k] = v
    return meta


def meta_to_s3_key(key, meta):
    for k, v in meta.items():
        if (k == CONTENT_TYPE_XATTR):
            key.set_metadata("Content-Type", v)
        elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
            k_name = k[len(META_XATTR_PREFIX):]
            key.set_metadata(k_name, v)
        else:
            raise ObsyncPermanentException("can't understand meta entry: %s" % k)


def meta_to_dict(meta):
    result = {}
    for k, v in meta.items():
        if (k == CONTENT_TYPE_XATTR):
            result["Content-Type"] = v
        elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
            k_name = k[len(META_XATTR_PREFIX):]
            result[k_name] = v
        else:
            raise ObsyncPermanentException("can't understand meta entry: %s" % k)
    return result


class S3StoreIterator(object):
    """S3Store iterator"""
    def __init__(self, bucket, blrs):
        self.bucket = bucket
        self.blrs = blrs

    def __iter__(self):
        return self

    def next(self):
        # This will raise StopIteration when there are no more objects to
        # iterate on
        key = self.blrs.next()
        # Issue a HEAD request to get content-type and other metadata
        k = self.bucket.get_key(key.name)
        ret = Object(key.name, etag_to_md5(key.etag), key.size, s3_key_to_meta(k))
        return ret


class S3Store(Store):
    def __init__(self, host, bucket_name, object_prefix, create, akey, skey,
                 dry_run, is_secure):
        self.host = host
        self.bucket_name = bucket_name
        self.key_prefix = object_prefix
        self.dry_run = dry_run

        self.conn = S3Connection(calling_format=OrdinaryCallingFormat(),
                                 host=self.host, is_secure=is_secure,
                                 aws_access_key_id=akey, aws_secret_access_key=skey)

        try:
            self.bucket = self.conn.get_bucket(self.bucket_name)
        except S3ResponseError as e:
            if e.status == 301: # Moved Permanently, try subdomain format
                self.conn = S3Connection(
                    calling_format=SubdomainCallingFormat(),
                    host=self.host, is_secure=is_secure,
                    aws_access_key_id=akey,
                    aws_secret_access_key=skey
                )
                try:
                    self.bucket = self.conn.get_bucket(self.bucket_name)
                except S3ResponseError as e:
                    if e.status == 404:
                        self.bucket = None
                    else:
                        raise e
            elif e.status == 404:
                self.bucket = None
            else:
                raise e

        if self.bucket is None: # Not Found
            if create:
                if not self.dry_run:
                    self.bucket = self.conn.create_bucket(self.bucket_name)
            else:
                raise ObsyncPermanentException(
                    "The bucket does not exist and we are not to create it."
                )

    def __str__(self):
        return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix

    def get_acl(self, obj):
        acl_xml = self.bucket.get_xml_acl(obj.name)
        return LocalAcl.from_xml(obj.name, acl_xml)

    def make_local_copy(self, obj):
        k = Key(self.bucket)
        k.key = obj.name
        temp_file = tempfile.NamedTemporaryFile(prefix='/run/shm/', mode='w+b', delete=False).name
        try:
            k.get_contents_to_filename(temp_file)
        except Exception, e:
            os.unlink(temp_file)
            raise ObsyncTemporaryException(e)
        return LocalCopy(obj.name, temp_file, True)

    def all_objects(self):
        if self.bucket is None:
            return []
        blrs = self.bucket.list(prefix=self.key_prefix)
        return S3StoreIterator(self.bucket, blrs.__iter__())

    def get_object_size(self, obj):
        k = self.bucket.get_key(obj.name)
        if (k is None):
            return 0
        return k.size

    def locate_object(self, obj):
        if self.bucket is None:
            return None
        k = self.bucket.get_key(obj.name)
        if (k is None):
            logger.debug(u"S3Store: cannot locate obj: {}".format(obj.name))
            return None
        else:
            logger.debug(u"S3Store: located obj: {}".format(obj.name))
        return Object(obj.name, etag_to_md5(k.etag), k.size, s3_key_to_meta(k))

    def multipart_upload(self, store, acl, obj):
        pass

    def query_existing_multipart_uploads_in_s3(self, obj):
        multipart_uploads_filter = {'key_marker': obj.name}
        all_multipart_uploads = self.bucket.get_all_multipart_uploads(**multipart_uploads_filter)
        if len(all_multipart_uploads) >= 1:
            logger.debug(u"S3Store: multipart_uploads exists '{}'".format(obj.name))
            existing_parts = []
            mpu = MultiPartUpload(self.bucket)
            mpu.key_name = obj.name
            mpu.bucket_name = self.bucket_name
            mpu.id = all_multipart_uploads[0].id

            all_parts = mpu.get_all_parts()
            for one_parts in all_parts:
                response_part = {
                    'part_number': one_parts.part_number,
                    'etag': one_parts.etag.replace('"', ''),
                    'size': one_parts.size
                }
                existing_parts.append(response_part)
            return {'mpu': mpu, 'parts': existing_parts}

        else:
            return None

    def get_obj_ioctx_md5(self, ioctx):
        if self.dry_run:
            return ''
        m = hashlib.md5()
        m.update(ioctx.read())
        md5 = m.hexdigest()
        ioctx.seek(0)

        return md5

    def upload(self, src, obj):
        if self.dry_run:
            return

        logger.debug(u"S3Store: upload obj '{}'".format(obj.name))

        if obj.size <= MULTIPART_THRESH:
            k = Key(self.bucket)
            k.key = obj.name
            meta_to_s3_key(k, obj.meta)
            ioctx = src.get_obj_ioctx(obj)
            k.set_contents_from_file(ioctx)
        else:
            if obj.size >= BREAKPOINT_THRESHOLD:
                existing_multipart_upload = self.query_existing_multipart_uploads_in_s3(obj)
            else:
                # For small files,just upload them again
                existing_multipart_upload = None

            if existing_multipart_upload is not None:
                logger.info(u"S3Store: uploading file from breakpoint: '{}'".format(obj.name))
                # There some parts in S3,keep uploading ...
                mpu = existing_multipart_upload['mpu']
                try:
                    remaining = obj.size
                    part_num = 0
                    part_size = MULTIPART_THRESH

                    while remaining > 0:
                        current_part_num = part_num + 1
                        offset = part_num * part_size
                        length = min(remaining, part_size)
                        ioctx = src.get_obj_ioctx(obj, offset, length)

                        local_etag = unicode(self.get_obj_ioctx_md5(ioctx))
                        found_part = False
                        for one_part in existing_multipart_upload['parts']:
                            # we'd better check etag.
                            if current_part_num == one_part['part_number'] and \
                                    length == one_part['size'] and \
                                    local_etag == one_part['etag']:
                                found_part = True
                                break
                        if found_part is False:
                            logger.debug(u"S3Store: uploading part: '{}'".format(current_part_num))
                            mpu.upload_part_from_file(ioctx, current_part_num)
                        else:
                            logger.debug(u"S3Store: found part: '{}'".format(current_part_num))

                        remaining -= length
                        part_num += 1

                    logger.info(u"S3Store: Ready to complete_upload: '{}'".format(obj.name))
                    mpu.complete_upload()
                except Exception as e:
                    # Something is wrong.Maybe there are garbage data.Remove them.
                    logger.warning(u"S3Store: Something is wrong.cancel_upload: '{}'".format(obj.name))
                    mpu.cancel_upload()
                    raise e

            else:
                mpu = self.bucket.initiate_multipart_upload(obj.name, metadata=meta_to_dict(obj.meta))
                try:
                    remaining = obj.size
                    part_num = 0
                    part_size = MULTIPART_THRESH

                    while remaining > 0:
                        offset = part_num * part_size
                        length = min(remaining, part_size)
                        ioctx = src.get_obj_ioctx(obj, offset, length)
                        mpu.upload_part_from_file(ioctx, part_num + 1)
                        remaining -= length
                        part_num += 1

                    mpu.complete_upload()
                except Exception as e:
                    # Do NOT cancel for large file.We will use previous parts in future.
                    if obj.size < BREAKPOINT_THRESHOLD:
                        mpu.cancel_upload()
                    raise e

    def set_acl(self, src_acl, obj):
        if src_acl.acl_policy is not None:
            xml = src_acl.acl_policy.to_xml()
            try:
                def fn():
                    self.bucket.set_xml_acl(xml, obj.name)
                do_with_s3_retries(fn)
            except boto.exception.S3ResponseError, e:
                logger.error(
                    "ERROR SETTING ACL on object '{}'\n"
                    "************* ACL: *************\n"
                    "{}\n"
                    "********************************\n"
                    .format(obj.name, str(xml))
                )
                raise ObsyncTemporaryException(e)

    def remove(self, obj):
        if self.dry_run:
            return
        self.bucket.delete_key(obj.name)
        logger.debug("S3Store: removed %s", obj.name)

    def clear(self):
        pass

    def get_obj_ioctx(self, obj, offset=None, length=None):
        k = self.bucket.get_key(obj.name)
        if k is None:
            return None
        ioctx = StringIO()
        headers = None
        if offset is not None and length is not None:
            headers = {
                "Range": "bytes={}-{}".format(offset, offset+length-1)
            }
        k.get_contents_to_file(ioctx, headers)
        ioctx.seek(0)
        return ioctx


# Some S3 servers offer "eventual consistency."
# What this means is that after a change has been made, like the creation of an
# object, it takes some time for this change to become visible to everyone.
# This potentially includes the client making the change.
#
# This means we need to implement a retry mechanism for certain operations.
# For example, setting the ACL on a newly created object may fail with an
# "object not found" error if the object creation hasn't yet become visible to
# us.
def do_with_s3_retries(fn):
    if (os.environ.has_key("DST_CONSISTENCY") and
            os.environ["DST_CONSISTENCY"] == "eventual"):
        sleep_times = [5, 10, 60, -1]
    else:
        sleep_times = [-1]
    for stime in sleep_times:
        try:
            fn()
            return
        except boto.exception.S3ResponseError, e:
            if (stime == -1):
                raise ObsyncTemporaryException(e)
            logger.warning(
                "encountered s3 response error: %s: " \
                    "retrying operation after %d second delay",
                str(e), str(stime)
            )
            time.sleep(stime)


###### Swift store #######
def swift_object_to_meta(obj):
    meta = {}
    if (obj.__dict__.has_key("content_type")):
        meta[CONTENT_TYPE_XATTR] = obj.content_type
    for k, v in obj.metadata.items():
        meta[META_XATTR_PREFIX + k] = v
    return meta


def meta_to_swift_object(obj, meta):
    for k, v in meta.items():
        if (k == CONTENT_TYPE_XATTR):
            obj.metadata["Content-Type"] = v
        elif (k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX):
            k_name = k[len(META_XATTR_PREFIX):]
            obj.metadata[k_name] = v
        else:
            raise ObsyncPermanentException("can't understand meta entry: %s" % k)
        obj.sync_metadata()


class SwiftStoreIterator(object):
    """SwiftStore iterator"""
    def __init__(self, container, object_list):
        self.container = container
        self.generator = (o for o in object_list)

    def __iter__(self):
        return self

    def next(self):
        # This will raise StopIteration when there are no more objects to
        # iterate on
        obj = self.generator.next()
        obj = self.container.get_object(obj.name)
        ret = Object(obj.name, etag_to_md5(obj._etag), obj.size, swift_object_to_meta(obj))
        return ret


class SwiftStore(Store):
    def __init__(self, authurl, container_name, object_prefix, create, akey,
                 skey, dry_run, is_secure):
        self.host = authurl
        self.container_name = container_name
        self.object_prefix = object_prefix
        self.dry_run = dry_run

        self.conn = cloudfiles.get_connection(
            username=akey, api_key=skey, authurl=self.host
        )
        try:
            self.container = self.conn.get_container(self.container_name)
        except cloudfiles.errors.NoSuchContainer:
            self.container = None

        if self.container is None:
            if create:
                if not self.dry_run:
                    self.container = self.conn.create_container(
                        self.container_name
                    )
            else:
                raise ObsyncPermanentException(
                    "%s: no such container as %s" % (self.host, self.container_name)
                )

    def __str__(self):
        return "swift://{}/{}/{}".format(
            self.host, self.container_name, self.object_prefix
        )

    def get_acl(self, obj):
        raise NotImplementedError   # We always use --no-preserve-acls right now

    def make_local_copy(self, obj):
        o = self.container.get_object(obj.name)
        temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
        try:
            o.save_to_filename(temp_file)
        except Exception, e:
            os.unlink(temp_file)
            raise ObsyncTemporaryException(e)
        return LocalCopy(obj.name, temp_file, True)

    def all_objects(self):
        if self.container is None:
            return []
        object_list = self.container.get_objects(prefix=self.object_prefix)
        return SwiftStoreIterator(self.container, object_list)

    def locate_object(self, obj):
        if self.container is None:
            return None
        try:
            o = self.container.get_object(obj.name)
        except cloudfiles.errors.NoSuchObject:
            return None
        return Object(obj.name, etag_to_md5(o._etag), o.size, swift_object_to_meta(o))

    def upload(self, src, obj):
        if self.dry_run:
            return

        logger.debug(u"SwiftStore: upload obj '{}'".format(obj.name))

        o = self.container.create_object(obj.name)

        if obj.size <= MULTIPART_THRESH:
            ioctx = src.get_obj_ioctx(obj)
            o.write(ioctx)
            meta_to_swift_object(o, obj.meta)
        else:
            raise NotImplementedError("obj size too large. not supported")

    def set_acl(self, src_acl, obj):
        if src_acl.acl_policy is not None:
            #FIXME: no acls for swift yet
            pass

    def remove(self, obj):
        if self.dry_run:
            return
        self.container.delete_object(obj.name)
        logger.debug("SwiftStore: removed %s", obj.name)

    def clear(self):
        pass

    def get_obj_ioctx(self, obj, offset=None, length=None):
        try:
            o = self.container.get_object(obj.name)
            ioctx = StringIO()
            if offset is None or length is None:
                o.read(buffer=ioctx)
            else:
                o.read(offset=offset, size=length, buffer=ioctx)
            ioctx.seek(0)
            return ioctx
        except cloudfiles.errors.NoSuchObject:
            return None


###### FileStore #######
class FileStoreIterator(object):
    """FileStore iterator"""
    def __init__(self, base, follow_symlinks=False):
        self.base = base
        self.generator = os.walk(base, followlinks=follow_symlinks)
        self.path = ""
        self.files = []

    def __iter__(self):
        return self

    def next(self):
        while True:
            if (len(self.files) == 0):
                self.path, dirs, self.files = self.generator.next()
                self.files.extend(dirs)
                continue
            path = os.path.join(self.path, self.files[0])
            self.files = self.files[1:]
            # Ignore non-files when iterating.
            if os.path.isfile(path):
                obj_name = local_name_to_s3_name(path[len(self.base)+1:])
                return Object.from_file(obj_name, path)
            elif os.path.isdir(path):
                return Object.from_folder(path[len(self.base)+1:] + "/", path)


class FileStore(Store):
    def __init__(self, url, create, dry_run, follow_symlinks=False):
        # Parse the file url
        self.base = url
        self.dry_run = dry_run
        self.follow_symlinks = follow_symlinks
        if (self.base[-1:] == '/'):
            self.base = self.base[:-1]
        if create:
            if not self.dry_run:
                mkdir_p(self.base)
        elif not os.path.isdir(self.base):
            raise ObsyncPermanentException("NonexistentStore")

        if os.path.isdir(self.base):
            test_xattr_support(self.base)

    def __str__(self):
        return "file://" + self.base

    def get_acl(self, obj):
        try:
            xml = xattr.get(obj.local_path(self.base), ACL_XATTR,
                            namespace=xattr.NS_USER)
        except IOError, e:
            #print "failed to get XML ACL from %s" % obj.local_name()
            if e.errno == 61:
                return LocalAcl.get_empty(obj.name)
            raise ObsyncPermanentException(e)
        return LocalAcl.from_xml(obj.name, xml)

    def make_local_copy(self, obj):
        return LocalCopy(obj.name, obj.local_path(self.base), False)

    def all_objects(self):
        if os.path.isdir(self.base):
            return FileStoreIterator(self.base, self.follow_symlinks)
        else:
            return []

    def locate_object(self, obj):
        path = obj.local_path(self.base)
        if obj.is_folder:
            found = os.path.isdir(path)
            if found:
                return Object.from_folder(obj.name, path)
            else:
                return None
        else:
            found = os.path.isfile(path)
            logger.debug("FileStore::locate_object: %s object '%s'",
                         "found" if found else "did not find", obj.name)
            if (not found):
                return None
            return Object.from_file(obj.name, path)

    def upload(self, src, obj):
        if self.dry_run:
            return

        logger.debug(u"FileStore: upload obj '{}'".format(obj.name))

        lname = obj.local_name()
        d = os.path.join(self.base, lname).encode("UTF-8")
        mkdir_p(os.path.dirname(d))

        if obj.is_folder:
            d = os.path.join(self.base, obj.name).encode("UTF-8")
            mkdir_p(d)
        else:
            if obj.size <= MULTIPART_THRESH:
                ioctx = src.get_obj_ioctx(obj)
                with open(d, "wb") as f:
                    f.write(ioctx.read())
            else:
                try:
                    break_point = 0
                    offset = 0
                    if os.path.isfile(d):
                        try:
                            value = xattr.get(d, BREAK_POINT_META, namespace=xattr.NS_USER)
                            break_point = int(value)
                            logger.info('break point for {} is {}'.format(d, break_point))
                        except Exception as e:
                            logger.error(u"failed to get break_point xatr for {} {}".format(d,str(3)))
                            pass

                    if break_point % MULTIPART_THRESH != 0 or break_point > obj.size:
                        logger.info('break point for {} is {}'.format(d, break_point))
                        break_point = 0;

                    remaining = obj.size - break_point
                    part_num = break_point / MULTIPART_THRESH
                    part_size = MULTIPART_THRESH

                    seek_first = False
                    if break_point != 0:
                        seek_first = True
                        open_option = "r+b"
                    else:
                        open_option = "wb"

                    with open(d, open_option) as f:
                        while remaining > 0:
                            offset = part_num * part_size
                            length = min(remaining, part_size)
                            ioctx = src.get_obj_ioctx(obj, offset, length)
                            if seek_first:
                                f.seek(offset)
                                seek_first = False
                            f.write(ioctx.read())
                            remaining -= length
                            part_num += 1

                    if break_point != 0:
                        f = open(d, 'r')
                        try:
                            md5 = get_md5(f)
                        finally:
                            f.close()
                        if md5 != obj.md5:
                            logger.info("file md5 {} is not equal to {}".format(md5, obj.md5))
                            remaining = break_point
                            part_num = 0
                            part_size = MULTIPART_THRESH

                            with open(d, open_option) as f:
                                while remaining > 0:
                                    offset = part_num * part_size
                                    length = min(remaining, part_size)
                                    ioctx = src.get_obj_ioctx(obj, offset, length)
                                    f.write(ioctx.read())
                                    remaining -= length
                                    part_num += 1
                    try:
                        xattr.remove(d, BREAK_POINT_META, namespace=xattr.NS_USER)
                    except Exception as e:
                        pass

                except Exception as e:
                    if break_point == 0 and offset >= 10* MULTIPART_THRESH:
                        xattr.set(d, BREAK_POINT_META, str(int(offset)),namespace=xattr.NS_USER)
                        logger.error('failed to upload ,set break point to {}'.format(offset))
                    raise e

        # Store metadata in extended attributes
        file_set_md5(d, obj.md5)
        for k, v in obj.meta.items():
            xattr.set(d, k, v, namespace=xattr.NS_USER)

    def set_acl(self, src_acl, obj):
        if not obj.is_folder:
            lname = obj.local_name()
            d = os.path.join(self.base, lname).encode("UTF-8")
            mkdir_p(os.path.dirname(d))
            src_acl.write_to_xattr(d)

    def remove(self, obj):
        if self.dry_run:
            return
        path = os.path.join(self.base, obj.name.encode("UTF-8"))
        try:
            if os.path.isfile(path):
                os.unlink(path)
            if os.path.isdir(path):
                shutil.rmtree(path)
        except Exception:
            logger.warning("FileStore: remove %s failed", obj.name)

        logger.debug("FileStore: removed %s", obj.name)

    def clear(self):
        for entry in os.listdir(self.base):
            path = os.path.join(self.base, entry)
            if os.path.isfile(path):
                os.unlink(path)
            elif os.path.isdir(path):
                shutil.rmtree(path)

    def get_obj_ioctx(self, obj, offset=None, length=None):
        if obj.is_folder:
            return StringIO("")
        else:
            path = obj.local_path(self.base)
            found = os.path.isfile(path)
            if not found:
                return None
            with open(path, "rb") as f:
                if offset is not None:
                    f.seek(offset)
                if length is not None:
                    return StringIO(f.read(length))
                else:
                    return StringIO(f.read())


文章作者: Gavin Wang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Gavin Wang !
  目录