玩命加载中 . . .

pyserial 使用指导


概述

利用pyserial,可以访问串口,发送串口命令,当然,根据串口返回结果,判断命令执行正确性。

本文简介pyserial一些常用函数的使用方法与场景,并做一些阐述。

实践

初始化(连接串口)

串口连接示例

>>> import serial
>>> ser = serial.Serial('COM7', 115200, timeout=1)

不同平台连接不同类型串口,示例如下:

ser = serial.Serial("/dev/ttyUSB0", 9600, timeout=0.5) # 使用USB连接串行口
ser = serial.Serial("/dev/ttyAMA0", 9600, timeout=0.5) # 使用树莓派的GPIO口连接串行口
ser = serial.Serial("com1", 9600, timeout=0.5)# winsows系统使用com1口连接串行口
ser = serial.Serial("/dev/ttyS1", 9600, timeout=0.5)# Linux系统使用com1口连接串行口

连接串口时,除了主要的几个参数显示携带外,其他参数可以使用默认值,当然,可以携带全部参数,示例如下:

当然, 可以在连接串口时,携带上全部参数:

ser = serial.Serial(
port=None,              # number of device, numbering starts at
# zero. if everything fails, the user
# can specify a device string, note
# that this isn't portable anymore
# if no port is specified an unconfigured
# an closed serial port object is created
baudrate=9600,          # baud rate
bytesize=EIGHTBITS,     # number of databits
parity=PARITY_NONE,     # enable parity checking
stopbits=STOPBITS_ONE,  # number of stopbits
timeout=None,           # set a timeout value, None for waiting forever
xonxoff=0,              # enable software flow control
rtscts=0,               # enable RTS/CTS flow control
interCharTimeout=None   # Inter-character timeout, None to disable
)

对象属性

  • name —— 设备名字

  • port —— 读或者写端口,比如示例中的’COM7’,这里只接受字符串类型内容

  • baudrate —— 波特率,比如示例中的115200,波特率端口,默认9600

  • bytesize —— 字节大小

  • parity —— 校验位

  • stopbits —— 停止位,取值(1,2)

  • timeout —— 读超时设置

  • writeTimeout —— 写超时

  • xonxoff —— 软件流控

  • rtscts —— 硬件流控

  • dsrdtr —— 硬件流控

  • interCharTimeout —— 字符间隔超时

属性介绍

说明:

​ 如未显示携带,pyserial设有默认值。

name

>>> print ser.name
COM7

port

>>> print ser.port
COM7
>>> print ser.portstr
COM7
>>>

baudrate

>>> print ser.baudrate
115200
>>>

bytesize

>>> print ser.bytesize
8

parity

>>> print ser.parity
N

stopbits

>>> print ser.stopbits
1

timeout

>>> print ser.timeout
None

关于timeout:

timeout=None            # wait forever
timeout=0               # non-blocking mode (return immediately on read)
timeout=x               # set timeout to x seconds (float allowed)

writeTimeout

>>> print ser.writeTimeout
None

xonxoff

>>> print ser.xonxoff
False

rtscts

>>> print ser.rtscts
False

dsrdtr

>>> print ser.dsrdtr
False

interCharTimeout

>>> print ser.interCharTimeout
None
>>>

一个命令获取当前串口属性信息

>>> ser.getSettingsDict
<bound method Serial.getSettingsDict of Serial<id=0x2cc3358, open=True>(port='com7', baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=False, rtscts=False, dsrdtr=False)>
>>>

查看pyserial 属性信息

