LoginSignup
1
2

More than 3 years have passed since last update.

pywin32のソースコードリーディング

Last updated at Posted at 2019-10-13

pywin32についての情報があまりにないので、コードを読みました。折角なので、分かった事を共有します。

題材

簡単なプロセスロガーです。wmi Tutorial (Monitorning)のサンプルコードがベースになっています。

import wmi
c = wmi.WMI()
# Intrinsic event poller handlers
watchers = {
    'creation': c.Win32_Process.watch_for("creation"),
    'deletion': c.Win32_Process.watch_for("deletion"),
}
while True:
    for eventType, watcher in watchers.items():
        try:
            process = watcher(timeout_ms=10)
        except wmi.x_wmi_timed_out:
            continue
        caption = process.Caption # <= ここ
        print(f"{eventType}: {caption}")

最後から2行目のprocess.Captionを掘っていきます。

リーディング開始

WMI関数

まず、processはwatcher(timeout_ms=10)の戻り値で、watcherはさらにc.Win32_Process.watch_for("creation")とかなので、まずはc.Win32_Processを見ていきます。

c = wmi.WMI(privileges=["debug"])なので、とりあえずWMIの定義を探します。WMIのtypeを確認すると<class 'function'>なので、WMIはクラスではなく関数みたいです。

wmiの定義ファイルを探します。僕の場合(Windows10)は$HOME/AppData/Local/Programs/Python/Python37/Lib/site-packages/wmi.pyに有りました。まあ、そんなところでしょう。

WMIなどの主要なオブジェクトは、ファイルの始めか終わりにあるので、そこを中心に検索をかけて探しましょう。(ツール使えるようになりたい...) 1300行目あたりにWMI = connectと定義してあります。次にconnectを探すと1200行目あたりに有ります。主要な箇所を抜粋します。

def connect (
    computer="",
    impersonation_level="",
    authentication_level="",
    authority="",
    privileges="",
    moniker="",
    wmi=None,
    namespace="",
    suffix="",
    user="",
    password="",
    find_classes=False,
    debug=False
):
    ... ... ... 
                if user: ...
                else:
                    moniker = construct_moniker (
                        computer=computer,
                        impersonation_level=impersonation_level,
                        authentication_level=authentication_level,
                        authority=authority,
                        privileges=privileges,
                        namespace=namespace,
                        suffix=suffix
                    )
                    obj = GetObject (moniker)
                wmi_type = get_wmi_type (obj)
                if wmi_type == "namespace":
                    return _wmi_namespace (obj, find_classes)
                elif ...

次に、construct_moniker(1300行目あたり)の抜粋です。

def construct_moniker (
    computer=None,
    impersonation_level=None,
    authentication_level=None,
    authority=None
    privileges=None,
    namespace=None,
    suffix=None
):
    security = []
    ...
    if privileges: security.append ("(%s)" % ", ".join (privileges))

    moniker = [PROTOCOL]
    if security: moniker.append("{%s}!" % ", ".join (security))
    ....
    return "".join (moniker)

PROTOCOLはconnect関数の直前で、PROTOCOL = "winmgmts:"と定義されています。ここで、monikerとは大雑把に言えばDBで言うところの接続文字列です。construct_moniker関数を読むとmonikerの構造に対する理解が深まります。

今回の場合、construct_monikerにはprivileges=["debug"]しか与えていないので、戻り値はwinmgmts:{(debug)}!となります。debugの意味は、Windows Dev Center (Constructing a Moniker String)を参照してください。

さて、GetObjectを飛ばしています。これはファイルの冒頭で、from win32com.client import GetObjectとインポートされています。本当はGetObjectも追っかけると楽しいのですが、今回はスルーします。概略だけ説明すると、GetObject(moniker)でmonikerに対応するCOMのラッパオブジェクトが手に入ります(多分。。。)。GetObjectも詳しいドキュメントが見当たらなかったので、また気が向いたらトライします。

