
Deadlock test


Scripts

file_lock_test.py

import fcntl
import logging
import os
import time

logfile = "file_lock_test.log"  # any writable path

logger = logging.getLogger()
hdlr = logging.FileHandler(logfile)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)


FILE = "lockf.txt"

if not os.path.exists(FILE):
    f = open(FILE, "a+")
    f.write("0")
    f.close()

for i in xrange(50):
    f = open(FILE, "r+")
    start_time = time.time()
    # blocks until the exclusive record lock is granted
    fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
    # fcntl.flock(f.fileno(), fcntl.LOCK_EX)

    counter = int(f.readline()) + 1
    f.seek(0)
    f.write(str(counter))

    end_time = time.time()

    # the interval covers the lock wait plus the read/update
    print 'used time: {}'.format(end_time - start_time)
    logger.info('used time: {}'.format(end_time - start_time))

    # closing the file releases the lock
    f.close()

    # print os.getpid(), "=>", counter
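
The lock wait only becomes visible when several processes update lockf.txt concurrently. A minimal driver sketch (the file name run_contention.py and the process count of 4 are illustrative):

run_contention.py

import subprocess
import sys

# spawn several copies of file_lock_test.py so their lockf calls contend
procs = [subprocess.Popen([sys.executable, "file_lock_test.py"])
         for _ in range(4)]
for p in procs:
    p.wait()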

lock_test.py

import time
import random
import logging

from ezs3.log import EZLog
from ezs3.lock import RESOURCE, SingleLock

logger = EZLog.get_logger(__name__)
EZLog.init_handler(logging.INFO, "./lock_test.log")

resource_list = [
    RESOURCE.FS_MANAGER, RESOURCE.VS_MANAGER, RESOURCE.MDS_HOST_PRIOR,
    RESOURCE.SHARED_FOLDER_MGR, RESOURCE.EXPORT_LOGS,
    RESOURCE.CENTRAL_FLUSH_LOCK, RESOURCE.POOL, RESOURCE.FEATURE_NODES,
]
total = 0.0
min_cost = 0.0
max_cost = 0.0
loop = 0
fail_cnt = 0
for i in xrange(200):
    random.shuffle(resource_list)
    for each_resource in resource_list:
        try:
            begin = time.time()
            with SingleLock(resource=each_resource, auto_renew=True):
                interval = time.time() - begin
                if min_cost == 0.0 or min_cost > interval:
                    min_cost = interval
                if interval > max_cost:
                    max_cost = interval
                total += interval
                loop += 1
                logger.info("avg {} min {} max {} loop = {}".format(total/loop, min_cost, max_cost, loop))
                print("avg {} min {} max {} loop = {}".format(total/loop, min_cost, max_cost, loop))
                random_sleep = random.randint(1, 3)
                time.sleep(random_sleep)
                logger.info("[Success] round %s: got and released lock %s, random sleep %s s", i + 1, each_resource, random_sleep)
        except Exception as ex:
            fail_cnt += 1
            interval = time.time() - begin
            print("cost time {} and FAILURE, failed cnt {}".format(interval, fail_cnt))
            logger.info("cost time {} and FAILURE, failed cnt {}".format(interval, fail_cnt))
            time.sleep(1)


if __name__ == '__main__':
    pass
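
SingleLock.acquire() also supports a non-blocking mode (see lock.py below), which allows a try-lock variant of the same test. A minimal sketch, assuming the same ezs3 environment as above:

lock = SingleLock(resource=RESOURCE.FS_MANAGER)
if lock.acquire(blocking=False):  # returns False instead of waiting
    try:
        logger.info("got %s without blocking", RESOURCE.FS_MANAGER)
    finally:
        lock.release()
else:
    logger.info("%s is busy, skipping", RESOURCE.FS_MANAGER)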

root@scaler80:/usr/lib/python2.7/dist-packages/ezs3# cat log.py
import os
import sys
import logging
import threading
from logging.handlers import SysLogHandler, WatchedFileHandler
from logging import StreamHandler
from contextlib import contextmanager

from ezs3.distro import get_syslog_file_name_by_distro

logging.raiseExceptions = False


class SysLogFile:
    DEFAULT = (SysLogHandler.LOG_USER, "/var/log/{}".format(get_syslog_file_name_by_distro()))
    EZS3_ADMIN = (SysLogHandler.LOG_LOCAL0, "/var/log/ezcloudstor/ezs3admin.log")
    EZS3_GMOND = (SysLogHandler.LOG_LOCAL2, "/var/log/ezcloudstor/ezgmond.log")
    EZS3_CENTRAL = (SysLogHandler.LOG_LOCAL3, "/var/log/ezcloudstor/ezcentral.cache")
    BTSNMP = (SysLogHandler.LOG_LOCAL4, "/var/log/ezcloudstor/btsnmp.log")


# ref: http://www.electricmonk.nl/log/2011/08/14/redirect-stdout-and-stderr-to-a-logger-in-python/
class StreamToLogger(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.
    """
    def __init__(self, fno, logger, log_level=logging.INFO):
        self.logger = logger
        self.log_level = log_level
        self.linebuf = ''
        self.fno = fno

    def write(self, buf):
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())

    def flush(self):
        # StreamToLogger logs each write immediately, so flush can be a no-op
        pass

    def fileno(self):
        return self.fno


class EZLog:
    _handler = None
    _modules = []  # modules with uninitialized loggers
    _initial_level = logging.INFO
    _debug_ref_cnt = 0
    _debug_enable_lock = threading.Lock()


    @classmethod
    def init_handler(cls, level=logging.INFO, logfile=None, syslog=False,
                     syslogfile=SysLogFile.DEFAULT, redirect_stderr=False,
                     fmt=None):
        if cls._handler:
            raise Exception("EZLog handler initailized twice")

        if not fmt:
            fmt = "[%(asctime)s] [%(levelname)s] [%(process)d] [%(name)s] " \
                    "[%(funcName)s:%(lineno)d] %(message)s"
        if syslog:
            sys_formatter = logging.Formatter(fmt)
            cls._handler = SysLogHandler(address='/dev/log', facility=syslogfile[0])
            cls._handler.setFormatter(sys_formatter)
        elif logfile:
            dirpath = os.path.dirname(logfile)
            if not os.path.exists(dirpath):
                os.makedirs(dirpath)
            file_formatter = logging.Formatter(fmt)
            cls._handler = WatchedFileHandler(logfile, encoding='UTF-8')
            cls._handler.setFormatter(file_formatter)
        else:
            cls._handler = StreamHandler(sys.stdout)
            cls._handler.setFormatter(logging.Formatter('%(message)s'))

        if os.environ.get("EZDEBUG"):
            level = logging.DEBUG

        cls._handler.setLevel(level)
        cls._initial_level = level
        for module in cls._modules:
            logger = logging.getLogger(module)
            if cls._handler not in logger.handlers:
                logger.addHandler(cls._handler)

        if redirect_stderr:
            logger = EZLog.get_logger('stderr')
            sys.stderr = StreamToLogger(sys.stderr.fileno(), logger, logging.ERROR)

    @classmethod
    def get_logger(cls, module):
        logger = logging.getLogger(module)
        logger.setLevel(logging.DEBUG)

        if cls._handler is None:
            cls._modules.append(module)
        elif cls._handler not in logger.handlers:
            logger.addHandler(cls._handler)

        return logger

    @classmethod
    def setLevel(cls, level):
        cls._initial_level = level
        cls._handler.setLevel(level)

    @classmethod
    def getLevel(cls):
        return cls._handler.level

    @classmethod
    def setTmpLevel(cls, level):
        cls._handler.setLevel(level)

    @classmethod
    def getInitialLevel(cls):
        return cls._initial_level


# context helper to enable debug logging in a code block
@contextmanager
def enable_debug_log():
    with EZLog._debug_enable_lock:
        EZLog._debug_ref_cnt += 1

    old_level = EZLog.getInitialLevel()
    EZLog.setTmpLevel(logging.DEBUG)
    try:
        yield
    finally:
        with EZLog._debug_enable_lock:
            EZLog._debug_ref_cnt -= 1
            if EZLog._debug_ref_cnt == 0:
                EZLog.setTmpLevel(old_level)

root@scaler80:/usr/lib/python2.7/dist-packages/ezs3# cat lock.py
import errno
import fcntl
import json
import os
import time
import signal
import Queue
import multiprocessing
from ezs3.command import do_cmd
from ezs3.log import EZLog
from functools import wraps
from threading import Lock, Event, Thread
from ezs3.ezrados import SafeRados


logger = EZLog.get_logger(__name__)


class LOCK_TYPE:
    SHARED = "shared"
    EXCLUSIVE = "exclusive"


class RESOURCE(object):
    FS_PROGRESS = 'fs_progress'
    FS_MANAGER = 'fs_manager'
    FS_INSTANCE = 'fs_instance'
    MDS_INSTANCE = 'mds_instance'
    EVENT_CACHE = 'event_cache'
    CTDB = 'ctdb'
    SDS_CONTROLLERS = 'sds_controllers'
    FEATURE_NODES = 'feature_nodes'
    EZZERO_COPY = 'ezzero_copy'
    EZDEEP_COPY = 'ezdeep_copy'
    EZ_NAS_TO_S3 = 'ez_nas_to_s3'
    EZ_S3_TO_NAS = 'ez_s3_to_nas'
    EZFS_AGENT = 'ezfs_agent'
    SMB_MANAGEMENT = 'samba_management'
    EXPORT_LOGS = 'export_logs'
    NODE_STATUS = 'node_status'
    POOL = 'pool.{}'
    NODE_MANAGEMENT = 'node_management'
    CENTRAL_CONFIG_LOCK = 'central.config'
    CENTRAL_FLUSH_LOCK = 'central.flush'
    VS_MANAGER = 'vs_manager'   # protect "gateway_groups" config key
    MDS_HOST_PRIOR = 'mds_host_prior'
    DISK_RECOVERY = 'disk_recovery'
    RGW_MANAGER = 'rgw_manager'
    CRUSH_REWEIGHT = "crush_reweight"
    CEPH_CRUSHMAP = "ceph_crushmap"
    HARDWARE_INFO = 'hardware_info'
    POOLS_INFO = 'POOLS_INFO'
    USER_MAPPING_MANAGEMENT = 'user_mapping_management'
    SDS_ADMIN_ACCOUNT = 'sds_admin_account'
    SNS_CONFIG = 'sns_config'
    SQS_CONFIG = 'sqs_config'
    S3NOTIF_CONFIG = 's3notif_config'
    STORCLS_SERVICE = 'storage_class_service'
    IMPORT_ACCOUNT_CONFIG = 'import_account_config'
    MIGRATE_CEPH_ACCOUNTS = 'migrate_ceph_accounts'
    MIGRATE_JOB = 'migrate_job'
    MIGRATE_INDEX_JOB = 'migrate_index_job'
    MIGRATE_META_JOB = 'migrate_meta_job'
    MIGRATE_DATA_JOB = 'migrate_data_job'
    MIGRATE_QUOTA_JOB = 'migrate_quota_job'
    MIGRATE_FAILUE_JOB = 'migrate_failue_job'
    MIGRATE_DOMAIN = 'migrate_domain'
    IPMI_CONFIG = 'ipmi_config'
    RAID_STATE = 'raid_state'
    LICENSE = 'license'
    ISCSI_MANAGER = 'iscsi_manager'
    OSD_ID_ASSIGN = 'osd_id_assign'
    SYSTEM_UID = 'system_uid'
    ALLOW_POOL_DELETE = 'allow_pool_delete'
    PUBLIC_IPS = 'public_ips'


class FILE_RESOURCE(object):
    HOSTS = "hosts"
    STORAGE_CONF_WRITE = 'storage_conf_write'
    STORAGE_CONF_READ = 'storage_conf_read'
    LSBLK = 'lsblk'
    NAS_DISK_CONF = 'nas_disk_conf'
    ISCSI_SESSIONS = 'iscsi_sessions'
    CEPH_CONF = 'ceph_conf'
    AD = 'ad'


def rbd_task_lock_name(pool, img):
    return "rbd_task.{}.{}".format(pool, img)


file_resource_thread_locks = {}


def init_file_resource_thread_locks():
    global file_resource_thread_locks
    file_resources = [r for r in dir(FILE_RESOURCE) if not r.startswith('__')]
    file_resource_thread_locks = {
        FILE_RESOURCE.__dict__[r]: Lock() for r in file_resources}

init_file_resource_thread_locks()


def list_locked_resources():
    output = do_cmd('ceph lock list')
    return json.loads(output)


def delete_resource_lock(resource):
    do_cmd('ceph lock delete {}'.format(resource))


class MonitorLockError(RuntimeError):
    def __str__(self):
        return "CephMonitorLockError: {}".format(self.message)


class SingleLock(object):
    def __init__(self, resource, lock_type=LOCK_TYPE.EXCLUSIVE,
                 auto_renew=False):
        self.resource = resource
        self.lock_type = lock_type
        self.timestamp = ''

        self._auto_renew = auto_renew   # default config
        self._enable_auto_renew = False # state of current lock
        self._autorenew_thread = None
        self._stop_renew = Event()
        self._renew_interval = 10
        self._rds = SafeRados.get_instance()

    def acquire(self, blocking=True, auto_renew=None, renew_interval=10):
        self._enable_auto_renew = (auto_renew is True) or \
            (auto_renew is None and self._auto_renew)
        json_cmd = {'prefix': 'lock acquire', 'type': self.lock_type, 'key': self.resource}
        acquire_retry_cnt = 0
        while True:
            ret, outbuf, outs = self._rds.json_command(argdict=json_cmd,
                                                       timeout=30, do_retry=False)
            if ret == 0:
                self.timestamp = outbuf
                if self._enable_auto_renew:
                    logger.debug('Got lock {} and auto renew is enabled, timestamp={}'
                                 .format(self.resource, self.timestamp))
                    self._stop_renew.clear()
                    self._renew_interval = renew_interval
                    self._autorenew_thread = Thread(target=self.renew_thread)
                    self._autorenew_thread.start()
                else:
                    logger.debug('Got lock and auto renew is disabled')
                return True
            elif ret == -errno.EBUSY or ret == -errno.EAGAIN:
                acquire_retry_cnt += 1
                if ret == -errno.EBUSY:
                    message = 'Lock {} is occupied ({})...'.format(self.resource, acquire_retry_cnt)
                else:
                    message = 'Lock {} acquire cannot execute, monitors may not be in quorum ({})...'.format(self.resource, acquire_retry_cnt)
                if acquire_retry_cnt in [5,10,20,50,100,300]:
                    logger.warn(message)
                elif acquire_retry_cnt > 300:
                    logger.error(message)
                else:
                    logger.debug(message)
                if not blocking:
                    return False
                time.sleep(1)
            else:
                raise MonitorLockError('Monitor returned: {}({})'.format(errno.errorcode[-ret], ret))

    def release(self):
        if not self.timestamp:
            return

        if self._enable_auto_renew:
            self._stop_renew.set()
            self._autorenew_thread.join()

        json_cmd = {'prefix': 'lock release', 'ts': self.timestamp, 'key': self.resource}
        ret, _outbuf, _outs = self._rds.json_command(argdict=json_cmd,
                                                     timeout=30, do_retry=False)

        if ret == 0:
            self.timestamp = ''
            return True
        elif ret == -errno.ENOENT:
            logger.warning('lock {} release with no entry, ret={}, timestamp={}'.format(self.resource, ret, self.timestamp))
        else:
            logger.error('lock {} release failed, ret={}, timestamp={}'.format(self.resource, ret, self.timestamp))
        return False

    def renew(self):
        logger.debug("renew monitor lock {} with timestamp={}".format(self.resource, self.timestamp))
        json_cmd = {'prefix': 'lock renew', 'ts': self.timestamp, 'key': self.resource}
        ret, outbuf, _ = self._rds.json_command(argdict=json_cmd, timeout=30, do_retry=False)
        if ret == 0:
            self.timestamp = outbuf
            return True
        else:
            logger.error('lock {} renew failed, ret={}, timestamp={}'.format(
                self.resource, ret, self.timestamp))
            return False

    def renew_thread(self):
        while not self._stop_renew.wait(self._renew_interval):
            self.renew()

    def __enter__(self):
        blocking = True
        self.acquire(blocking, self._auto_renew)
        return self

    def __exit__(self, type, value, traceback):
        self.release()


class MultiLock(object):
    def __init__(self, resources, lock_type=LOCK_TYPE.EXCLUSIVE, auto_renew=False):
        self.resources = resources
        self.lock_type = lock_type
        self.auto_renew = auto_renew
        self.locks = []

    def acquire(self):
        for resource in self.resources:
            lock = SingleLock(resource, self.lock_type, self.auto_renew)
            self.locks.append(lock)

        for lock in self.locks:
            lock.acquire()

    def release(self):
        for lock in self.locks:
            lock.release()

    def renew(self):
        for lock in self.locks:
            lock.renew()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self.release()
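
# Illustrative note: acquire() takes the resources in the order given, so
# callers should agree on a single ordering (e.g. sorted); two processes
# locking the same resources in opposite orders can deadlock each other.
#
#     with MultiLock(sorted([RESOURCE.LICENSE, RESOURCE.POOL.format('rbd')])):
#         pass  # critical section covering both resources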


class GroupLock(SingleLock):
    def __init__(self, gwgroup, resource, lock_type=LOCK_TYPE.EXCLUSIVE,
                 auto_renew=False):
        SingleLock.__init__(self, '{}.{}'.format(gwgroup, resource), lock_type,
                            auto_renew)


class ProcessLockTimeout(RuntimeError):
    pass


class ProcessLockFailed(RuntimeError):
    pass


# The process lock is implemented with the flock syscall, which can only be
# interrupted by SIGALRM. To let background threads have their own process
# lock with timeout support, each ProcessLock spawns a worker process to do
# the flock operations (when a timeout is specified).
class ProcessLock:
    def __init__(self, resource, lock_type=LOCK_TYPE.EXCLUSIVE, blocking=True, timeout=0):
        self.resource = resource
        self.lock_type = lock_type
        self.blocking = blocking
        self.timeout = timeout
        self.flock_result = None
        self.release_event = None
        self.worker = None
        self.process_lock = None

    def acquire(self):
        # currently we forbid re-entrant lock to simplify the cases
        if self.process_lock:
            raise ProcessLockFailed('Unable to re-acquire lock {}'.format(self.resource))

        self.process_lock = _ProcessLock(self.resource, self.lock_type, self.blocking)

        if self.timeout > 0:
            self.flock_result = multiprocessing.Queue(1)
            self.release_event = multiprocessing.Event()
            self.worker = ProcessLockWorker(self.flock_result,
                                            self.release_event,
                                            self.process_lock)
            self.worker.start()
            try:
                res = self.flock_result.get(timeout=self.timeout)
                if res[0] < 0:
                    self.process_lock = None
                    raise ProcessLockFailed('Unable to lock {}, err={}'.format(self.resource, res[1]))
            except Queue.Empty:
                self.process_lock = None
                # interrupt blocking flock syscall in worker
                os.kill(self.worker.pid, signal.SIGALRM)
                while self.worker.is_alive():
                    self.worker.terminate()
                raise ProcessLockTimeout('Timeout to lock {}'.format(self.resource))
        else:
            try:
                self.process_lock.acquire()
            except Exception as e:
                self.process_lock = None
                logger.exception("Acquire lock failed")
                raise ProcessLockFailed('Unable to lock {}, err={}'.format(self.resource, e.message))

    def release(self):
        if self.process_lock:
            if self.worker:
                self.release_event.set()
                self.worker.join()
                self.worker = None
            else:
                self.process_lock.release()
            self.process_lock = None

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self.release()
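
# Illustrative usage: bound the wait on the flock-based lock so a stuck
# holder cannot block the caller forever.
#
#     try:
#         with ProcessLock(FILE_RESOURCE.LSBLK, timeout=5):
#             pass  # critical section
#     except ProcessLockTimeout:
#         pass  # lock not obtained within 5 seconds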


class _ProcessLock:
    FILE_LOCK_PATH = '/var/lock/ezs3'

    def __init__(self, resource, lock_type, blocking):
        self.resource = resource
        self.lock_type = lock_type
        self.blocking = blocking
        self.handle = None
        self.thread_lock = file_resource_thread_locks[self.resource]

    def acquire(self):
        self.thread_lock.acquire()
        if not os.path.exists(_ProcessLock.FILE_LOCK_PATH):
            os.mkdir(_ProcessLock.FILE_LOCK_PATH)

        filename = os.path.join(_ProcessLock.FILE_LOCK_PATH, self.resource)
        self.handle = open(filename, 'w')

        lock_op = 0

        if self.lock_type == LOCK_TYPE.EXCLUSIVE:
            lock_op = fcntl.LOCK_EX
        else:
            lock_op = fcntl.LOCK_SH

        if self.blocking:
            fcntl.flock(self.handle, lock_op)
        else:
            fcntl.flock(self.handle, lock_op | fcntl.LOCK_NB)

    def release(self):
        if self.handle:
            fcntl.flock(self.handle, fcntl.LOCK_UN)
            self.handle.close()
            self.handle = None
        if self.thread_lock.locked():
            self.thread_lock.release()


# NOTE: do not logging in ProcessLockWorker because logging in
# another process without syslog handler might corrupt the log.
class ProcessLockWorker(multiprocessing.Process):
    def __init__(self, flock_result, release_event, process_lock):
        multiprocessing.Process.__init__(self)
        self.flock_result = flock_result
        self.release_event = release_event
        self.process_lock = process_lock

    def run(self):
        try:
            self.process_lock.acquire()
            self.flock_result.put([0, ''])
            self.release_event.wait()
            self.process_lock.release()
        except Exception as e:
            self.flock_result.put([-1, e.message])


internal_lock = {}


def get_internal_lock(lock_name):
    global internal_lock
    if lock_name not in internal_lock:
        internal_lock[lock_name] = Lock()
    return internal_lock[lock_name]


external_lock = {}


def get_external_lock(lock_name, rds=None, timeout=60):
    global external_lock
    if lock_name not in external_lock or external_lock[lock_name] is None:
        try:
            rl = SingleLock(lock_name)
        except:
            rl = None
            logger.warn('Failed to create monitor lock')
        external_lock[lock_name] = rl
    return external_lock[lock_name]


def monlock(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        lock_name = kwargs.get('_lock_name')
        if not lock_name:
            lock_name = func.__module__
        else:
            del kwargs['_lock_name']

        ret = None
        in_lock = None
        ex_lock = None
        try:
            # Ideally only the rados lock would be used here, but due to a
            # memory leak issue in the rados lock we added an internal lock
            # so one process can share a single rados lock.
            in_lock = get_internal_lock(lock_name)
            ex_lock = get_external_lock(lock_name)
            in_lock.acquire()
            if ex_lock:
                ex_lock.acquire(auto_renew=True)
            ret = func(*args, **kwargs)
        except Exception as ex:
            logger.error('Running wrapped function {} error: {}'.format(str(func), str(ex)))
            raise
        finally:
            if ex_lock:
                ex_lock.release()
            if in_lock:
                in_lock.release()

        return ret
    return wrapper
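
# Illustrative usage: the lock name defaults to func.__module__ unless the
# caller passes _lock_name explicitly.
#
#     @monlock
#     def flush_config(**kwargs):
#         pass
#
#     flush_config(_lock_name=RESOURCE.CENTRAL_FLUSH_LOCK)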

class MonLockWithName(object):
    def __init__(self, lock_name):
        self.lock_name = lock_name

    def __call__(self, func):
        def wrapped_f(*args):
            ret = None
            in_lock = None
            ex_lock = None
            try:
                in_lock = get_internal_lock(self.lock_name)
                ex_lock = get_external_lock(self.lock_name)
                in_lock.acquire()
                if ex_lock:
                    ex_lock.acquire(auto_renew=True)
                ret = func(*args)
            except Exception as ex:
                logger.error('Running wrapped function {} error: {}'.format(str(func), str(ex)))
                raise
            finally:
                if ex_lock:
                    ex_lock.release()
                if in_lock:
                    in_lock.release()

            return ret
        return wrapped_f


class RWLock(object):
    class _Switch(object):
        def __init__(self):
            self._count = 0
            self._mutex = Lock()

        def acquire(self, lock):
            self._mutex.acquire()
            self._count += 1
            if self._count == 1:
                lock.acquire()
            self._mutex.release()

        def release(self, lock):
            self._mutex.acquire()
            self._count -= 1
            if self._count == 0:
                lock.release()
            self._mutex.release()

    def __init__(self):
        self._read_switch = self._Switch()
        self._write_switch = self._Switch()
        self._no_readers = Lock()
        self._no_writers = Lock()
        self._readers_queue = Lock()

    def read_acquire(self):
        self._readers_queue.acquire()
        self._no_readers.acquire()
        self._read_switch.acquire(self._no_writers)
        self._no_readers.release()
        self._readers_queue.release()

    def read_release(self):
        self._read_switch.release(self._no_writers)

    def write_acquire(self):
        self._write_switch.acquire(self._no_readers)
        self._no_writers.acquire()

    def write_release(self):
        self._no_writers.release()
        self._write_switch.release(self._no_readers)
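
The RWLock above resembles the classic writer-preference readers-writers construction built from plain mutexes: a waiting writer holds _no_readers, which stalls new readers until the write completes. A minimal sketch of driving it from a few threads (the shared dict and thread counts are illustrative):

import threading

from ezs3.lock import RWLock

rw = RWLock()
shared = {"value": 0}

def reader(idx):
    rw.read_acquire()
    try:
        print("reader %d sees %d" % (idx, shared["value"]))
    finally:
        rw.read_release()

def writer():
    rw.write_acquire()
    try:
        shared["value"] += 1
    finally:
        rw.write_release()

threads = [threading.Thread(target=reader, args=(i,)) for i in range(3)]
threads.append(threading.Thread(target=writer))
for t in threads:
    t.start()
for t in threads:
    t.join()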
