3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Haskellの簡単なJupyter用kernelを作って機能アップしたら結構使えるようになった

Last updated at Posted at 2025-05-18

概要

前回作成した改良版に以下の3つの機能を追加します。

  • Cellの内容をrunghcに渡して実行させてその結果を表示する
  • Cellの内容をファイルに書いて、それを :loadする機能
  • コードの補完機能

このように機能アップしたら、Haskellの学習がすごく進みました。

今まで作成&解説した以下の2つの記事の内容を理解している前提で書きます。

実行環境

> sw_vers
ProductName:		macOS
ProductVersion:		15.4.1
BuildVersion:		24E263

> ghci --version
The Glorious Glasgow Haskell Compilation System, version 9.12.2

> python --version
Python 3.12.7

> jupyter --version
Selected Jupyter core packages...
IPython          : 8.27.0
ipykernel        : 6.28.0
ipywidgets       : 7.8.1
jupyter_client   : 8.6.0
jupyter_core     : 5.7.2
jupyter_server   : 2.14.1
jupyterlab       : 4.2.5
nbclient         : 0.8.0
nbconvert        : 7.16.4
nbformat         : 5.10.4
notebook         : 7.2.2
qtconsole        : 5.5.1
traitlets        : 5.14.3

機能アップした内容

機能アップしたテスト画像から見たほうが良いかのもしれません。

Cell Magicのような機能

PythonでCellの先頭行に書いた%%htmlなどの Cell Magicのような機能を追加した。

runghc

Cellの先頭行に次の文字列があるときにCellの内容をrunghcに処理させて結果を表示する。
具体的にはCellの内容をfilename.hsあるいは指定がないときは _temp_runghc.hsに書いて、runghc filename.hsとして実行させます。_temp_runghc.hsは処理終了後に削除します。

---runghc filename
Haskell Code
...

runghcf

既存のファイルfilename.hsrunghc filename.hsとして実行させます。

---runghcf filename

:load

Cellの先頭行に次の文字列があるときにCellの内容(Haskell Code1)をfilename.hs あるいは指定がないときは _temp_load.hsに書いて、ghci上で:load filename.hsをして実行し、Haskell Code2があれば:load後に実行します。_temp_load.hsの場合は処理終了後に削除します。

---load  filename
Haskell Code1
...
---ghci
Haskell Code2
...

コードの補完機能

ghciで出来る補完に自分で補完するコードを加えたものの実装。
ghciでの補完は:complete replを使った。

インストール

kernel情報のインストール

ファイル構成

ghci/
└─ kernel.json 

"/Users/xxxx/yyyy" はkernelソースのghci_kernel.pyghci_completions.pyを配置するディレクトリ名で、元々パスが通っていればenvは無くてもOK。

kernel.json
{
    "argv": ["python", "-m",
             "ghci_kernel", "-f",
             "{connection_file}"],
    "display_name": "GHCi",
    "language": "haskell",
    "env": {
        "PYTHONPATH": "/Users/xxxx/yyyy"
    }
}

インストール (ghciはディレクトリ名)

> jupyter kernelspec install ghci --user
[InstallKernelSpec] Installed kernelspec ghci in /Users/xxxx/Library/Jupyter/kernels/ghci

次に、これから説明するghci_kernel.pyghci_completions.py"PYTHONPATH"のディレクトリに配置してください。以上でインストールは終わりです。

ソースコード

ghci_kernel.py
import os
import re
import signal
import pexpect
import subprocess
import shlex
from ipykernel.kernelbase import Kernel
from pexpect.replwrap import REPLWrapper
from ghci_completions import CODE_COMPLETIONS