余談ですが、connect関数でprivilegesのデフォルト値は""となっています。ですが、construct_monikerを読むと、どうやら[]の方が良い気がします。

ここまでで、c = wmi.WMI(privileges=["debug"])c = _wmi_namespace (obj, False)のことだと分かりました。

_wmi_namespaceクラス

どんどん進みましょう。次はc.Win32_Process.watch_for("creation")のうち、c.Win32_Processについて見ていきます。_wmi_namespaceは910行目あたりで定義されています。_wmi_namespaceのメンバを探してもWin32_Processは無いので、__getattr__や__get_attribute__が無いか探します。すると、__getattr__が見つかります。

#
# class WMI
#
class _wmi_namespace:
    def __init__ (self, namespace, find_classes):
        _set (self, "_namespace", namespace)
        ...
        self._classes_map = {}
        ...

    def __getattr__ (self, attribute):
        try:
            return self._cached_classes (attribute)
        except ...

    def _cached_classes (self, class_name):
        if class_name not in self._class_map:
            self._classes_map[class_name] = _wmi_class (self, self._namespace.Get (class_name))
        return self._classes_map[class_name]

c = _wmi_namespace (obj, False)だったので、namespace=obj、find_classes=Falseだと分かります。最後から2行目で_wmi_classを作り、それを返しています。_wmi_classの第二引数はobj = GetObject ("winmgmts:{(debug)}!")だったので、GetObject ("winmgmts:{(debug)}!").Get ("Win32_Process")となります。変数名から推測するに、namespace内からWin32_Processというクラスを探し出すのでしょう。この場合のnamespaceはWindows Dev Center (Constructing a Moniker String)でいうdefault namespaceだと思います。

The namespace can be omitted from the object path, in which case the default namespace is assumed. (Windows Dev Centerより)

これでc.Win32_Process.watch_for("creation")のうち、c.Win32_Process = _wmi_class (c, class)が分かりました。ここで、_win_classの第一引数はc.Win32_Processのcと同一で、第二引数はWin32_Processのクラス(ラッパー)オブジェクトと思われるものです。

次に_wmi_classの定義を探します。これは770行目あたりで定義されています。

class _wmi_class (_wmi_object):
    def __init__ (self, namespace, wmi_class):
        _set (self, "_class_name", wmi_class.Path_.Class)
        if namespace:
            _set (self, "_namespace", namespace)
        else ...

    def watch_for (
        self,
        notification_type="operation",
        delay_specs=1,
        fields=[],
        **where_clause
    ):
        ...
        retrun self._namespace.watch_for (
            notification_type=notification_type,
            wmi_class=self,
            delay_secs=delay_secs,
            fields=fields,
            **where_clause
         )

すごい!!なんと、c.Win32_Process.watch_for("creation")c.watch_for (notification_type="creation", wmi_class=c.Win32_Process)の事でした。いやー、何処かでこのパターンを使ってみたいですね。

蛇足ですが、_class_nameに入れられる、wmi_class.Path_.ClassはWin32_Processなんでしょうね。これ、後で使います。

_wmi_namespaceクラス (watch_for)

では、再び_wmi_namespaceクラスに戻ります。

class _wmi_namespace:
    def watch_for (
        self,
        raw_wql=None,
        notification_type="operation",
        wmi_class=None,
        delay_secs=1,
        fields=[],
        **where_clause
    ):
        if isinstance (wmi_class, _wmi_class):
            class_name = wmi_class._class_name
        else: ...

        if raw_wql: ...
        else:
            fields = set (['TargetInstance'] + (fields or ["*"]))
            field_list = ", ".join (fields)
            if is_extrinsic: ...
            else:
                if where_clause:
                    where = " AND " + " AND ".join(["TargetInstance.%s = '%s'" % (k, v) for k, v in where_clause.items ()])
                else:
                    where = ""
                wql = \
                    "SELECT %s FROM __Instance%sEvent WITHIN %d WHERE TargetInstance ISA '%s' %s" % \
                    (field_list, notification_type, delay_secs, class_name, where)
        try:
            return _wmi_watcher (
                self._namespace.ExecNotificationQuery (wql),
                is_extrinsic=is_extrinsic,
                fields=fields
            )
        except ...

