概要
USB仮想シリアルポートデバイスの挿抜を監視するためのPythonソースコードを自分用にメモしておく。
実装
必要パッケージ
pip install PySerial pip install pywin32
ソースコード
import threading import time import logging import win32api import win32gui import win32gui_struct from serial.tools import list_ports # ロギング設定 logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") logger = logging.getLogger(__name__) # WM_DEVICECHANGE イベントタイプ WM_DEVICECHANGE = 0x0219 # デバイス変更イベント DBT_DEVICEARRIVAL = 0x8000 # デバイス挿入 DBT_DEVICEREMOVECOMPLETE = 0x8004 # デバイス取り外し DBT_DEVTYP_PORT = 0x00000003 # シリアルポートデバイス class USBSerialDeviceListener(threading.Thread): """ USBシリアル通信デバイスの抜き差しを監視する """ def __init__(self): super().__init__() self.ports = list_ports.comports() self.ports = {port.device for port in self.ports} self.ports_changed_callback = None # ユーザーコールバック関数 self.stopRequest = False self.WAIT_PERIOD = 0.1 def WndProc(self, hwnd, msg, wParam, lParam): """ウィンドウプロシージャ: OSメッセージを処理する""" logger.debug(f"WndProc msg: {msg}") if msg == WM_DEVICECHANGE: if wParam == DBT_DEVICEARRIVAL: # デバイス挿入時の処理 logger.debug("Device inserted.") self.handle_device_event(wParam, lParam) elif wParam == DBT_DEVICEREMOVECOMPLETE: # デバイス取り外し時の処理 logger.debug("Device removed.") self.handle_device_event(wParam, lParam) return win32gui.DefWindowProc(hwnd, msg, wParam, lParam) def handle_device_event(self, event_type, lParam): try: # デバイス情報構造体の解析 s = win32gui_struct.UnpackDEV_BROADCAST(lParam) logger.debug( f"handle_device_event device type: {s.devicetype}, event type: {event_type}" ) if s.devicetype == DBT_DEVTYP_PORT: ports = [port.device for port in list_ports.comports()] removedPorts = [] insertedPorts = [] for port in ports: if port not in self.ports: insertedPorts.append(port) for port in self.ports: if port not in ports: removedPorts.append(port) if self.ports_changed_callback: self.ports_changed_callback[0]( insertedPorts, removedPorts, self.ports_changed_callback[1] ) self.ports = ports except Exception as e: # 構造体の解析エラーなど logger.error(f"デバイス情報解析エラー: {e}") def run(self): """ ウインドウクラスの作成とメッセージループは同一スレッドで実行する必要がある """ # ウィンドウクラスの登録 wc = win32gui.WNDCLASS() wc.lpfnWndProc = self.WndProc wc.lpszClassName = "DeviceListenerClass" wc.hInstance = win32api.GetModuleHandle(None) try: class_atom = win32gui.RegisterClass(wc) except win32gui.error as e: # クラスが既に登録されている場合は無視 logger.info(f"Class already registered: {e}") if e.winerror != 1410: # ERROR_CLASS_ALREADY_EXISTS raise class_atom = win32gui.GetClassInfo(wc.hInstance, wc.lpszClassName)["atom"] # 非表示ウィンドウの作成 self.hwnd = win32gui.CreateWindow( class_atom, "Device Listener", 0, # スタイル (非表示) 0, 0, 0, 0, # 位置とサイズ (非表示) 0, # 親ウィンドウ 0, # メニュー wc.hInstance, None, ) logger.debug(f"Window handle: {self.hwnd}") # メッセージループ logger.info("USBデバイス挿抜監視開始") while True: win32gui.PumpWaitingMessages() time.sleep(self.WAIT_PERIOD) if self.stopRequest: break if hasattr(self, "hwnd") and self.hwnd: win32gui.DestroyWindow(self.hwnd) logger.info("USBデバイス挿抜監視終了") if __name__ == "__main__": """ 動作確認 """ def ports_changed_callback(insertedPorts, removedPorts, user_data): logger.info(f"insertedPorts: {insertedPorts}") logger.info(f"removedPorts: {removedPorts}") logger.info(f"user_data: {user_data}") listener = USBSerialDeviceListener() listener.ports_changed_callback = (ports_changed_callback, None) listener.start() try: while True: time.sleep(1) except KeyboardInterrupt: listener.stopRequest = True listener.join()
説明
- 非表示のウインドウを作成し、イベントを監視
- 他のプログラムからモジュールとして利用することを想定し、イベント発生のメッセージを待つwin32gui.PumpWaitingMessages()を繰り返し呼ぶためのループをスレッド動作させる
- win32gui.PumpMessages()をスレッドで動作させるには、ウインドウの作成も含めて同一スレッドで実施する必要があるため、そのように実装している