Scripts
file_lock_test.py
import fcntl
import logging
import os
import time

# log file for the timing output (undefined in the original script)
logfile = "file_lock_test.log"

logger = logging.getLogger()
hdlr = logging.FileHandler(logfile)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.INFO)

FILE = "lockf.txt"

# seed the shared counter file on first run
if not os.path.exists(FILE):
    f = open(FILE, "a+")
    f.write("0")
    f.close()

for i in xrange(50):
    f = open(FILE, "r+")
    start_time = time.time()
    # block until this process holds an exclusive lock on the counter file
    fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
    # fcntl.flock(f.fileno(), fcntl.LOCK_EX)
    counter = int(f.readline()) + 1
    f.seek(0)
    f.write(str(counter))
    end_time = time.time()
    print 'used time: ' + str(end_time - start_time)
    logger.info(u'used time: {}'.format(end_time - start_time))
    # closing the file also releases the lock
    f.close()
    # print os.getpid(), "=>", counter
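
The script only shows contention when several copies run at once. A minimal driver for that (a sketch; it assumes the script above is saved as file_lock_test.py in the working directory):

import subprocess

# spawn four competing copies of the test and wait for them to finish
procs = [subprocess.Popen(["python", "file_lock_test.py"]) for _ in range(4)]
for p in procs:
    p.wait()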
lock_test.py
import time
import random
import logging
from ezs3.log import EZLog
from ezs3.lock import RESOURCE, SingleLock

logger = EZLog.get_logger(__name__)
EZLog.init_handler(logging.INFO, "./lock_test.log")

resource_list = [RESOURCE.FS_MANAGER, RESOURCE.VS_MANAGER,
                 RESOURCE.MDS_HOST_PRIOR, RESOURCE.SHARED_FOLDER_MGR,
                 RESOURCE.EXPORT_LOGS, RESOURCE.CENTRAL_FLUSH_LOCK,
                 RESOURCE.POOL, RESOURCE.FEATURE_NODES]

total = 0.0      # accumulated acquire latency
min_time = 0.0   # fastest acquire seen
max_time = 0.0   # slowest acquire seen
loop = 0
fail_cnt = 0

for i in xrange(200):
    random.shuffle(resource_list)
    for each_resource in resource_list:
        try:
            begin = time.time()
            with SingleLock(resource=each_resource, auto_renew=True):
                interval = time.time() - begin
                if min_time == 0.0 or min_time > interval:
                    min_time = interval
                if interval > max_time:
                    max_time = interval
                total += interval
                loop += 1
                logger.info("avg {} min {} max {} loop = {}".format(total / loop, min_time, max_time, loop))
                print("avg {} min {} max {} loop = {}".format(total / loop, min_time, max_time, loop))
                random_sleep = random.randint(1, 3)
                time.sleep(random_sleep)
                logger.info("[Success] (%s) time(s) to get and release lock: (%s), random sleep: (%s)",
                            i + 1, each_resource, random_sleep)
        except Exception:
            fail_cnt += 1
            interval = time.time() - begin
            print("cost time {} and FAILURE, failed cnt {}".format(interval, fail_cnt))
            logger.exception("cost time {} and FAILURE, failed cnt {}".format(interval, fail_cnt))
            time.sleep(1)
root@scaler80:/usr/lib/python2.7/dist-packages/ezs3# cat log.py
import os
import sys
import logging
import threading
from logging.handlers import SysLogHandler, WatchedFileHandler
from logging import StreamHandler
from contextlib import contextmanager
from ezs3.distro import get_syslog_file_name_by_distro

logging.raiseExceptions = False


class SysLogFile:
    DEFAULT = (SysLogHandler.LOG_USER, "/var/log/{}".format(get_syslog_file_name_by_distro()))
    EZS3_ADMIN = (SysLogHandler.LOG_LOCAL0, "/var/log/ezcloudstor/ezs3admin.log")
    EZS3_GMOND = (SysLogHandler.LOG_LOCAL2, "/var/log/ezcloudstor/ezgmond.log")
    EZS3_CENTRAL = (SysLogHandler.LOG_LOCAL3, "/var/log/ezcloudstor/ezcentral.cache")
    BTSNMP = (SysLogHandler.LOG_LOCAL4, "/var/log/ezcloudstor/btsnmp.log")


# ref: http://www.electricmonk.nl/log/2011/08/14/redirect-stdout-and-stderr-to-a-logger-in-python/
class StreamToLogger(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.
    """
    def __init__(self, fno, logger, log_level=logging.INFO):
        self.logger = logger
        self.log_level = log_level
        self.linebuf = ''
        self.fno = fno

    def write(self, buf):
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())

    def flush(self):
        # every write is logged immediately, so this is a no-op
        pass

    def fileno(self):
        return self.fno
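
# Illustrative use (not part of this module): capture a noisy library's
# stdout into a logger by swapping the stream object:
#
#   sys.stdout = StreamToLogger(sys.stdout.fileno(),
#                               logging.getLogger('stdout'), logging.INFO)
#   print "now recorded by the logger"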
class EZLog:
    _handler = None
    _modules = []  # modules with uninitialized loggers
    _initial_level = logging.INFO
    _debug_ref_cnt = 0
    _debug_enable_lock = threading.Lock()

    @classmethod
    def init_handler(cls, level=logging.INFO, logfile=None, syslog=False,
                     syslogfile=SysLogFile.DEFAULT, redirect_stderr=False,
                     fmt=None):
        if cls._handler:
            raise Exception("EZLog handler initialized twice")
        if not fmt:
            fmt = "[%(asctime)s] [%(levelname)s] [%(process)d] [%(name)s] " \
                  "[%(funcName)s:%(lineno)d] %(message)s"
        if syslog:
            sys_formatter = logging.Formatter(fmt)
            cls._handler = SysLogHandler(address='/dev/log', facility=syslogfile[0])
            cls._handler.setFormatter(sys_formatter)
        elif logfile:
            dirpath = os.path.dirname(logfile)
            if not os.path.exists(dirpath):
                os.makedirs(dirpath)
            file_formatter = logging.Formatter(fmt)
            cls._handler = WatchedFileHandler(logfile, encoding='UTF-8')
            cls._handler.setFormatter(file_formatter)
        else:
            cls._handler = StreamHandler(sys.stdout)
            cls._handler.setFormatter(logging.Formatter('%(message)s'))
        if os.environ.get("EZDEBUG"):
            level = logging.DEBUG
        cls._handler.setLevel(level)
        cls._initial_level = level
        # attach the handler to loggers created before init_handler was called
        for module in cls._modules:
            logger = logging.getLogger(module)
            if cls._handler not in logger.handlers:
                logger.addHandler(cls._handler)
        if redirect_stderr:
            logger = EZLog.get_logger('stderr')
            sys.stderr = StreamToLogger(sys.stderr.fileno(), logger, logging.ERROR)

    @classmethod
    def get_logger(cls, module):
        logger = logging.getLogger(module)
        logger.setLevel(logging.DEBUG)
        if cls._handler is None:
            cls._modules.append(module)
        elif cls._handler not in logger.handlers:
            logger.addHandler(cls._handler)
        return logger

    @classmethod
    def setLevel(cls, level):
        cls._initial_level = level
        cls._handler.setLevel(level)

    @classmethod
    def getLevel(cls):
        return cls._handler.level

    @classmethod
    def setTmpLevel(cls, level):
        cls._handler.setLevel(level)

    @classmethod
    def getInitialLevel(cls):
        return cls._initial_level


# context helper to enable debug logging in a code block
@contextmanager
def enable_debug_log():
    with EZLog._debug_enable_lock:
        EZLog._debug_ref_cnt += 1
        old_level = EZLog.getInitialLevel()
        EZLog.setTmpLevel(logging.DEBUG)
    try:
        yield
    finally:
        with EZLog._debug_enable_lock:
            EZLog._debug_ref_cnt -= 1
            if EZLog._debug_ref_cnt == 0:
                EZLog.setTmpLevel(old_level)
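
Typical use of EZLog, as exercised by lock_test.py above (a sketch; the log path is arbitrary):

import logging
from ezs3.log import EZLog, enable_debug_log

EZLog.init_handler(logging.INFO, "./demo.log")
logger = EZLog.get_logger(__name__)

logger.info("always recorded")
logger.debug("dropped: the handler level is INFO")
with enable_debug_log():
    logger.debug("recorded: the handler is temporarily at DEBUG")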
root@scaler80:/usr/lib/python2.7/dist-packages/ezs3# cat lock.py
import errno
import fcntl
import json
import os
import time
import signal
import Queue
import multiprocessing
from ezs3.command import do_cmd
from ezs3.log import EZLog
from functools import wraps
from threading import Lock, Event, Thread
from ezs3.ezrados import SafeRados
logger = EZLog.get_logger(__name__)
class LOCK_TYPE:
    SHARED = "shared"
    EXCLUSIVE = "exclusive"


class RESOURCE(object):
    FS_PROGRESS = 'fs_progress'
    FS_MANAGER = 'fs_manager'
    FS_INSTANCE = 'fs_instance'
    MDS_INSTANCE = 'mds_instance'
    EVENT_CACHE = 'event_cache'
    CTDB = 'ctdb'
    SDS_CONTROLLERS = 'sds_controllers'
    FEATURE_NODES = 'feature_nodes'
    EZZERO_COPY = 'ezzero_copy'
    EZDEEP_COPY = 'ezdeep_copy'
    EZ_NAS_TO_S3 = 'ez_nas_to_s3'
    EZ_S3_TO_NAS = 'ez_s3_to_nas'
    EZFS_AGENT = 'ezfs_agent'
    SMB_MANAGEMENT = 'samba_management'
    EXPORT_LOGS = 'export_logs'
    NODE_STATUS = 'node_status'
    POOL = 'pool.{}'
    NODE_MANAGEMENT = 'node_management'
    CENTRAL_CONFIG_LOCK = 'central.config'
    CENTRAL_FLUSH_LOCK = 'central.flush'
    VS_MANAGER = 'vs_manager'  # protect "gateway_groups" config key
    MDS_HOST_PRIOR = 'mds_host_prior'
    DISK_RECOVERY = 'disk_recovery'
    RGW_MANAGER = 'rgw_manager'
    CRUSH_REWEIGHT = "crush_reweight"
    CEPH_CRUSHMAP = "ceph_crushmap"
    HARDWARE_INFO = 'hardware_info'
    POOLS_INFO = 'POOLS_INFO'
    USER_MAPPING_MANAGEMENT = 'user_mapping_management'
    SDS_ADMIN_ACCOUNT = 'sds_admin_account'
    SNS_CONFIG = 'sns_config'
    SQS_CONFIG = 'sqs_config'
    S3NOTIF_CONFIG = 's3notif_config'
    STORCLS_SERVICE = 'storage_class_service'
    IMPORT_ACCOUNT_CONFIG = 'import_account_config'
    MIGRATE_CEPH_ACCOUNTS = 'migrate_ceph_accounts'
    MIGRATE_JOB = 'migrate_job'
    MIGRATE_INDEX_JOB = 'migrate_index_job'
    MIGRATE_META_JOB = 'migrate_meta_job'
    MIGRATE_DATA_JOB = 'migrate_data_job'
    MIGRATE_QUOTA_JOB = 'migrate_quota_job'
    MIGRATE_FAILUE_JOB = 'migrate_failue_job'
    MIGRATE_DOMAIN = 'migrate_domain'
    IPMI_CONFIG = 'ipmi_config'
    RAID_STATE = 'raid_state'
    LICENSE = 'license'
    ISCSI_MANAGER = 'iscsi_manager'
    OSD_ID_ASSIGN = 'osd_id_assign'
    SYSTEM_UID = 'system_uid'
    ALLOW_POOL_DELETE = 'allow_pool_delete'
    PUBLIC_IPS = 'public_ips'


class FILE_RESOURCE(object):
    HOSTS = "hosts"
    STORAGE_CONF_WRITE = 'storage_conf_write'
    STORAGE_CONF_READ = 'storage_conf_read'
    LSBLK = 'lsblk'
    NAS_DISK_CONF = 'nas_disk_conf'
    ISCSI_SESSIONS = 'iscsi_sessions'
    CEPH_CONF = 'ceph_conf'
    AD = 'ad'


def rbd_task_lock_name(pool, img):
    return "rbd_task.{}.{}".format(pool, img)


file_resource_thread_locks = {}


def init_file_resource_thread_locks():
    global file_resource_thread_locks
    file_resources = [r for r in dir(FILE_RESOURCE) if not r.startswith('__')]
    file_resource_thread_locks = {
        FILE_RESOURCE.__dict__[r]: Lock() for r in file_resources}


init_file_resource_thread_locks()


def list_locked_resources():
    output = do_cmd('ceph lock list')
    return json.loads(output)


def delete_resource_lock(resource):
    do_cmd('ceph lock delete {}'.format(resource))


class MonitorLockError(RuntimeError):
    def __str__(self):
        return "CephMonitorLockError: {}".format(self.message)
class SingleLock(object):
    def __init__(self, resource, lock_type=LOCK_TYPE.EXCLUSIVE,
                 auto_renew=False):
        self.resource = resource
        self.lock_type = lock_type
        self.timestamp = ''
        self._auto_renew = auto_renew  # default config
        self._enable_auto_renew = False  # state of current lock
        self._autorenew_thread = None
        self._stop_renew = Event()
        self._renew_interval = 10
        self._rds = SafeRados.get_instance()

    def acquire(self, blocking=True, auto_renew=None, renew_interval=10):
        self._enable_auto_renew = auto_renew is True or \
            (auto_renew is None and self._auto_renew)
        json_cmd = {'prefix': 'lock acquire', 'type': self.lock_type, 'key': self.resource}
        acquire_retry_cnt = 0
        while True:
            ret, outbuf, outs = self._rds.json_command(argdict=json_cmd,
                                                       timeout=30, do_retry=False)
            if ret == 0:
                self.timestamp = outbuf
                if self._enable_auto_renew:
                    logger.debug('Got lock {} and auto renew is enabled, timestamp={}'
                                 .format(self.resource, self.timestamp))
                    self._stop_renew.clear()
                    self._renew_interval = renew_interval
                    self._autorenew_thread = Thread(target=self.renew_thread)
                    self._autorenew_thread.start()
                else:
                    logger.debug('Got lock and auto renew is disabled')
                return True
            elif ret == -errno.EBUSY or ret == -errno.EAGAIN:
                acquire_retry_cnt += 1
                if ret == -errno.EBUSY:
                    message = 'Lock {} is occupied ({})...'.format(self.resource, acquire_retry_cnt)
                else:
                    message = 'Lock {} acquire cannot exec, may not be in quorum ({})...'.format(self.resource, acquire_retry_cnt)
                if acquire_retry_cnt in [5, 10, 20, 50, 100, 300]:
                    logger.warn(message)
                elif acquire_retry_cnt > 300:
                    logger.error(message)
                else:
                    logger.debug(message)
                if not blocking:
                    return False
                time.sleep(1)
            else:
                raise MonitorLockError('Monitor returned: {}({})'.format(errno.errorcode[-ret], ret))

    def release(self):
        if not self.timestamp:
            return
        if self._enable_auto_renew:
            self._stop_renew.set()
            self._autorenew_thread.join()
        json_cmd = {'prefix': 'lock release', 'ts': self.timestamp, 'key': self.resource}
        ret, _outbuf, _outs = self._rds.json_command(argdict=json_cmd,
                                                     timeout=30, do_retry=False)
        if ret == 0:
            self.timestamp = ''
            return True
        elif ret == -errno.ENOENT:
            logger.warning('lock {} release with no entry, ret={}, timestamp={}'.format(self.resource, ret, self.timestamp))
        else:
            logger.error('lock {} release failed, ret={}, timestamp={}'.format(self.resource, ret, self.timestamp))
        return False

    def renew(self):
        logger.debug("renew monitor lock {} with timestamp={}".format(self.resource, self.timestamp))
        json_cmd = {'prefix': 'lock renew', 'ts': self.timestamp, 'key': self.resource}
        ret, outbuf, _ = self._rds.json_command(argdict=json_cmd, timeout=30, do_retry=False)
        if ret == 0:
            self.timestamp = outbuf
            return True
        else:
            logger.error('lock {} renew failed, ret={}, timestamp={}'.format(
                self.resource, ret, self.timestamp))
            return False

    def renew_thread(self):
        while not self._stop_renew.wait(self._renew_interval):
            self.renew()

    def __enter__(self):
        blocking = True
        self.acquire(blocking, self._auto_renew)
        return self

    def __exit__(self, type, value, traceback):
        self.release()
class MultiLock(object):
    def __init__(self, resources, lock_type=LOCK_TYPE.EXCLUSIVE, auto_renew=False):
        self.resources = resources
        self.lock_type = lock_type
        self.auto_renew = auto_renew
        self.locks = []

    def acquire(self):
        for resource in self.resources:
            lock = SingleLock(resource, self.lock_type, self.auto_renew)
            self.locks.append(lock)
        for lock in self.locks:
            lock.acquire()

    def release(self):
        for lock in self.locks:
            lock.release()

    def renew(self):
        for lock in self.locks:
            lock.renew()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self.release()
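
# Illustrative use (not part of this module): hold several monitor locks for
# one critical section; callers should pass resources in a consistent order
# to avoid deadlocking against each other:
#
#   with MultiLock([RESOURCE.FS_MANAGER, RESOURCE.VS_MANAGER], auto_renew=True):
#       pass  # both locks held here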
class GroupLock(SingleLock):
    def __init__(self, gwgroup, resource, lock_type=LOCK_TYPE.EXCLUSIVE,
                 auto_renew=False):
        SingleLock.__init__(self, '{}.{}'.format(gwgroup, resource), lock_type,
                            auto_renew)


class ProcessLockTimeout(RuntimeError):
    pass


class ProcessLockFailed(RuntimeError):
    pass


# The process lock is implemented with the flock syscall, which can only be
# interrupted by SIGALRM. To let background threads hold their own process
# lock with timeout support, every ProcessLock runs a worker process to do
# the flock operations (when a timeout is specified).
class ProcessLock:
    def __init__(self, resource, lock_type=LOCK_TYPE.EXCLUSIVE, blocking=True, timeout=0):
        self.resource = resource
        self.lock_type = lock_type
        self.blocking = blocking
        self.timeout = timeout
        self.flock_result = None
        self.release_event = None
        self.worker = None
        self.process_lock = None

    def acquire(self):
        # re-entrant locking is forbidden to keep the cases simple
        if self.process_lock:
            raise ProcessLockFailed('Unable to re-acquire lock {}'.format(self.resource))
        self.process_lock = _ProcessLock(self.resource, self.lock_type, self.blocking)
        if self.timeout > 0:
            self.flock_result = multiprocessing.Queue(1)
            self.release_event = multiprocessing.Event()
            self.worker = ProcessLockWorker(self.flock_result,
                                            self.release_event,
                                            self.process_lock)
            self.worker.start()
            try:
                res = self.flock_result.get(timeout=self.timeout)
                if res[0] < 0:
                    self.process_lock = None
                    raise ProcessLockFailed('Unable to lock {}, err={}'.format(self.resource, res[1]))
            except Queue.Empty:
                self.process_lock = None
                # interrupt the blocking flock syscall in the worker
                os.kill(self.worker.pid, signal.SIGALRM)
                while self.worker.is_alive():
                    self.worker.terminate()
                raise ProcessLockTimeout('Timeout to lock {}'.format(self.resource))
        else:
            try:
                self.process_lock.acquire()
            except Exception as e:
                self.process_lock = None
                logger.exception("Acquire lock failed..")
                raise ProcessLockFailed('Unable to lock {}, err={}'.format(self.resource, e.message))

    def release(self):
        if self.process_lock:
            if self.worker:
                # the worker holds the flock; tell it to release and exit
                self.release_event.set()
                self.worker.join()
                self.worker = None
            else:
                self.process_lock.release()
            self.process_lock = None

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        self.release()
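
# Illustrative use (not part of this module): a background thread takes the
# lsblk file lock but gives up after five seconds instead of blocking forever:
#
#   lock = ProcessLock(FILE_RESOURCE.LSBLK, timeout=5)
#   try:
#       lock.acquire()
#   except ProcessLockTimeout:
#       pass  # someone else held the lock for too long
#   else:
#       try:
#           pass  # exclusive access to the resource here
#       finally:
#           lock.release()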
class _ProcessLock:
    FILE_LOCK_PATH = '/var/lock/ezs3'

    def __init__(self, resource, lock_type, blocking):
        self.resource = resource
        self.lock_type = lock_type
        self.blocking = blocking
        self.handle = None
        self.thread_lock = file_resource_thread_locks[self.resource]

    def acquire(self):
        # serialize threads in this process first, then flock across processes
        self.thread_lock.acquire()
        if not os.path.exists(_ProcessLock.FILE_LOCK_PATH):
            os.mkdir(_ProcessLock.FILE_LOCK_PATH)
        filename = os.path.join(_ProcessLock.FILE_LOCK_PATH, self.resource)
        self.handle = open(filename, 'w')
        if self.lock_type == LOCK_TYPE.EXCLUSIVE:
            lock_op = fcntl.LOCK_EX
        else:
            lock_op = fcntl.LOCK_SH
        if self.blocking:
            fcntl.flock(self.handle, lock_op)
        else:
            fcntl.flock(self.handle, lock_op | fcntl.LOCK_NB)

    def release(self):
        if self.handle:
            fcntl.flock(self.handle, fcntl.LOCK_UN)
            self.handle.close()
            self.handle = None
        if self.thread_lock.locked():
            self.thread_lock.release()


# NOTE: do not log in ProcessLockWorker, because logging in another process
# without a syslog handler might corrupt the log.
class ProcessLockWorker(multiprocessing.Process):
    def __init__(self, flock_result, release_event, process_lock):
        multiprocessing.Process.__init__(self)
        self.flock_result = flock_result
        self.release_event = release_event
        self.process_lock = process_lock

    def run(self):
        try:
            # take the flock, report success, then hold it until released
            self.process_lock.acquire()
            self.flock_result.put([0, ''])
            self.release_event.wait()
            self.process_lock.release()
        except Exception as e:
            self.flock_result.put([-1, e.message])
internal_lock = {}


def get_internal_lock(lock_name):
    global internal_lock
    if lock_name not in internal_lock:
        internal_lock[lock_name] = Lock()
    return internal_lock[lock_name]


external_lock = {}


def get_external_lock(lock_name, rds=None, timeout=60):
    global external_lock
    if lock_name not in external_lock or external_lock[lock_name] is None:
        try:
            rl = SingleLock(lock_name)
        except Exception:
            rl = None
            logger.warn('Failed to create monitor lock')
        external_lock[lock_name] = rl
    return external_lock[lock_name]
def monlock(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        lock_name = kwargs.get('_lock_name')
        if not lock_name:
            lock_name = func.__module__
        else:
            del kwargs['_lock_name']
        ret = None
        in_lock = None
        ex_lock = None
        try:
            # Ideally the rados lock alone would be used here, but because of
            # its memory leak issue we add an internal lock so that one
            # process can share a single rados lock.
            in_lock = get_internal_lock(lock_name)
            ex_lock = get_external_lock(lock_name)
            in_lock.acquire()
            if ex_lock:
                ex_lock.acquire(auto_renew=True)
            ret = func(*args, **kwargs)
        except Exception, ex:
            logger.error('Running wrapped function {} error: {}'.format(str(func), str(ex)))
            raise ex
        finally:
            if ex_lock:
                ex_lock.release()
            if in_lock:
                in_lock.release()
        return ret
    return wrapper
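
# Illustrative use (not part of this module): serialize every call to a
# config-writing function across the cluster; update_settings is a
# hypothetical function, and the _lock_name argument is consumed by the
# wrapper rather than passed through:
#
#   @monlock
#   def update_settings(key, value):
#       pass  # runs under both the internal and the monitor lock
#
#   update_settings('mtu', 9000, _lock_name=RESOURCE.CENTRAL_CONFIG_LOCK)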
class MonLockWithName(object):
    def __init__(self, lock_name):
        self.lock_name = lock_name

    def __call__(self, func):
        def wrapped_f(*args):
            ret = None
            in_lock = None
            ex_lock = None
            try:
                in_lock = get_internal_lock(self.lock_name)
                ex_lock = get_external_lock(self.lock_name)
                in_lock.acquire()
                if ex_lock:
                    ex_lock.acquire(auto_renew=True)
                ret = func(*args)
            except Exception, ex:
                logger.error('Running wrapped function {} error: {}'.format(str(func), str(ex)))
                raise ex
            finally:
                if ex_lock:
                    ex_lock.release()
                if in_lock:
                    in_lock.release()
            return ret
        return wrapped_f
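
# Illustrative use (not part of this module): same idea as monlock, but with
# the lock name fixed at decoration time; 'rbd' and 'img0' are placeholders:
#
#   @MonLockWithName(rbd_task_lock_name('rbd', 'img0'))
#   def flatten_image():
#       pass  # note: wrapped_f only forwards positional arguments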
class RWLock(object):
    class _Switch(object):
        def __init__(self):
            self._count = 0
            self._mutex = Lock()

        def acquire(self, lock):
            # the first one in takes the outer lock on behalf of the group
            self._mutex.acquire()
            self._count += 1
            if self._count == 1:
                lock.acquire()
            self._mutex.release()

        def release(self, lock):
            # the last one out releases the outer lock
            self._mutex.acquire()
            self._count -= 1
            if self._count == 0:
                lock.release()
            self._mutex.release()

    def __init__(self):
        self._read_switch = self._Switch()
        self._write_switch = self._Switch()
        self._no_readers = Lock()
        self._no_writers = Lock()
        self._readers_queue = Lock()

    def read_acquire(self):
        self._readers_queue.acquire()
        self._no_readers.acquire()
        self._read_switch.acquire(self._no_writers)
        self._no_readers.release()
        self._readers_queue.release()

    def read_release(self):
        self._read_switch.release(self._no_writers)

    def write_acquire(self):
        self._write_switch.acquire(self._no_readers)
        self._no_writers.acquire()

    def write_release(self):
        self._no_writers.release()
        self._write_switch.release(self._no_readers)
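
A short sketch of how RWLock is meant to be used (illustrative; shared_state stands in for whatever the callers protect):

rw = RWLock()
shared_state = {}

def reader():
    # many readers may hold the lock concurrently
    rw.read_acquire()
    try:
        return dict(shared_state)
    finally:
        rw.read_release()

def writer(key, value):
    # writers get exclusive access, and a pending writer blocks new readers
    rw.write_acquire()
    try:
        shared_state[key] = value
    finally:
        rw.write_release()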