>>> import serial
>>> ser = serial.Serial('COM7', 115200)
>>> dir(ser)
['BAUDRATES', 'BYTESIZES', 'PARITIES', 'STOPBITS', '_GetCommModemStatus', '_SAVED_SETTINGS', '__abstractmethods__', '__class__', '__delattr__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__iter__', '__metaclass__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_abc_cache', '_abc_negative_cache', '_abc_negative_cache_version', '_abc_registry', '_cancel_overlapped_io', '_checkClosed', '_checkReadable', '_checkSeekable', '_checkWritable', '_close', '_reconfigure_port', '_update_break_state', '_update_dtr_state', '_update_rts_state', 'applySettingsDict', 'apply_settings', 'baudrate', 'break_condition', 'bytesize', 'cancel_read', 'cancel_write', 'cd', 'close', 'closed', 'cts', 'dsr', 'dsrdtr', 'dtr', 'exclusive', 'fileno', 'flush', 'flushInput', 'flushOutput', 'getCD', 'getCTS', 'getDSR', 'getRI', 'getSettingsDict', 'get_settings', 'inWaiting', 'in_waiting', 'interCharTimeout', 'inter_byte_timeout', 'iread_until', 'isOpen', 'isatty', 'next', 'open', 'out_waiting', 'parity', 'port', 'read', 'read_all', 'read_until', 'readable', 'readall', 'readinto', 'readline', 'readlines', 'reset_input_buffer', 'reset_output_buffer', 'ri', 'rs485_mode', 'rts', 'rtscts', 'seek', 'seekable', 'sendBreak', 'send_break', 'setDTR', 'setPort', 'setRTS', 'set_buffer_size', 'set_output_flow_control', 'stopbits', 'tell', 'timeout', 'truncate', 'writable', 'write', 'writeTimeout', 'write_timeout', 'writelines', 'xonxoff']
>>>

对象常用方法

  • ser.isOpen() —— 查看端口是否被打开

  • ser.open() —— 打开端口

  • ser.close() —— 关闭端口

  • ser.read() —— 从端口读字节数据。默认1个字节

  • ser.read_all() —— 从端口接收全部数据,默认4096字节

  • ser.write(“s”) —— 向端口写数据 “s”

  • ser.readline() —— 读一行数据

  • ser.readlines() —— 读多行数据

  • in_waiting —— 返回接收缓存中的字节数,方法同inWaiting()

  • flush() —— 等待所有数据写出

  • flushInput() —— 丢弃接收缓存中的所有数据

  • flushOutput() —— 终止当前写操作,并丢弃发送缓存中的数据

  • read_until(“s”) —— 读取串口数据,知道出现预期字符串"s"就终止读取

  • set_buffer_size(rx_size=value1, tx_size=value2) —— 设置buffer 大小,默认4096bytes,上限2147483647

  • sendBreak() —— send break condition

  • setRTS(level=1) —— set RTS line to specified logic level

  • setDTR(level=1) —— set DTR line to specified logic level

  • getCTS() —— return the state of the CTS line

  • getDSR() —— return the state of the DSR line

  • getRI() —— return the state of the RI line

  • getCD() —— return the state of the CD line

常用代码示例

说明:

​ 如下示例,借助虚拟串口软件(Virtual Serial Port Driver Pro)创建了两个虚拟串口(COM7 和 COM8),并将两个串口短接(即COM7与COM8互通,形成回环)

获取当前设备上所有串口信息

    @staticmethod
    def list_serial_ports():
        """
        Lists serial port names
        :raises EnvironmentError:   On unsupported or unknown platforms
        :returns: A list of the serial ports available on the system
        """
        if sys.platform.startswith('win'):
            ports = ['COM%s' % (i + 1) for i in range(256)]
        elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
            # this excludes your current terminal "/dev/tty"
            ports = glob.glob('/dev/tty[A-Za-z]*')
        elif sys.platform.startswith('darwin'):
            ports = glob.glob('/dev/tty.*')
        else:
            raise EnvironmentError('Unsupported platform')
    
        result = []
        for port in ports:
            try:
                s = serial.Serial(port)
                s.close()
                result.append(port)
            except (OSError, serial.SerialException):
                pass
    
        return result

字符串的发送接收

import serial
ser = serial.Serial('COM7', 115200)
writen = ser.write('Hello, i send a string command to serial')
print writen
recevied = ser.read(writen )
print recevied

其中,read(value)方法的参数value为需要读取的字符长度:

  • 如果指定了value的值,当且仅当串口 buffer内容长度小于value值时,read动作会一直等待,直到串口buffer >= value时,read结束。比如read(10),而buffer内容长度超过10字节,则只读取前面10个字节内容。

  • 如果想要全部读取,提供两个方法:

​ (1)inWaiting() 或者 in_waiting:监测接收字符。 inWaitting返回接收字符串的长度值,然后把这个值赋给read做参数。

​ (2)read_all() :读取全部字符

说明:

​ inWaiting()或者in_wating,使用如下:

>>> ser.inWaiting()
36L
>>> ser.in_waiting
36L
>>>

如果有非零值,说明串口buffer中有内容可读。

​ read_all() 使用如下:

>>> ser.read_all()
'I received what you send, thanks.'
>>>