INIT_CMD = """
:set +m
"""
class GHCi:
    def __init__(self):
        self.start_ghci()
        self.reg1st = re.compile(r".*\n")
        self.regex = re.compile(r"^---(\w+)", re.MULTILINE)

    def start_ghci(self):
        self.replwrapper = REPLWrapper("ghci", "ghci> ", None,
                                       continuation_prompt="ghci| ")
        self.child = self.replwrapper.child
        self.run_command(INIT_CMD)
        
    # re_groupsは使用しない他のclassの引数と合わせるため    
    def run(self, code, re_groups=None):
        if not code.endswith("\n"):
            code += "\n"
        return self.run_command(code)
    
    def run_command(self, code, timeout=None):
        try:
            stderr = None
            stdout = self.replwrapper.run_command(code, timeout=timeout)
        except BaseException as e:
            stdout, stderr = self.interrupt(e)
        return stdout, stderr
    
    def interrupt(self, e):
        stderr = "exception: {}".format(type(e).__name__)
        # GHCiが起動中かの確認
        if self.child.isalive():
            # GHCiにSIGIN(CTRL+C)を送る
            self.child.sendintr()
            # GHCiがSIGINTを受けてプロンプトを表示するまでのデータを受け取る
            self.replwrapper._expect_prompt()
            # 受け取った内容をstdoutに代入
            stdout = self.child.before
        else:
            # GHCiが起動していないので再起動
            self.start_ghci()
            stdout = "Restarted GHCi"  
        return stdout, stderr
    
    def load_file(self, code, f_name):
        # codeの1行目全体をマッチさせる
        res_re = self.reg1st.match(code)
        if res_re == None: return "", "001:program error"
        # codeの1行目の取り除く
        code2 = code[res_re.span()[1]:]
        # ---ghciがあるか検索
        res_re = self.regex.search(code2)
        code3 = ""
        if res_re != None:
            # ---ghciが入力された場合
            if res_re.groups()[0] == "ghci":
                code3 = code2[res_re.span()[1]:]
                code2 = code2[0:res_re.span()[0]]
        if f_name in ["", None]:
            file_name = "_temp_load.hs"
        else:
            file_name = "{}.hs".format(f_name)
        with open(file_name, "w") as f:
            f.write(code2)
        outs = self.run(":load {}\n{}".format(file_name, code3))
        if f_name in ["", None]:
            if os.path.isfile(file_name):
                os.remove(file_name)
        return outs
    
    def shutdown(self, restart):
        try:
            self.replwrapper.run_command(":quit", timeout=0.5)
        except pexpect.EOF:
            return
        except BaseException:
            pass
        if self.child.isalive():
            # GHCiが終了していないときは強制終了
            self.child.kill(signal.SIGKILL)

class RunGHC:
    def communicate(self):
        # runghcが出力した内容(stdout,stderr)を受け取る
        stdoutb, stderrb = self.subp.communicate()
        return stdoutb.decode(), stderrb.decode()

    def run(self, code, f_name):
         # ソースをPIPEにするとMacでは/var/foldersにソースの一時ファイルが作成されて残ってしまう
        if f_name in ["", None]:
            file_name = "_temp_runghc.hs"
        else:
            file_name = "{}.hs".format(f_name)
        with open(file_name, "w") as f:
            f.write(code)
        outs =  self.runf(code, file_name)
        if f_name in ["", None]:
            if os.path.isfile(file_name):
                os.remove(file_name)

        return outs
    
    def runf(self, code, file_name):
        try:
            self.subp = subprocess.Popen(["runghc", file_name],
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE)
            outs = self.communicate()
        except BaseException as e:
            # 強制終了させる
            self.subp.kill()
            outs = self.communicate()
            except_name = "exception: {}".format(type(e).__name__)  
            if outs[1] == "":
                outs[1] = except_name
            else:
                outs[1] += ("\n" + except_name)
        # もし runghcが起動したままなら強制終了させる
        if self.subp.poll() == None:
            self.subp.kill()
            self.communicate()

        return outs

