给nose写一个类似Robot Framework的Wait Until Keyword Succeeds



  • 不确定要等待多久,等待久了,感觉是浪费时间,尤其用例很多的情况下,严重影响了用例的整体执行时间;等待短了,又没有成功的apply下去,或者apply下去了但还没有具体生效

鉴于此,就需要有一个类似于Robot Framework的Wait Until Keyword Succeeds 功能.


#!/usr/bin/env python
# -*- coding:UTF-8 -*-

from __future__ import unicode_literals

import time

from common.config import GetConfig as config

retry_interval = config.retry_interval
retry_timeout = config.retry_timeout

def assert_check(func):
    :param func, string, a function name
    def inner(param, *args, **kwargs):
        :param param, string, a parameter
        for i in xrange(int(retry_timeout)):
                func(param, *args, **kwargs)
            except Exception as ex:
                logging.warn("[WARN]  Not match, %s time(s) to retry, "
                             "exception is : (%s) : (%s)", (i + 1), ex.__str__, unicode(ex))
                logging.warn("[WARN]  In file (%s) of function (%s), "
                             "at line (%s)", func.func_code.co_filename,
            logging.info("[ERROR]  Retry timeout or AssertionError")
            # raise AssertionError('[RetryTimeOut] Failure expected, retry timeout')
            # Deliberately doing this, this will mark the use case status as failed,
            # otherwise it will affect the normal output of the html report (the entire html content is empty)
            eq_(1, 2, "[RetryTimeOut] Failure expected, retry timeout")

    return inner

这里是一个示例的片段,通过装饰器,在调用check函数之前,引用这个装饰器,从而达到间隔一定时间(retry_timeout)、执行一定次数(retry_interval)来实现类似RFWait Until Keyword Succeeds 的功能


    def check_snapshot(self, gateway_group, target_id, iscsi_id, snap_name, op_type):
        Check snapshot created or delete result
        :param gateway_group, string, a gateway group name
        :param target_id, string, a target name
        :param iscsi_id, string, a volume name
        :param snap_name, string, a snapshot name for a iSCSI volume
        :param op_type, string, del or add

        if op_type == 'add':
            logging.info("[Check]    Check snapshot created result")
        elif op_type == 'del':
            logging.info("[Check]    Check snapshot deleted result")

        info = get_volume_info(gateway_group, target_id, iscsi_id, translate_pool_id=True)
        pool = info['pool']
        image_name = info['rbd_img']

        rbd_res = do_cmd("rbd -p {} snap ls {}".format(pool, image_name), 30, True).strip()
        if op_type == 'add':
            assert_in(snap_name, rbd_res, "[ERROR]  Not found : ({}) in "
                                          "rbd_res : ({})".format(snap_name, rbd_res))
            logging.info("[Success]  Create snapshot : (%s) success", snap_name)
        elif op_type == 'del':
            assert_not_in(snap_name, rbd_res, "[ERROR]  Still found : ({}) in "
                                              "rbd_res : ({})".format(snap_name, rbd_res))
            logging.info("[Success]  Delete snapshot : (%s) success", snap_name)

