PyUSB是一个纯Python的USB访问模块,提供了对USB设备的底层访问能力。下面我将详细解析PyUSB的使用方法和工作原理。
# 安装PyUSB
pip install pyusb
# 需要安装后端库(以libusb为例)
# Ubuntu/Debian
sudo apt-get install libusb-1.0-0-dev
# macOS
brew install libusb
# Windows:下载libusb的DLL文件
import usb.core
import usb.util
import usb.backend
# PyUSB的主要组件:
# - usb.core: 核心功能模块
# - usb.util: 工具函数
# - usb.backend: 后端抽象层
# - usb.control: 控制传输相关
# USB通信的关键参数
VENDOR_ID = 0x1234 # 厂商ID
PRODUCT_ID = 0x5678 # 产品ID
# 端点类型
ENDPOINT_IN = 0x80 # 输入端点
ENDPOINT_OUT = 0x00 # 输出端点
# 传输类型
usb.util.CTRL_TYPE_STANDARD = 0x00
usb.util.CTRL_TYPE_CLASS = 0x20
usb.util.CTRL_TYPE_VENDOR = 0x40
usb.util.CTRL_TYPE_RESERVED = 0x60
import usb.core
import usb.util
def find_and_connect_device():
# 方法1:通过VID/PID查找设备
dev = usb.core.find(idVendor=0x1234, idProduct=0x5678)
if dev is None:
raise ValueError("设备未找到")
# 方法2:列出所有设备
devices = usb.core.find(find_all=True)
for device in devices:
print(f"发现设备: {hex(device.idVendor)}:{hex(device.idProduct)}")
# 重置设备
try:
dev.reset()
except usb.core.USBError as e:
print(f"重置失败: {e}")
# 设置配置
dev.set_configuration()
# 获取设备描述符
print(f"制造商: {dev.manufacturer}")
print(f"产品: {dev.product}")
print(f"序列号: {dev.serial_number}")
return dev
class USBDeviceController:
def __init__(self, vendor_id, product_id):
self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
if self.dev is None:
raise ValueError("设备未找到")
# 获取接口描述符
self.cfg = self.dev.get_active_configuration()
self.intf = self.cfg[(0,0)] # 第一个接口
# 查找端点
self.ep_out = None
self.ep_in = None
for endpoint in self.intf:
if usb.util.endpoint_direction(endpoint.bEndpointAddress) == \
usb.util.ENDPOINT_OUT:
self.ep_out = endpoint
elif usb.util.endpoint_direction(endpoint.bEndpointAddress) == \
usb.util.ENDPOINT_IN:
self.ep_in = endpoint
if self.ep_out is None or self.ep_in is None:
raise ValueError("端点未找到")
def send_data(self, data, timeout=1000):
"""发送数据到设备"""
# 确保数据是bytes类型
if isinstance(data, str):
data = data.encode('utf-8')
# 写入数据
written = self.ep_out.write(data, timeout)
print(f"已发送 {written} 字节")
return written
def receive_data(self, size=64, timeout=1000):
"""从设备接收数据"""
try:
data = self.ep_in.read(size, timeout)
return bytes(data)
except usb.core.USBError as e:
if e.errno == 110: # 超时错误
return None
raise
def control_transfer(self, bmRequestType, bRequest, wValue=0, wIndex=0,
data_or_wLength=None, timeout=1000):
"""执行控制传输"""
return self.dev.ctrl_transfer(
bmRequestType=bmRequestType,
bRequest=bRequest,
wValue=wValue,
wIndex=wIndex,
data_or_wLength=data_or_wLength,
timeout=timeout
)
class AdvancedUSBCommunicator:
def __init__(self, dev):
self.dev = dev
self.setup_endpoints()
def setup_endpoints(self):
"""自动配置端点"""
# 获取配置描述符
cfg = self.dev.get_active_configuration()
intf = cfg[(0,0)]
# 存储所有端点
self.endpoints = {}
for ep in intf:
addr = ep.bEndpointAddress
ep_type = ep.bmAttributes & 0x03
if ep_type == usb.util.ENDPOINT_TYPE_BULK:
self.endpoints[addr] = {
'type': 'BULK',
'ep': ep,
'direction': 'IN' if addr & 0x80 else 'OUT'
}
elif ep_type == usb.util.ENDPOINT_TYPE_INTERRUPT:
self.endpoints[addr] = {
'type': 'INTERRUPT',
'ep': ep,
'direction': 'IN' if addr & 0x80 else 'OUT'
}
def bulk_transfer(self, data, endpoint_addr, timeout=1000):
"""批量传输"""
if endpoint_addr not in self.endpoints:
raise ValueError(f"端点 {hex(endpoint_addr)} 不存在")
endpoint = self.endpoints[endpoint_addr]['ep']
if self.endpoints[endpoint_addr]['direction'] == 'OUT':
return endpoint.write(data, timeout)
else:
return endpoint.read(len(data), timeout)
def interrupt_transfer(self, data=None, endpoint_addr=None, timeout=1000):
"""中断传输"""
if endpoint_addr is None:
# 查找中断端点
for addr, info in self.endpoints.items():
if info['type'] == 'INTERRUPT':
endpoint_addr = addr
break
if endpoint_addr is None:
raise ValueError("未找到中断端点")
endpoint = self.endpoints[endpoint_addr]['ep']
if data is not None and self.endpoints[endpoint_addr]['direction'] == 'OUT':
return endpoint.write(data, timeout)
else:
return endpoint.read(endpoint.wMaxPacketSize, timeout)
import usb.core
import usb.util
class USBUtils:
@staticmethod
def list_all_devices():
"""列出所有USB设备"""
devices = usb.core.find(find_all=True)
device_list = []
for dev in devices:
try:
info = {
'vendor_id': hex(dev.idVendor),
'product_id': hex(dev.idProduct),
'manufacturer': dev.manufacturer,
'product': dev.product,
'serial_number': dev.serial_number,
'bus': dev.bus,
'address': dev.address
}
device_list.append(info)
except:
continue
return device_list
@staticmethod
def detach_kernel_driver(dev, interface=0):
"""分离内核驱动"""
if dev.is_kernel_driver_active(interface):
try:
dev.detach_kernel_driver(interface)
print(f"已分离接口 {interface} 的内核驱动")
return True
except usb.core.USBError as e:
print(f"分离驱动失败: {e}")
return False
return True
@staticmethod
def claim_interface(dev, interface=0):
"""声明接口"""
try:
usb.util.claim_interface(dev, interface)
return True
except usb.core.USBError as e:
print(f"声明接口失败: {e}")
return False
@staticmethod
def release_interface(dev, interface=0):
"""释放接口"""
try:
usb.util.release_interface(dev, interface)
return True
except usb.core.USBError as e:
print(f"释放接口失败: {e}")
return False
@staticmethod
def get_endpoint_info(dev):
"""获取端点详细信息"""
cfg = dev.get_active_configuration()
interface_number = cfg[(0,0)].bInterfaceNumber
print(f"接口: {interface_number}")
print(f"配置: {dev.get_active_configuration().bConfigurationValue}")
for interface in cfg:
for endpoint in interface:
print(f"\n端点地址: {hex(endpoint.bEndpointAddress)}")
print(f"端点属性: {hex(endpoint.bmAttributes)}")
print(f"最大包大小: {endpoint.wMaxPacketSize}")
print(f"轮询间隔: {endpoint.bInterval}")
import usb.core
import usb.util
import struct
import time
class HIDDeviceHandler:
# HID特定的请求类型
HID_GET_REPORT = 0x01
HID_SET_REPORT = 0x09
HID_REPORT_TYPE_INPUT = 0x01
HID_REPORT_TYPE_OUTPUT = 0x02
HID_REPORT_TYPE_FEATURE = 0x03
def __init__(self, vendor_id, product_id):
self.dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
if self.dev is None:
raise ValueError("HID设备未找到")
# 配置设备
self.configure_device()
def configure_device(self):
"""配置HID设备"""
# 分离内核驱动
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
# 设置配置
self.dev.set_configuration()
# 声明接口
usb.util.claim_interface(self.dev, 0)
def send_hid_report(self, report_id, data):
"""发送HID报告"""
# 构造报告数据
report_data = struct.pack('B', report_id) + data
# 发送SET_REPORT请求
self.dev.ctrl_transfer(
bmRequestType=0x21, # 主机到设备,类特定,接口
bRequest=self.HID_SET_REPORT,
wValue=(self.HID_REPORT_TYPE_OUTPUT << 8) | report_id,
wIndex=0,
data_or_wLength=report_data
)
def receive_hid_report(self, report_id, size):
"""接收HID报告"""
# 发送GET_REPORT请求
data = self.dev.ctrl_transfer(
bmRequestType=0xA1, # 设备到主机,类特定,接口
bRequest=self.HID_GET_REPORT,
wValue=(self.HID_REPORT_TYPE_INPUT << 8) | report_id,
wIndex=0,
data_or_wLength=size
)
return bytes(data)
def read_interrupt(self, endpoint, size=64, timeout=1000):
"""从中断端点读取数据"""
try:
data = self.dev.read(endpoint, size, timeout)
return bytes(data)
except usb.core.USBError as e:
print(f"读取错误: {e}")
return None
def cleanup(self):
"""清理资源"""
usb.util.release_interface(self.dev, 0)
self.dev.attach_kernel_driver(0)
# 使用示例
def hid_example():
# 示例:与Arduino HID设备通信
handler = HIDDeviceHandler(0x2341, 0x8036) # Arduino Leonardo
try:
# 发送数据
handler.send_hid_report(0, b'Hello Arduino!')
# 接收数据
time.sleep(0.1)
data = handler.receive_hid_report(0, 64)
print(f"收到数据: {data}")
# 中断传输
result = handler.read_interrupt(0x81, 64, 1000)
print(f"中断数据: {result}")
finally:
handler.cleanup()
import usb.core
import usb.util
class RobustUSBConnection:
def __init__(self, vendor_id, product_id, max_retries=3):
self.vendor_id = vendor_id
self.product_id = product_id
self.max_retries = max_retries
self.dev = None
self.connect()
def connect(self):
"""建立可靠连接"""
for attempt in range(self.max_retries):
try:
self.dev = usb.core.find(
idVendor=self.vendor_id,
idProduct=self.product_id
)
if self.dev is None:
raise ConnectionError("设备未找到")
# 重置设备
self.dev.reset()
# 设置配置
try:
self.dev.set_configuration()
except usb.core.USBError as e:
if e.errno == 16: # 资源忙错误
usb.util.dispose_resources(self.dev)
self.dev.set_configuration()
print("USB连接成功")
return
except usb.core.USBError as e:
print(f"连接尝试 {attempt + 1} 失败: {e}")
if attempt == self.max_retries - 1:
raise
time.sleep(1)
def safe_transfer(self, transfer_func, *args, **kwargs):
"""安全执行传输操作"""
for attempt in range(self.max_retries):
try:
return transfer_func(*args, **kwargs)
except usb.core.USBError as e:
print(f"传输失败 (尝试 {attempt + 1}): {e}")
# 特定错误处理
if e.errno == 5: # 输入/输出错误
self.reconnect()
elif e.errno == 110: # 超时
if attempt == self.max_retries - 1:
raise TimeoutError("操作超时")
else:
raise
time.sleep(0.5)
def reconnect(self):
"""重新连接设备"""
print("尝试重新连接...")
if self.dev:
usb.util.dispose_resources(self.dev)
self.connect()
PyUSB提供了强大的USB设备访问能力,关键要点:
设备发现:使用usb.core.find()查找设备
端点管理:理解IN/OUT端点,正确配置传输方向
传输类型:根据需求选择控制/批量/中断传输
资源管理:正确分离内核驱动,及时释放资源
错误处理:处理各种USB错误,实现健壮的重连机制
通过合理使用PyUSB,可以轻松实现与各种USB设备的通信,从简单的数据采集到复杂的设备控制都能胜任。记得在实际使用中根据具体设备文档调整参数和协议。