class Complete:
    def __init__(self, ghci):
        self.ghci = ghci

    def ret_val(self):
        # self.typesは必須ではない。そのときは "metadata":{}とする
        return  {"matches": self.matches, "start_pos": self.pos,
                "metadata": {'_jupyter_types_experimental': self.types}
                }
    
    def user_complete(self, word):
        # \.{0}はabc.xでタブキーが押されたらキーワードをxとする
        regex = re.compile(r"^{0}|\.{0}".format(word))
        for data in CODE_COMPLETIONS:
            key = data["key"]
            if regex.search(key) == None:  continue
            item = data["item"]
            itemtype = data["type"]
            self.matches += item
            self.types += [{"type": itemtype}]*len(item)

    def ghci_complete(self, word):
        stdout, stderr = self.ghci.run_command(
            ':complete repl 20 "{}"'.format(word), timeout=2)
        if stderr != None:
            return  self.ret_val()
        lines = stdout.splitlines()
        # linesの1行目を空白で分割する 
        top = shlex.split(lines[0])
        # :complete repl で指定した数と実際対象となった数の差を計算
        more = eval("{}-{}".format(top[1], top[0]))
        # 対象ワードは2行目から
        for line in lines[1:]:
            data = eval(line)
            if data in self.matches: continue
            self.matches.append(data)
            self.types.append({"type": "ghci"})
            if more <= 0: continue
            # 対象ワードが残り幾つ有るかを表示
            self.matches.append("more {}".format(more))
            self.types.append({"type": "ghci"})

    def do_complete(self, code, cursor_pos):
        self.pos = 0
        # 検索キーワードの開始位置を求める
        for i in range(cursor_pos-1,-1,-1):
            if not (code[i].isalnum() or code[i] in ["-", ".","_"]):
                self.pos = i + 1
                break
        self.matches = []
        self.types = []
        # 検索キーワードの取り出し
        word = code[self.pos:cursor_pos]
        if word != "":
            self.user_complete(word)
            self.ghci_complete(word)
        return  self.ret_val()

LOAD = "load"
RUNGHC = "runghc"
RUNGHCF = "runghcf"

class GHCiKernel(Kernel):
    implementation = 'GHCi'
    implementation_version = '0.0.1'
    language_info = {
        'name': 'haskell',
        'mimetype': 'text/x-haskell',
        'file_extension': '.hs'
    }

    language_version = '9.12.2'
    banner =  'Simple GHCi Kernel'

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.ghci = GHCi()
        self.runghc = RunGHC()
        self.complete = Complete(self.ghci)
        # ---runghcなどの処理メソッドを設定
        self.funcs = {
            RUNGHC: self.runghc.run,
            RUNGHCF: self.runghc.runf,
            LOAD: self.ghci.load_file,
            None: self.ghci.run
        }
        # ---runghcなどの文字列の検索用正規表現
        self.regex = re.compile(r"---(\w+) *(\w*)|.*")
        
    def data_send(self, name, text):
        if text == None or text == "": return
        stream_content = {'name': name, 'text': text}
        self.send_response(self.iopub_socket, 'stream', stream_content) 

    def do_execute(self, code, silent, store_history=True, 
                   user_expressions=None, allow_stdin=False):        
        if not silent:        
            res_re = self.regex.match(code)
            #func_id = res_re.group(1) でも同じ 
            func_id = res_re.groups()[0]
            if not func_id in self.funcs: func_id = None          
            stdout, stderr = self.funcs[func_id](code, res_re.groups()[1])  
            self.data_send("stdout", stdout)
            self.data_send("stderr", stderr)  
        return {'status': 'ok', 'execution_count': self.execution_count,
                'payload': [], 'user_expressions': {}}

    def do_complete(self, code, cursor_pos):
        res = self.complete.do_complete(code, cursor_pos)
        return {
            "matches": res["matches"],
            "cursor_end": cursor_pos,
            "cursor_start": res["start_pos"],
            "metadata": res["metadata"],
            "status": "ok",
        }
    
    def do_shutdown(self, restart):
        self.ghci.shutdown(restart)
        return super().do_shutdown(restart)

if __name__ == '__main__':
    from ipykernel.kernelapp import IPKernelApp
    IPKernelApp.launch_instance(kernel_class=GHCiKernel)

次のソースコードはユーザが補完するコードを定義するもので適当に変更してください。

ghci_completions.pyのソース (クリックする表示されます)
ghci_completions.py
CODE_TYPES = [
    "",         #0
    "reserved", #1
    "function", #2
    "snippet"   #3
]

FIB = """let fib 0 = 0
    fib 1 = 1
    fib n = fib(n-1) + fib(n-2) 
"""

HELLO = """---runghc
main = do
    print $ hello
hello = "Hello World!"
"""

def A(item, typ=0):
    return B(item, item, typ)

def B(key, item, typ=0):
    if type(item) != type([]):
        item = [item]
    if type(0) == type(typ):
        type_str =  CODE_TYPES[typ]
    else:
        type_str = typ
    return {"key": key, "item": item, "type": type_str}


