工作と競馬2

電子工作、プログラミング、木工といった工作の記録記事、競馬に関する考察記事を掲載するブログ

USB仮想シリアルポートデバイスの挿抜を監視するソースコードの自分用メモ

概要

USB仮想シリアルポートデバイスの挿抜を監視するためのPythonソースコードを自分用にメモしておく。


実装

必要パッケージ

pip install PySerial
pip install pywin32

ソースコード

import threading
import time
import logging

import win32api
import win32gui
import win32gui_struct
from serial.tools import list_ports

# ロギング設定
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

# WM_DEVICECHANGE イベントタイプ
WM_DEVICECHANGE = 0x0219

# デバイス変更イベント
DBT_DEVICEARRIVAL = 0x8000  # デバイス挿入
DBT_DEVICEREMOVECOMPLETE = 0x8004  # デバイス取り外し
DBT_DEVTYP_PORT = 0x00000003  # シリアルポートデバイス


class USBSerialDeviceListener(threading.Thread):
    """
    USBシリアル通信デバイスの抜き差しを監視する
    """

    def __init__(self):
        super().__init__()

        self.ports = list_ports.comports()
        self.ports = {port.device for port in self.ports}
        self.ports_changed_callback = None  # ユーザーコールバック関数
        self.stopRequest = False
        self.WAIT_PERIOD = 0.1

    def WndProc(self, hwnd, msg, wParam, lParam):
        """ウィンドウプロシージャ: OSメッセージを処理する"""
        logger.debug(f"WndProc msg: {msg}")
        if msg == WM_DEVICECHANGE:
            if wParam == DBT_DEVICEARRIVAL:
                # デバイス挿入時の処理
                logger.debug("Device inserted.")
                self.handle_device_event(wParam, lParam)
            elif wParam == DBT_DEVICEREMOVECOMPLETE:
                # デバイス取り外し時の処理
                logger.debug("Device removed.")
                self.handle_device_event(wParam, lParam)

        return win32gui.DefWindowProc(hwnd, msg, wParam, lParam)

    def handle_device_event(self, event_type, lParam):
        try:
            # デバイス情報構造体の解析
            s = win32gui_struct.UnpackDEV_BROADCAST(lParam)
            logger.debug(
                f"handle_device_event device type: {s.devicetype}, event type: {event_type}"
            )
            if s.devicetype == DBT_DEVTYP_PORT:
                ports = [port.device for port in list_ports.comports()]
                removedPorts = []
                insertedPorts = []
                for port in ports:
                    if port not in self.ports:
                        insertedPorts.append(port)
                for port in self.ports:
                    if port not in ports:
                        removedPorts.append(port)
                if self.ports_changed_callback:
                    self.ports_changed_callback[0](
                        insertedPorts, removedPorts, self.ports_changed_callback[1]
                    )
                self.ports = ports

        except Exception as e:
            # 構造体の解析エラーなど
            logger.error(f"デバイス情報解析エラー: {e}")

    def run(self):
        """
        ウインドウクラスの作成とメッセージループは同一スレッドで実行する必要がある
        """

        # ウィンドウクラスの登録
        wc = win32gui.WNDCLASS()
        wc.lpfnWndProc = self.WndProc
        wc.lpszClassName = "DeviceListenerClass"
        wc.hInstance = win32api.GetModuleHandle(None)
        try:
            class_atom = win32gui.RegisterClass(wc)
        except win32gui.error as e:
            # クラスが既に登録されている場合は無視
            logger.info(f"Class already registered: {e}")
            if e.winerror != 1410:  # ERROR_CLASS_ALREADY_EXISTS
                raise
            class_atom = win32gui.GetClassInfo(wc.hInstance, wc.lpszClassName)["atom"]
        # 非表示ウィンドウの作成
        self.hwnd = win32gui.CreateWindow(
            class_atom,
            "Device Listener",
            0,  # スタイル (非表示)
            0,
            0,
            0,
            0,  # 位置とサイズ (非表示)
            0,  # 親ウィンドウ
            0,  # メニュー
            wc.hInstance,
            None,
        )
        logger.debug(f"Window handle: {self.hwnd}")

        # メッセージループ
        logger.info("USBデバイス挿抜監視開始")
        while True:
            win32gui.PumpWaitingMessages()
            time.sleep(self.WAIT_PERIOD)
            if self.stopRequest:
                break
        if hasattr(self, "hwnd") and self.hwnd:
            win32gui.DestroyWindow(self.hwnd)
        logger.info("USBデバイス挿抜監視終了")


if __name__ == "__main__":
    """
    動作確認
    """

    def ports_changed_callback(insertedPorts, removedPorts, user_data):
        logger.info(f"insertedPorts: {insertedPorts}")
        logger.info(f"removedPorts: {removedPorts}")
        logger.info(f"user_data: {user_data}")

    listener = USBSerialDeviceListener()
    listener.ports_changed_callback = (ports_changed_callback, None)
    listener.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        listener.stopRequest = True
        listener.join()

説明

  • 非表示のウインドウを作成し、イベントを監視
  • 他のプログラムからモジュールとして利用することを想定し、イベント発生のメッセージを待つwin32gui.PumpWaitingMessages()を繰り返し呼ぶためのループをスレッド動作させる
  • win32gui.PumpMessages()をスレッドで動作させるには、ウインドウの作成も含めて同一スレッドで実施する必要があるため、そのように実装している