玩命加载中 . . .

ceph PG 修复


概述

3月份的时候,江苏电力这个客户发生PG状态异常(纯粹是友情支持,发生问题的环境非我司ceph集群,而是ceph 开源集群),需要进行手工修复。

应客户要求,临时写了个修复脚本,供客户设置定时任务,定期自动修复。

考虑到一些PG状态是无法修复的,做了一些保护;以及修复失败的保护。

实践

inconsistent_pg_repair.py

脚本内容如下:

import os
import sys
import gzip
import json
import time
import errno
import select
import atexit
import socket
import logging
import commands
import traceback
import logging.handlers

from threading import Event
from fcntl import flock, LOCK_EX, LOCK_NB
from signal import signal, SIGTERM, SIGKILL, SIGUSR1


class Daemon(object):
    """
    A generic daemon class.

    Usage: subclass the Daemon class and override the run() method
    """
    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null', kill_timeout=90):
        """
        Constructor.

        @param  pidfile:    path of the pid file
        @type   pidfile:    string
        @param  stdin:      path of the stdin
        @type   stdin:      string
        @param  stdout:     path of the stdout
        @type   stdout:     string
        @param  stderr:     path of the stderr
        @type   stderr:     string
        """
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        self.lock_fd = -1
        self._kill_timeout = kill_timeout
        self._daemon_stopped = Event()
        self.first_loop = True

    def daemonize(self):
        """
        do the UNIX double-fork magic, see Stevens' "Advanced
        Programming in the UNIX Environment" for details (ISBN 0201563177)
        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
        """
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        # decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)

        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                # exit from second parent
                sys.exit(0)
        except OSError as e:
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        # redirect standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        se = open(self.stderr, 'a+b', 0)
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        # write pidfile
        atexit.register(self.delpid)
        pid = str(os.getpid())
        open(self.pidfile,'w+').write("%s\n" % pid)

    def delpid(self):
        """
        Delete the pid file.
        """
        os.remove(self.pidfile)

    def lock_or_exit(self):
        try:
            self.lock_fd = os.open(self.pidfile, 
                                   os.O_CREAT|os.O_TRUNC|os.O_WRONLY|os.O_EXCL,
                                   0o666)
        except OSError as e:
            if e.errno == errno.EEXIST:
                self.lock_fd = os.open(self.pidfile,os.O_WRONLY)
            else:
                sys.stderr.write(
                    u"Can not create pidfile : {} error {} \n".format(
                        self.pidfile, str(e)
                    )
                )
                sys.exit(1)

        try:
            # flock can be  inherited by child process, even exec()
            flock(self.lock_fd, LOCK_EX | LOCK_NB)
        except IOError as e:
            if e.errno == errno.EAGAIN:
                sys.stderr.write(
                    u"pidfile {} is locked, Daemon already running?\n".format(
                        self.pidfile
                    )
                )
                sys.exit(0)
            else:
                sys.stderr.write(
                    u"failed to lock the pid file {} , error {}".format(
                        self.pidfile, str(e)
                    )
                )
                sys.exit(1)

    def start(self):
        """
        Start the daemon
        """

        self.lock_or_exit()

        # Start the daemon
        self.prepare_start()
        self.daemonize()
        signal(SIGTERM, self.handle_term)
        signal(SIGUSR1, self.handle_siguser1)
        daemon_name = self.__class__.__name__
        try:
            logger.info('Daemon {} start'.format(daemon_name))
            self.run()
            logger.info('Daemon {} stopped'.format(daemon_name))
        except Exception:
            logger.exception('Daemon {} terminates unexpectedly'.format(daemon_name))

    def get_pid(self):
        # Get the pid from the pidfile
        try:
            pid_str = (open(self.pidfile).read().strip())
            if len(pid_str):
                pid = int(pid_str)
            else:
                pid = None
        except IOError:
            pid = None
        return pid

    def stop(self):
        """
        Stop the daemon
        """
        pid = self.get_pid()

        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart

        # Try killing the daemon process
        try:
            waiting_killed = 0
            while 1:
                if waiting_killed == 0:
                    os.kill(pid, SIGTERM)
                elif waiting_killed < self._kill_timeout:
                    os.kill(pid, 0)
                else:
                    # Use kill pg to force kill all subprocess
                    os.killpg(os.getpgid(pid), SIGKILL)
                time.sleep(1)
                waiting_killed = waiting_killed + 1
        except OSError as err:
            err = str(err)
            if err.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(err))
                sys.exit(1)

    def restart(self):
        """
        Restart the daemon
        """
        self.stop()
        self.start()

    def prepare_start(self):
        """
        You should override this method when you subclass Daemon. It will be called before the process has been
        daemonized by start() or restart().
        """

    def run(self):
        """
        You should override this method when you subclass Daemon. It will be called after the process has been
        daemonized by start() or restart().
        """

    def terminate(self):
        """
        You should override this method when you subclass Daemon. It will be called after the process has been
        daemonized by stop() or restart().
        """
    
    def user_signal_1(self):
        """
        You may override this method when you subclass Daemon.
        The default method toogle to change the loglevel from between debug and info
        """

        new_level = logging.INFO if EZLog.getLevel() == logging.DEBUG else logging.DEBUG
        EZLog.setLevel(new_level)
        logger.info('Daemon {} log level is chaged to {}'.format(
            self.__class__.__name__,
            'debug ' if new_level == logging.DEBUG else 'info'))

    def is_daemon_running(self, wait=0):
        # do not wait for first loop
        if self.first_loop and wait > 0:
            wait = 0
            self.first_loop = False
        return not self._daemon_stopped.wait(wait)

    def handle_term(self, sig, frm):
        logger.info('Got signal: {}'.format(sig))
        if self.is_daemon_running():
            self._daemon_stopped.set()
            self.terminate()

    def handle_siguser1(self, sig, frm):
        logger.info('Got signal: {}'.format(sig))
        self.user_signal_1()