CODE_COMPLETIONS =[
    A("abc.xyz","sample"),
    B("complete", ':complete repl ""', "command"),
    A("---load"), 
    A("---runghc"), 
    A("---runghcf"), 
    A("---ghci"), 
    A("case", 1),
    A("class", 1),
    B("data", ["data", "data family", "data instance"], 1),
    A("default", 1),
    B("deriving", ["deriving", "deriving instance"], 1),
    A("forall", 1),
    A("foreign", 1),
    A("hiding", 1),
    B("if", ["if then else"], 1),
    A("then", 1),
    A("else", 1),
    A("import", 1),
    A("infix", 1),
    A("infixl", 1),
    A("infixr", 1),
    A("instance", 1),
    A("module", 1),
    A("newtype", 1),
    A("proc", 1),
    A("qualified", 1),
    B("type", ["type", "type family", "type instance"], 1),
    A("where", 1),
    B("div", ["`div`", "div"], 2),
    B("mod", ["`mod`", "mod"], 2),
    B("rem", ["`rem`", "rem"], 2),
    B("quot", ["`quot`", "quot"], 2),
    B("elem", ["`elem`", "elem"], 2),
    B("notElem", ["`notElem`", "notElem"], 2),
    # keyと同じ文字が含んでないと上手く動作しない "x"
    # (x:xs)などはxが()内あるからダメなのかもしれない
    B("x", ["(x:xs)", "(x:[])", "(x:y)","xs"],3), 
    B("fib", FIB, 3),
    B("hello", HELLO, 3)
]

class GHCiKernel(Kernel):

これは前回の投稿記事を参照してください。変更部分を簡単説明します。

self.regex = re.compile(r"---(\w+) *(\w*)|.*")
~~~~
res_re = self.regex.match(code)
func_id = res_re.groups()[0]
if not func_id in self.funcs: func_id = None          
stdout, stderr = self.funcs[func_id](code, res_re.groups()[1])  
  • 最初の(\w+)runghcloadなどとマッチ、2番目の(\w*)はファイル名が有ったらマッチする。|.*がないとどれもマッチしないと例外が発生するので入れてある。マッチしないと両方のgroupがNoneになる
  • match(code)はcodeの1行目だけが検索対象になる
  • res_re.groups()[0]runghcloadなどが入り、res_re.groups()[1]にはファイル名が入っている
  • def do_complete
    • Cell上でタブキーを押すと補完機能が呼び出され、このメソッドが呼ばれる。

class GHCi

これも前回の投稿記事を参照してください。 ---loadの処理であるdef load_fileはソースのコメントを見て下さい。

class RunGHC

---runghcの処理をするこのクラスはCellのコードをファイルに書いて、runghc ファイル名を実行する。---runghcfはファイルの書き込み無く、単に指定ファイルを実行するだけである。実行にはsunprocess.Popenを使ってstdoutstderrはパイプを使ってrunghcとやり取りをする。

class Complete

Cell上でタブキーを押すと補完機能が機能するので、それの実装クラスである。

  • 補完候補をself.matchesに配列として格納
  • self.mathcesの格納した候補のタイプをtypesに辞書の配列としてself.matchesの順番と同じ順番で格納する
    • 辞書の形式: {"type":タイプの文字列}
    • タイプの文字列は何でもOK

def user_complete

このメソッドはghci_completions.pyで定義した補完候補データを用いる処理である。
{"key":補完キーワード, "item": [補完候補の配列], "type":補完候補のタイプ}
の配列がCODE_COMPLETIONSとして定義されている。

def ghci_complete

gchiのコマンド:complete repl 20 "補完キーワード"を送って対象の候補を最大で最初から20個送ってもらう処理である。個数を書かないと全部が対象になる。もし、20個を超えたら補完候補の最後にmore 残り個数を表示させる。

data = eval(line)

これはlineには"文字が含まれるので"を消すためにevalを行って、ただの文字列にしている。
'"word"' => 'word'

> ghci
GHCi, version 9.12.2: https://www.haskell.org/ghc/  :? for help
ghci> :complete repl "co"
6 6 ""
"compare"
"concat"
"concatMap"
"const"
"cos"
"cosh"
ghci> 

def do_complete