还有另外一个方法readall(),但是没有理解掉这个方法,使用readall()读取数据时,无论设置多大的buffer size,无论发送多大的数据包过去,命令始终卡着,无任何输出,即使携带上回车换行也没有效果。

十六进制显示

十六进制显示的实质是把接收到的字符串转换成其对应的ASCII码,然后将ASCII码值再转换成十六进制数显示出来,这样就可以显示特殊字符了。

在这里定义了一个函数,如hexShow(argv),代码如下:

#!/usr/bin/env python
# -*- coding:UTF-8 -*

import serial


def hexShow(argv):
    result = ''
    hLen = len(argv)
    for i in xrange(hLen):
        hvol = ord(argv[i])
        hhex = '%02x'%hvol
        result += hhex+' '

    print 'hexShow:', result

ser = serial.Serial('COM8',9600)
str_input = raw_input('Enter some words:')
writen_str = ser.write(str_input)
print "--  writen_str : ({})".format(writen_str)

read_str = ser.read(writen_str)
print "--  read_str : ({})".format(read_str)

hexShow(read_str)

ser.close()

测试结果如下:

C:\Users\Administrator>python C:\Users\Administrator\Desktop\hexShow.py
Enter some words:hex shows test
--  writen_str : (14)
--  read_str : (hex shows test)
hexShow: 68 65 78 20 73 68 6f 77 73 20 74 65 73 74

十六进制发送

十六进制发送实质是发送十六进制格式的字符串,如’\xaa’,‘\x0b’。重点在于怎么样把一个字符串转换成十六进制的格式,有两个误区:

1)‘\x’+'aa’是不可以的,涉及到转义符反斜杠

2)‘\x’+‘aa’和r’\x’+‘aa’也不可以的,这样的打印结果虽然是\xaa,但赋给变量的值却是’\xaa’

这里用到decode函数:

>>> s ='aabbccddee'
>>> s.decode('hex')
'\xaa\xbb\xcc\xdd\xee'
>>>

需要注意一点,如果字符串 s 的长度为奇数,则decode会报错:

>>> s ='aabbccdde'
>>> s.decode('hex')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Python27\lib\encodings\hex_codec.py", line 42, in hex_decode
    output = binascii.a2b_hex(input)
TypeError: Odd-length string
>>>

可以按照实际情况,用字符串的切片操作,在字符串的开头或结尾加一个’0’。

假如在串口助手以十六进制发送字符串"abc",那么你在python中则这样操作:

serial.write(”\x61\x62\x63") 

当然,还有另外一个方法也可以达到相同目的:

s = "abc"
strHex = binascii.b2a_hex(s)
# print strHex
strhex = strHex.decode("hex")
# print strhex
ser.write(strhex);

读取串口记录的最后一行

    def read_last_line(self):
        """
        Read the last line of output of serial, if not match, return None
        """
        all_line = self.serial.read_all().decode('GBK')
        logging.debug("--  [DEBUG]  all_line : ({})".format(all_line))
        all_line_list = [x for x in all_line.split('\n') if x]
        if len(all_line_list):
            return all_line_list[-1]
        else:
            return

判断 buffer 是否有内容

    def has_buffer(self):
        """
        Get serial buffer
        @return: True or False
        """
        if self.serial.in_waiting > 0:
            buffer_size = self.serial.in_waiting
            time.sleep(5)
            current_buffer_size = self.serial.in_waiting
            if buffer_size != current_buffer_size:
                return True
        else:
            return False

读取串口内容并记录到文件

    def write_serial_output_to_file(self, file_name):
        """
        Write serial output into a file
        @param file_name: string, a file name which to record the serial output information
        """
        logging.debug('--  [DEBUG]  Write data into file : (%s)', file_name)

        while True:
            if self.serial.in_waiting > 0:  # If serial has data to read
                full_data = self.serial.readline().decode('GBK').strip()
                # split_data = full_data.splitlines()
                with open(file_name, 'a+') as f:
                    f.write(full_data)

读取串口内容直到匹配到预期字符串

    def read_until_matched(self, key_words):
        """
        Get some info when read from serial until match some key words
        @return:
        """
        logging.debug("--  Get key words : (%s) from serial cache", key_words)

        matched = self.serial.read_until(key_words).decode('GBK')

        return matched

文章作者: Gavin Wang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Gavin Wang !
  目录