概要
前回作成した改良版に以下の3つの機能を追加します。
- Cellの内容をrunghcに渡して実行させてその結果を表示する
- Cellの内容をファイルに書いて、それを :loadする機能
- コードの補完機能
このように機能アップしたら、Haskellの学習がすごく進みました。
今まで作成&解説した以下の2つの記事の内容を理解している前提で書きます。
実行環境
> sw_vers
ProductName: macOS
ProductVersion: 15.4.1
BuildVersion: 24E263
> ghci --version
The Glorious Glasgow Haskell Compilation System, version 9.12.2
> python --version
Python 3.12.7
> jupyter --version
Selected Jupyter core packages...
IPython : 8.27.0
ipykernel : 6.28.0
ipywidgets : 7.8.1
jupyter_client : 8.6.0
jupyter_core : 5.7.2
jupyter_server : 2.14.1
jupyterlab : 4.2.5
nbclient : 0.8.0
nbconvert : 7.16.4
nbformat : 5.10.4
notebook : 7.2.2
qtconsole : 5.5.1
traitlets : 5.14.3
機能アップした内容
機能アップしたテスト画像から見たほうが良いかのもしれません。
Cell Magicのような機能
PythonでCellの先頭行に書いた%%htmlなどの Cell Magicのような機能を追加した。
runghc
Cellの先頭行に次の文字列があるときにCellの内容をrunghcに処理させて結果を表示する。
具体的にはCellの内容をfilename.hsあるいは指定がないときは _temp_runghc.hsに書いて、runghc filename.hsとして実行させます。_temp_runghc.hsは処理終了後に削除します。
---runghc filename
Haskell Code
...
runghcf
既存のファイルfilename.hsでrunghc filename.hsとして実行させます。
---runghcf filename
:load
Cellの先頭行に次の文字列があるときにCellの内容(Haskell Code1)をfilename.hs あるいは指定がないときは _temp_load.hsに書いて、ghci上で:load filename.hsをして実行し、Haskell Code2があれば:load後に実行します。_temp_load.hsの場合は処理終了後に削除します。
---load filename
Haskell Code1
...
---ghci
Haskell Code2
...
コードの補完機能
ghciで出来る補完に自分で補完するコードを加えたものの実装。
ghciでの補完は:complete replを使った。
インストール
kernel情報のインストール
ファイル構成
ghci/
└─ kernel.json
"/Users/xxxx/yyyy" はkernelソースのghci_kernel.pyとghci_completions.pyを配置するディレクトリ名で、元々パスが通っていればenvは無くてもOK。
{
"argv": ["python", "-m",
"ghci_kernel", "-f",
"{connection_file}"],
"display_name": "GHCi",
"language": "haskell",
"env": {
"PYTHONPATH": "/Users/xxxx/yyyy"
}
}
インストール (ghciはディレクトリ名)
> jupyter kernelspec install ghci --user
[InstallKernelSpec] Installed kernelspec ghci in /Users/xxxx/Library/Jupyter/kernels/ghci
次に、これから説明するghci_kernel.pyとghci_completions.pyを"PYTHONPATH"のディレクトリに配置してください。以上でインストールは終わりです。
ソースコード
import os
import re
import signal
import pexpect
import subprocess
import shlex
from ipykernel.kernelbase import Kernel
from pexpect.replwrap import REPLWrapper
from ghci_completions import CODE_COMPLETIONS
INIT_CMD = """
:set +m
"""
class GHCi:
def __init__(self):
self.start_ghci()
self.reg1st = re.compile(r".*\n")
self.regex = re.compile(r"^---(\w+)", re.MULTILINE)
def start_ghci(self):
self.replwrapper = REPLWrapper("ghci", "ghci> ", None,
continuation_prompt="ghci| ")
self.child = self.replwrapper.child
self.run_command(INIT_CMD)
# re_groupsは使用しない他のclassの引数と合わせるため
def run(self, code, re_groups=None):
if not code.endswith("\n"):
code += "\n"
return self.run_command(code)
def run_command(self, code, timeout=None):
try:
stderr = None
stdout = self.replwrapper.run_command(code, timeout=timeout)
except BaseException as e:
stdout, stderr = self.interrupt(e)
return stdout, stderr
def interrupt(self, e):
stderr = "exception: {}".format(type(e).__name__)
# GHCiが起動中かの確認
if self.child.isalive():
# GHCiにSIGIN(CTRL+C)を送る
self.child.sendintr()
# GHCiがSIGINTを受けてプロンプトを表示するまでのデータを受け取る
self.replwrapper._expect_prompt()
# 受け取った内容をstdoutに代入
stdout = self.child.before
else:
# GHCiが起動していないので再起動
self.start_ghci()
stdout = "Restarted GHCi"
return stdout, stderr
def load_file(self, code, f_name):
# codeの1行目全体をマッチさせる
res_re = self.reg1st.match(code)
if res_re == None: return "", "001:program error"
# codeの1行目の取り除く
code2 = code[res_re.span()[1]:]
# ---ghciがあるか検索
res_re = self.regex.search(code2)
code3 = ""
if res_re != None:
# ---ghciが入力された場合
if res_re.groups()[0] == "ghci":
code3 = code2[res_re.span()[1]:]
code2 = code2[0:res_re.span()[0]]
if f_name in ["", None]:
file_name = "_temp_load.hs"
else:
file_name = "{}.hs".format(f_name)
with open(file_name, "w") as f:
f.write(code2)
outs = self.run(":load {}\n{}".format(file_name, code3))
if f_name in ["", None]:
if os.path.isfile(file_name):
os.remove(file_name)
return outs
def shutdown(self, restart):
try:
self.replwrapper.run_command(":quit", timeout=0.5)
except pexpect.EOF:
return
except BaseException:
pass
if self.child.isalive():
# GHCiが終了していないときは強制終了
self.child.kill(signal.SIGKILL)
class RunGHC:
def communicate(self):
# runghcが出力した内容(stdout,stderr)を受け取る
stdoutb, stderrb = self.subp.communicate()
return stdoutb.decode(), stderrb.decode()
def run(self, code, f_name):
# ソースをPIPEにするとMacでは/var/foldersにソースの一時ファイルが作成されて残ってしまう
if f_name in ["", None]:
file_name = "_temp_runghc.hs"
else:
file_name = "{}.hs".format(f_name)
with open(file_name, "w") as f:
f.write(code)
outs = self.runf(code, file_name)
if f_name in ["", None]:
if os.path.isfile(file_name):
os.remove(file_name)
return outs
def runf(self, code, file_name):
try:
self.subp = subprocess.Popen(["runghc", file_name],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE)
outs = self.communicate()
except BaseException as e:
# 強制終了させる
self.subp.kill()
outs = self.communicate()
except_name = "exception: {}".format(type(e).__name__)
if outs[1] == "":
outs[1] = except_name
else:
outs[1] += ("\n" + except_name)
# もし runghcが起動したままなら強制終了させる
if self.subp.poll() == None:
self.subp.kill()
self.communicate()
return outs
class Complete:
def __init__(self, ghci):
self.ghci = ghci
def ret_val(self):
# self.typesは必須ではない。そのときは "metadata":{}とする
return {"matches": self.matches, "start_pos": self.pos,
"metadata": {'_jupyter_types_experimental': self.types}
}
def user_complete(self, word):
# \.{0}はabc.xでタブキーが押されたらキーワードをxとする
regex = re.compile(r"^{0}|\.{0}".format(word))
for data in CODE_COMPLETIONS:
key = data["key"]
if regex.search(key) == None: continue
item = data["item"]
itemtype = data["type"]
self.matches += item
self.types += [{"type": itemtype}]*len(item)
def ghci_complete(self, word):
stdout, stderr = self.ghci.run_command(
':complete repl 20 "{}"'.format(word), timeout=2)
if stderr != None:
return self.ret_val()
lines = stdout.splitlines()
# linesの1行目を空白で分割する
top = shlex.split(lines[0])
# :complete repl で指定した数と実際対象となった数の差を計算
more = eval("{}-{}".format(top[1], top[0]))
# 対象ワードは2行目から
for line in lines[1:]:
data = eval(line)
if data in self.matches: continue
self.matches.append(data)
self.types.append({"type": "ghci"})
if more <= 0: continue
# 対象ワードが残り幾つ有るかを表示
self.matches.append("more {}".format(more))
self.types.append({"type": "ghci"})
def do_complete(self, code, cursor_pos):
self.pos = 0
# 検索キーワードの開始位置を求める
for i in range(cursor_pos-1,-1,-1):
if not (code[i].isalnum() or code[i] in ["-", ".","_"]):
self.pos = i + 1
break
self.matches = []
self.types = []
# 検索キーワードの取り出し
word = code[self.pos:cursor_pos]
if word != "":
self.user_complete(word)
self.ghci_complete(word)
return self.ret_val()
LOAD = "load"
RUNGHC = "runghc"
RUNGHCF = "runghcf"
class GHCiKernel(Kernel):
implementation = 'GHCi'
implementation_version = '0.0.1'
language_info = {
'name': 'haskell',
'mimetype': 'text/x-haskell',
'file_extension': '.hs'
}
language_version = '9.12.2'
banner = 'Simple GHCi Kernel'
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.ghci = GHCi()
self.runghc = RunGHC()
self.complete = Complete(self.ghci)
# ---runghcなどの処理メソッドを設定
self.funcs = {
RUNGHC: self.runghc.run,
RUNGHCF: self.runghc.runf,
LOAD: self.ghci.load_file,
None: self.ghci.run
}
# ---runghcなどの文字列の検索用正規表現
self.regex = re.compile(r"---(\w+) *(\w*)|.*")
def data_send(self, name, text):
if text == None or text == "": return
stream_content = {'name': name, 'text': text}
self.send_response(self.iopub_socket, 'stream', stream_content)
def do_execute(self, code, silent, store_history=True,
user_expressions=None, allow_stdin=False):
if not silent:
res_re = self.regex.match(code)
#func_id = res_re.group(1) でも同じ
func_id = res_re.groups()[0]
if not func_id in self.funcs: func_id = None
stdout, stderr = self.funcs[func_id](code, res_re.groups()[1])
self.data_send("stdout", stdout)
self.data_send("stderr", stderr)
return {'status': 'ok', 'execution_count': self.execution_count,
'payload': [], 'user_expressions': {}}
def do_complete(self, code, cursor_pos):
res = self.complete.do_complete(code, cursor_pos)
return {
"matches": res["matches"],
"cursor_end": cursor_pos,
"cursor_start": res["start_pos"],
"metadata": res["metadata"],
"status": "ok",
}
def do_shutdown(self, restart):
self.ghci.shutdown(restart)
return super().do_shutdown(restart)
if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
IPKernelApp.launch_instance(kernel_class=GHCiKernel)
次のソースコードはユーザが補完するコードを定義するもので適当に変更してください。
ghci_completions.pyのソース (クリックする表示されます)
CODE_TYPES = [
"", #0
"reserved", #1
"function", #2
"snippet" #3
]
FIB = """let fib 0 = 0
fib 1 = 1
fib n = fib(n-1) + fib(n-2)
"""
HELLO = """---runghc
main = do
print $ hello
hello = "Hello World!"
"""
def A(item, typ=0):
return B(item, item, typ)
def B(key, item, typ=0):
if type(item) != type([]):
item = [item]
if type(0) == type(typ):
type_str = CODE_TYPES[typ]
else:
type_str = typ
return {"key": key, "item": item, "type": type_str}
CODE_COMPLETIONS =[
A("abc.xyz","sample"),
B("complete", ':complete repl ""', "command"),
A("---load"),
A("---runghc"),
A("---runghcf"),
A("---ghci"),
A("case", 1),
A("class", 1),
B("data", ["data", "data family", "data instance"], 1),
A("default", 1),
B("deriving", ["deriving", "deriving instance"], 1),
A("forall", 1),
A("foreign", 1),
A("hiding", 1),
B("if", ["if then else"], 1),
A("then", 1),
A("else", 1),
A("import", 1),
A("infix", 1),
A("infixl", 1),
A("infixr", 1),
A("instance", 1),
A("module", 1),
A("newtype", 1),
A("proc", 1),
A("qualified", 1),
B("type", ["type", "type family", "type instance"], 1),
A("where", 1),
B("div", ["`div`", "div"], 2),
B("mod", ["`mod`", "mod"], 2),
B("rem", ["`rem`", "rem"], 2),
B("quot", ["`quot`", "quot"], 2),
B("elem", ["`elem`", "elem"], 2),
B("notElem", ["`notElem`", "notElem"], 2),
# keyと同じ文字が含んでないと上手く動作しない "x"
# (x:xs)などはxが()内あるからダメなのかもしれない
B("x", ["(x:xs)", "(x:[])", "(x:y)","xs"],3),
B("fib", FIB, 3),
B("hello", HELLO, 3)
]
class GHCiKernel(Kernel):
これは前回の投稿記事を参照してください。変更部分を簡単説明します。
self.regex = re.compile(r"---(\w+) *(\w*)|.*")
~~~~
res_re = self.regex.match(code)
func_id = res_re.groups()[0]
if not func_id in self.funcs: func_id = None
stdout, stderr = self.funcs[func_id](code, res_re.groups()[1])
- 最初の
(\w+)がrunghcやloadなどとマッチ、2番目の(\w*)はファイル名が有ったらマッチする。|.*がないとどれもマッチしないと例外が発生するので入れてある。マッチしないと両方のgroupがNoneになる -
match(code)はcodeの1行目だけが検索対象になる -
res_re.groups()[0]はrunghcやloadなどが入り、res_re.groups()[1]にはファイル名が入っている -
def do_complete- Cell上でタブキーを押すと補完機能が呼び出され、このメソッドが呼ばれる。
class GHCi
これも前回の投稿記事を参照してください。 ---loadの処理であるdef load_fileはソースのコメントを見て下さい。
class RunGHC
---runghcの処理をするこのクラスはCellのコードをファイルに書いて、runghc ファイル名を実行する。---runghcfはファイルの書き込み無く、単に指定ファイルを実行するだけである。実行にはsunprocess.Popenを使ってstdoutとstderrはパイプを使ってrunghcとやり取りをする。
class Complete
Cell上でタブキーを押すと補完機能が機能するので、それの実装クラスである。
- 補完候補を
self.matchesに配列として格納 -
self.mathcesの格納した候補のタイプをtypesに辞書の配列としてself.matchesの順番と同じ順番で格納する- 辞書の形式:
{"type":タイプの文字列} - タイプの文字列は何でもOK
- 辞書の形式:
def user_complete
このメソッドはghci_completions.pyで定義した補完候補データを用いる処理である。
{"key":補完キーワード, "item": [補完候補の配列], "type":補完候補のタイプ}
の配列がCODE_COMPLETIONSとして定義されている。
def ghci_complete
gchiのコマンド:complete repl 20 "補完キーワード"を送って対象の候補を最大で最初から20個送ってもらう処理である。個数を書かないと全部が対象になる。もし、20個を超えたら補完候補の最後にmore 残り個数を表示させる。
data = eval(line)
これはlineには"文字が含まれるので"を消すためにevalを行って、ただの文字列にしている。
'"word"' => 'word'
> ghci
GHCi, version 9.12.2: https://www.haskell.org/ghc/ :? for help
ghci> :complete repl "co"
6 6 ""
"compare"
"concat"
"concatMap"
"const"
"cos"
"cosh"
ghci>
def do_complete
補完処理の入口。
タブが押されて位置から逆にたどり、英数,"-",".","_"以外の文字が現れるまで検索して、補完キーワードを確定して、あとは順番にuser,ghciの補完処理を行う。
テスト
入力[12]が---runghcでファイル名を fibonacci.hsとして書き出し、runghc fibonacci.hsを実行してその結果を表示しています。入力[13]は入力[12]で作成したfibonacci.hsを指定して実行しています。当然、既存のファイルでもOKです。