class BtLogger():

    def __init__(self, log_name, log_file_name, log_level=logging.DEBUG, max_bytes=5*1024*1024, backup_count=5):

        self.logger = logging.getLogger(log_name)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter('%(asctime)s %(filename)-8s[line:%(lineno)-4s] [%(levelname)-5s] %(message)s')

        self.h_rotating_file = logging.handlers.RotatingFileHandler(log_file_name, maxBytes=max_bytes, backupCount=backup_count)
        self.h_rotating_file.setFormatter(formatter)
        self.h_rotating_file.rotator = self._rotator
        self.h_rotating_file.namer = self._namer
        self.h_stream = logging.StreamHandler(sys.stdout)
        self.h_stream.setFormatter(formatter)

        self.h_rotating_file.setLevel(log_level)
        self.h_stream.setLevel(logging.INFO)
        self.logger.addHandler(self.h_rotating_file)
        self.logger.addHandler(self.h_stream)

    def _namer(self, name):
        return name + ".gz"

    def _rotator(self, source, dest):
        with open(source, "rb") as sf:
            data = sf.read()
            with gzip.open(dest, "wb") as gf:
                gf.write(data)
        os.remove(source)

    def get_logger(self):
        return self.logger


sleep_time = 1 * 60 * 1
logger = BtLogger(__name__, "/var/log/inconsistent_pg_repair.log", log_level=logging.DEBUG).get_logger()