「あっ、fieldsってこう言う意味だったんだ!」とか、「where_clauseか、なるほどねぇ」とか、色々腑に落ちる場面です。

notification_typeはcreation、class_nameはWin32_Processになるので、wql = "SELECT TargetInstance, * FROM __InstancecreationEvent WITHIN 1 WHERE TargetInstance ISA 'Win32_Process' "となります。もう、ほとんどSQLです。結局、全てはこれを生成するためのビルダーでした。

それでは、次はExecNotificationQueryを調べよう、と行きたいのですが、思い出してください。self._namespaceはwin32conのGetObjectで生成したオブジェクトです。。。パスします、すみません。

何が返ってくるかは、_wmi_watcherの定義から推察しましょう。

_wmi_watcherクラス

1170行目あたりに定義があります。watchは、process = watcher(timeout_ms=10)と言う感じで呼ばれているので、callableです。callableなクラスには、きっと__call__メソッドが定義してあるので探します。

class _wmi_watcher:
    def __init__ (self, wmi_event, is_extrinsic, fields=[]):
        self.wmi_event = wmi_event
        self.is_extrinsic = is_extrinsic
        self.fields = fields

    def __call__ (self, timeout_ms=-1):
        try:
            event = self.wmi_event.NextEvent (timeout_ms)
            if self.is_extrinsic: ...
            else:
                return _wmi_event (
                    event.Properties_ ("TargetInstance").Value,
                    _wmi_object (event, property_map=self._event_property_map),
                    self.fields,
                )
        except: ...

一番大事そうなのはwmi_eventですが、これはお約束により中身はよく分かりません。つらいですね。頑張って類推しましょう。watcherの使い方をネットで調べると、watcherはpoll処理を行います。そして、poll処理の頻度を指定できるらしいです。(参考: wmi Tutorial)

Intrinsic events occur when you hook into a general event mechanism offered by the WMI system to poll other classes on your behalf. (wmi Tutorialより)

You have to specify the type of event (creation, deletion, modification or simply operation to catch any type) and give a polling frequency in whole seconds. (wmi Tutorialより)

おそらく、watch_forのdelay_secs引数がpoll処理の頻度に当たるのでしょう。そして、poll処理の頻度を渡して返ってきたものはpoll処理を行うオブジェクトと考えるのが自然です。ということは、wmi_eventはpoll処理を行うオブジェクトだー!!

event = self.wmi_event.NextEvent (timeout_ms)を考えてみましょう。self.wmi_event.NextEvent (timeout_ms)はイベントの発生を検知したら、その情報を返すメソッドでしょう。そうなると、event変数は発生したイベントの情報です。実際、javascriptのイベント処理を思い出すと、イベントオブジェクトは、イベントが発生したオブジェクトや、イベントの種類を保持しているのでした。コードを見る限り、event.Properties_ ("TargetInstance").Valueでイベントが発生したオブジェクト(この場合はプロセス)を取得しているようです。合ってるっぽい!!

timeout_msについては、wmi Tutorial (Watchers with timeouts)を参照してください。timeoutを用いて複数のイベントを待ち受けるこの手法は、今回のお題プログラムでも使われています。

_wmi_eventクラス

さて、一番の難関をくぐり抜けた(避けた?)ので、あとはひたすら作業です。_wmi_eventは750行目あたりで定義されています。

class _wmi_event (_wmi_object):
    def __init__ (self, event, event_info, fields=[]):
        _wmi_object.__init__ (self, event, fields=fields)
        ...

_wmi_eventクラスは_wmi_objectクラスにイベント情報を追加しただけのクラスです。なので、扱い方は_wmi_objectクラスと同じです。