入力[17]を _temp_load.hsに書き出し、それを:loadしています。入力[19]は入力[18]で作成しloadしたものが使えています。入力[20]は:loadとそれを利用したコードを別Cellに入力するのは面倒なので---ghciと記述することで同じCellに書くこと出来ます。
次からは補完のテストですが、coで補完しています。右端に出ているcommandとはユーザ定義の補完ワードでghciは:complete replで取って来たものです。
hellで補完してるので、ユーザ定義のhelloが候補になっています。これを選択するとその次に画像のようにHello Worldのコードが入力されます。
次はxで補完した場合、ユーザが定義した候補出ています。また、.の次も対象になっていてabc.xyzが候補して表示されています。
次はfiで補完した場合は3番目にfibが出ていて左右の端に何も表示されていませんが、これは同じCellに元々fibという語があるため候補に上がっていると思います。これは、kernelクラスがやっている思われ自分では実装していません。
終わりに
一番手間取ったのが補完機能です。:complete replを知らなかったので、最初はghciにタブ文字を送って候補一覧取得しようとしても上手くいかず、ユーザ定義だけで済まそうと思っていました。しかし、Preludeの関数等は500個ぐらい有って段々面倒になってしまいました。 :complete replが有って良かったです。
また、:loadすると:loadする以前に定義した関数等が使えなくなるもの知らなかったです。
もう少しHaskellを勉強して欲しい機能が出てきたら、また改良しようかと思います。
Windows版について
Windows版にも挑戦しましたが、REPLWrapperはWindowsでは使えませんでした。代替コードはあったので試しみてノーマルケースは正常に動作したが、CTRL+Cをghciに送ることがどうしても出来ずとりあえず保留状態です。あと、強制終了も出来なかった。Windowsのghciはプロセスが3個立上がっているのが原因の一つかとも思っています。
Appendix
自分ようにテストコードを載せます。
subprocess.Popen テストコード
import os
import subprocess
CMD="""
fib 0 = 0
fib 1 = 1
fib n = fib (n-1) + fib (n-2)
main = do
print $ fib 10
"""
pipe = False
if pipe:
p = subprocess.Popen(["runghc"],
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
p.stdin.write(CMD.encode())
p.stdin.flush()
else:
file_name = "_temp.hs"
with open(file_name, "w") as f:
f.write(CMD)
p = subprocess.Popen(["runghc", file_name],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE
)
try:
stdout, stderr = p.communicate()
print(stdout.decode(), end="")
print(stderr.decode(), end="")
except BaseException as e:
print(e)
if not pipe:
os.remove(file_name)
> python popentest.py
55
REPLWrapperのテストコード
from pexpect.replwrap import REPLWrapper
import pexpect
wrapper = REPLWrapper("ghci", "ghci> ", None,
continuation_prompt="ghci| ")
CODE="""
:set +m
let fib 0 = 0
fib 1 = 1
fib n = fib(n-1) + fib(n-2)
"""
stdout = wrapper.run_command(CODE)
print(stdout, end="")
stdout = wrapper.run_command("fib 10")
print(stdout, end="")
try:
stdout = wrapper.run_command("fib 100")
print(stdout, end="")
except BaseException as e:
print(type(e).__name__)
wrapper.child.sendintr()
wrapper._expect_prompt()
print(wrapper.child.before, end="")
55が表示された後CTRL+Cを押下すると例外が発生します。
> python repltest.py
55
^CKeyboardInterrupt
Interrupted.