class RepairDaemon(Daemon):
    """ Repari inconsistent PG in daemon  """

    @staticmethod
    def pg_not_health():
        unhealth_pg = ["scrubbing", "nearfull", "incomplete", "full", "backfill",
                       "degraded", "remapped", "stale", "recovering"]
        pg_stat = commands.getoutput('ceph pg stat').strip()
        for each_unhealth_pg in unhealth_pg:
            if each_unhealth_pg in pg_stat:
                logger.error("[ERROR]  Find unhealth PG : (%s), do nothing, exit!!!", each_unhealth_pg)
                sys.exit(2)

    @staticmethod
    def get_inconsistent_pg():
        inconsistent_pg = []
        res = commands.getoutput("/usr/bin/ceph --connect-timeout=10 health detail | "
                                 "grep inconsistent | grep acting").strip()

        if res:
            if 'failed_repair' in res:
                logger.error("[ERROR]  Find 'failed_repair' PG, do nothing, exit!!!")
                sys.exit(2)

            for each_element in res.split('\n'):
                inconsistent_pg.append(each_element.strip().split()[1])

            # Remove duplicate and None element
            inconsistent_pg = {}.fromkeys(inconsistent_pg).keys()
            inconsistent_pg = filter(None, inconsistent_pg)
            logger.debug("Find inconsistent PG : (%s)", inconsistent_pg)
            return inconsistent_pg
        else:
            logger.debug("Not find inconsistent PG.")
            return inconsistent_pg

    def auto_repair(self, pg_name):
        """
        Run 'ceph pg reapir pg_name', try to repair PG
        :param pg_name, string, a PG id
        """
        repair_cmd = "ceph pg repair {}".format(pg_name)
        logger.info("Starts to repair PG : (%s), command : (%s)", pg_name, repair_cmd)
        repair_res = commands.getoutput(repair_cmd)

        health_res = commands.getoutput('/usr/bin/ceph --connect-timeout=10 -s').strip()
        if 'failed_repair' in health_res:
            logger.error("[ERROR]  Repair PG :(%s) failed, please pay more attention!!!!!")
            sys.exit(1)
        else:
            logger.info("Repair PG : (%s) result : (%s), now sleep (%s)", pg_name, repair_res, sleep_time)
            time.sleep(sleep_time) 

    def list_inconsistent_obj_repair(self):
        """
        List inconsistent object for each inconsistent PG before, then repair
        """
        self.pg_not_health()

        inconsistent_pg = self.get_inconsistent_pg()
        if len(inconsistent_pg) > 3:
            logger.warn("[WARN]  Found more than 3 abnormal PGs, suggest to repair manually, exits!!!")
            sys.exit(3)

        if len(inconsistent_pg):
            for each_pg in inconsistent_pg:
                list_cmd = "rados list-inconsistent-obj {} --format=json-pretty".format(each_pg)
                list_res = commands.getoutput(list_cmd)
                logger.debug("rados list-inconsistent-obj %s : (%s)", each_pg, list_res)
                if list_res and len(json.loads(list_res)['inconsistents']):
                    logging.warn("Find inconsistent PG : (%), now reapir "
                                 "(%s)",json.loads(list_res)['inconsistents'], each_pg)
                    self.auto_repair(each_pg)
                else:
                    logging.warn("PG in inconsistent, but list inconsistent object is None, skip to repair!!!")
        else:
            logger.info("Not find inconsistent PG, skip.")

    def run(self):
        logger.info("Repair inconsistent PG Daemon starts run")

        while self.is_daemon_running(wait=30):
            try:
                self.list_inconsistent_obj_repair()
            except Exception:
                logger.error("ezsnapsched exception: {}".format(traceback.format_exc()))


if __name__ == '__main__':
    daemon = RepairDaemon('/var/run/bt-repair.pid')
    if len(sys.argv) == 2:
        if 'start' == sys.argv[1]:
            daemon.start()
        elif 'stop' == sys.argv[1]:
            daemon.stop()
        elif 'restart' == sys.argv[1]:
            daemon.restart()
        else:
            print "Unknown command"
            sys.exit(2)
        sys.exit(0)
    else:
        print "usage: %s start|stop|restart" % sys.argv[0]
        sys.exit(2)


文章作者: Gavin Wang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Gavin Wang !
  目录