補完処理の入口。
タブが押されて位置から逆にたどり、英数,"-",".","_"以外の文字が現れるまで検索して、補完キーワードを確定して、あとは順番にuser,ghciの補完処理を行う。

テスト

入力[12]が---runghcでファイル名を fibonacci.hsとして書き出し、runghc fibonacci.hsを実行してその結果を表示しています。入力[13]は入力[12]で作成したfibonacci.hsを指定して実行しています。当然、既存のファイルでもOKです。
haskell07.png

入力[17]を _temp_load.hsに書き出し、それを:loadしています。入力[19]は入力[18]で作成しloadしたものが使えています。入力[20]は:loadとそれを利用したコードを別Cellに入力するのは面倒なので---ghciと記述することで同じCellに書くこと出来ます。

haskell08.png

次からは補完のテストですが、coで補完しています。右端に出ているcommandとはユーザ定義の補完ワードでghci:complete replで取って来たものです。

haskell09.png

hellで補完してるので、ユーザ定義のhelloが候補になっています。これを選択するとその次に画像のようにHello Worldのコードが入力されます。

haskell10.png

haskell11.png

次はxで補完した場合、ユーザが定義した候補出ています。また、.の次も対象になっていてabc.xyzが候補して表示されています。

haskell12.png

次はfiで補完した場合は3番目にfibが出ていて左右の端に何も表示されていませんが、これは同じCellに元々fibという語があるため候補に上がっていると思います。これは、kernelクラスがやっている思われ自分では実装していません。

haskell13.png

終わりに

一番手間取ったのが補完機能です。:complete replを知らなかったので、最初はghciにタブ文字を送って候補一覧取得しようとしても上手くいかず、ユーザ定義だけで済まそうと思っていました。しかし、Preludeの関数等は500個ぐらい有って段々面倒になってしまいました。 :complete replが有って良かったです。
また、:loadすると:loadする以前に定義した関数等が使えなくなるもの知らなかったです。
もう少しHaskellを勉強して欲しい機能が出てきたら、また改良しようかと思います。

Windows版について

Windows版にも挑戦しましたが、REPLWrapperはWindowsでは使えませんでした。代替コードはあったので試しみてノーマルケースは正常に動作したが、CTRL+Cghciに送ることがどうしても出来ずとりあえず保留状態です。あと、強制終了も出来なかった。Windowsのghciはプロセスが3個立上がっているのが原因の一つかとも思っています。

Appendix

自分ようにテストコードを載せます。

subprocess.Popen テストコード

popentest.py
import os
import subprocess

CMD="""
fib 0 = 0
fib 1 = 1
fib n = fib (n-1) + fib (n-2) 
main = do 
    print  $ fib 10
"""
pipe = False
if pipe:
    p = subprocess.Popen(["runghc"],
                        stdin  = subprocess.PIPE,
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE
                        )
    p.stdin.write(CMD.encode())
    p.stdin.flush()
else:
    file_name = "_temp.hs"
    with open(file_name, "w") as f:
        f.write(CMD)
    p = subprocess.Popen(["runghc", file_name],
                        stdout = subprocess.PIPE,
                        stderr = subprocess.PIPE
                        )
try:
    stdout, stderr = p.communicate()
    print(stdout.decode(), end="")
    print(stderr.decode(), end="")
except BaseException as e:
    print(e)
if not pipe:
    os.remove(file_name)
> python popentest.py
55

REPLWrapperのテストコード

repltest.py
from pexpect.replwrap import REPLWrapper
import pexpect
wrapper = REPLWrapper("ghci", "ghci> ", None,
                              continuation_prompt="ghci| ")

CODE="""
:set +m
let fib 0 = 0
    fib 1 = 1
    fib n = fib(n-1) + fib(n-2)

"""
stdout = wrapper.run_command(CODE)
print(stdout, end="")
stdout = wrapper.run_command("fib 10")
print(stdout, end="")
try:
    stdout = wrapper.run_command("fib 100")
    print(stdout, end="")
except BaseException as e:
    print(type(e).__name__)
    wrapper.child.sendintr()
    wrapper._expect_prompt()
    print(wrapper.child.before, end="")

55が表示された後CTRL+Cを押下すると例外が発生します。

> python repltest.py
55
^CKeyboardInterrupt
Interrupted.
3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?