eventにはイベント発生元のプロセス、event_infoには新たな_wmi_objectが渡されるのでした。その後、eventは_wmi_objectに委譲され、event_infoは_wmi_eventに保存されます。

ちなみに、_wmi_object.__init__ (self, ...)はsuper().__init__(...)とほとんど同じです。違いは、super()はMethod Resolution Orderを一つ遡ったクラスの__init__が呼ばれるのに対し、_wmi_objectを使うと、どの親クラスの__init__を呼ぶかを指定できます。ただ、今回の場合は違いはないです。

_wmi_objectクラス

最後のラストスパート!!460行目あたりに有ります。

class _wmi_object:
    """The heart of the WMI module: wraps the objects returned by COM ISWbemObject interface and provide readier access to their proerties and methods resulting in a more Pythonic interface.
    ...
    """

    def __init__ (self, ole_object, instance_of=None, fields=[], property_map={}):
        _set (self, "ole_object", ole_object)
        _set (self, "properties", {})
        _set (self, "methods", {})
        _set (self, "property_map", property_map)
        ...
        if fields:
            for field in fields:
                self.properties[field] = None
        else:
            for p in ole_object.Properties_:
                self.properties[p.Name] = None

        for m in ole_object.Methods_:
            self.methods[m.Name] = Name

    def _cached_properties (self, attribute):
        if self.properties[attribute] is None:
            self.properties[attribute] = _wmi_property (self.ole_object.Properties_ (attribute))
        return self.properties[attribute]

    def _cached_methods (self, attribute):
        if self.methods[attribute] is None:
            self.methods[attribute] = _wmi_method (self.ole_object.Methods_ (attribute))
        return self.methods[attribute]

    def __getattr__ (self, attribute):
        try:
            if attribute in self.properties:
                property = self._cached_properties (attribute)
                factory = self.property_map.get (attribute, self.property_map.get (property.type, lambda x: x))
                value = factory (property.value)
                if property.type.startswith ("ref:"): ...
                else:
                    return value
            elif attribute in self.methods:
                return self._cached_methods (attribute)
            else:
                return getattr (self.ole_object, attribute)
        except ...

wmiモジュールの親玉です。基本機能を提供しています。COMオブジェクトのラッパー(_wmi_object)のプロパティやメソッドへのアクセスが、__getattr__を経由して呼び出されています。

_chached_xxxの仕組みは_wmi_namespaceの_cached_classesでも出てきました。これも、いつか使ってみたいパターンです。

_wmi_propertyと_wmi_methodのリーディングは、疲れたので今回はパスします。それに、どうせole_objectはwin32conがらみのオブジェクトなので、深くは掘れません(<= お約束)。おそらくですが、Windows Dev Center (Win32_Process class)で定義されているオブジェクトへのプロキシでしょう。

まとめ

そういえば、この記事はprocess.Captionを追求することがテーマでした。これがどのように実行されるのか、振り返ります。前提として、processはwmi.WMI(priviledges=["debug"]).Win32_Process.watch_for("creation")(timeout_ms=10)とかの戻り値でした。

  1. wmi.WMI(privileges=["debug"])でWMIへの接続(_wmi_namespace)を確立します。
  2. Win32_ProcessでWin32_Processクラス(_wmi_class)を取得します。
  3. watch_for("creation")でpoll処理を行うオブジェクト(_wmi_watcher)を取得します。
  4. poll処理を行うオブジェクトをcallする事で、イベントの待受をします。
  5. イベントが発生するとイベント情報(_wmi_event)が返されます。それがprocessです。
  6. イベント情報は、イベント発生オブジェクト(_wmi_object)として扱うことができます。よって、processはcreationイベントが発生したプロセスを表します。
  7. process.Captionへのアクセスは、_win_objectの__getattr__を介して、WMIの低レベルオブジェクト(_wmi_property)に伝達され、値が取得されます。

このような流れで、process.Captionは取得されていました。

おしまい。

参考文献

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2