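Introduction
MSXPi, which extends an MSX by adding a Raspberry Pi, can also be used with the MSX emulator openMSX.
I described how to build an openMSX that supports MSXPi in a previous article.
This time I ran "msxpi-server", the server that processes commands issued from the MSX, on Raspberry Pi OS.
When the published Python server script runs on Raspberry Pi OS, it monitors the GPIO of the Raspberry Pi it is running on instead of the openMSX socket, so it cannot communicate with openMSX.
I therefore modified it to disable GPIO access and communicate only over a socket.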
How MSXPi works
The communication between MSXPi and msxpi-server works as follows.
- A real MSX combined with the MSXPi cartridge
┌────────────┐ ┌────────────────────────┐ ┌────────────────────┐
│ MSX │ <---> │ MSXPi Interface │ <---> │ msxpi-server │
│ (Z80 I/O) │ 0x56 │ (CPLD + SPI bridge) │ GPIO │ (Python GPIO API) │
└────────────┘ 0x5A └────────────────────────┘ └────────────────────┘
When the MSX boots, the Raspberry Pi on the MSXPi cartridge boots and msxpi-server starts automatically.
- openMSX with MSXPi attached, combined with msxpi-server running on the same machine as openMSX
┌────────────┐ ┌────────────────────────┐ ┌────────────────────┐
│ MSX │ <---> │ MSXPi Interface │ <---> │ msxpi-server │
│ (Z80 I/O) │ 0x56 │ (MSXPiDevice + socket) │ Socket │(Python socket API) │
└────────────┘ 0x5A └────────────────────────┘ └────────────────────┘
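Start openMSX with MSXPi attached, and start msxpi-server separately.
Running the published msxpi-server on Raspberry Pi OS
When the published server runs on a Raspberry Pi, it tries to receive signals from the MSX through GPIO.
To talk to the emulator, the server has to communicate with openMSX through the Python socket API.
So, to communicate with an openMSX running on the Raspberry Pi itself, the server must be restricted to communication through the Python socket API.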
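Changes
I asked Copilot to summarize the changes.
The following is a summary of the main changes that can be read from the diff (patch) between msxpi-server.py and msxpi-server-socket.py. For each point, the supporting code is cited from the original files (quotations are taken from the uploaded files).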
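1) Operating mode change: GPIO-based → socket-only
- All Raspberry Pi GPIO/SPI bit-banging code is disabled, and the script is simplified so that it always talks to the MSX over a TCP socket.
  - The send/receive paths in SPI_MASTER_transfer_byte() and piexchangebyte() are unified on socket send/recv, with the GPIO branch removed. (For example, SPI_MASTER_transfer_byte() in msxpi-server-socket.py uses only conn.sendall() and conn.recv(1).)
- initialize_connection() only opens a server socket, listens, and accepts a single connection; there is no GPIO initialization or (button) interrupt setup.
- As a result, behavior is the same (socket communication) on Windows, macOS, and Linux.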
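The "open a server socket, listen, accept one connection, then exchange single bytes" flow described above can be sketched as follows. This is only an illustration under assumptions, not the code in msxpi-server-socket.py: transfer_byte() and fake_msx_client() are hypothetical names, the client thread merely stands in for openMSX, and the demo binds to the loopback address with an OS-chosen port rather than the script's own HOST/PORT settings.

```python
import socket
import threading

HOST = "127.0.0.1"  # assumption for the demo; loopback keeps the example local


def initialize_connection(port=0):
    """Open a listening TCP socket (port 0 lets the OS pick a free port)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, port))
    server.listen(1)
    return server


def transfer_byte(conn, byte_out=None):
    """Send one byte if byte_out is given, otherwise receive one byte."""
    if byte_out is not None:
        conn.sendall(bytes([byte_out]))
        return None
    data = conn.recv(1)
    return data[0] if data else None


def fake_msx_client(port, received):
    """Hypothetical stand-in for openMSX: send one byte, then read the reply."""
    with socket.create_connection((HOST, port)) as client:
        client.sendall(b"\x41")
        received.append(client.recv(1)[0])


def demo():
    server = initialize_connection()
    port = server.getsockname()[1]
    received = []
    worker = threading.Thread(target=fake_msx_client, args=(port, received))
    worker.start()
    conn, _addr = server.accept()         # accept exactly one connection
    with conn:
        byte_in = transfer_byte(conn)     # passive receive
        transfer_byte(conn, byte_in + 1)  # reply with a different byte
    worker.join()
    server.close()
    return byte_in, received[0]
```

Calling demo() runs both ends in one process, which makes it possible to try the exchange without starting openMSX.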
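2) Host detection and version information
- detect_host() no longer treats the Raspberry Pi specially and returns only Windows, MacOS, or Linux.
  - The /proc/cpuinfo check and other Raspberry Pi detection logic are removed.
- version is changed to "1.3-socket", and the startup message states Socket Mode explicitly.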
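3) Root path and working area
- The default working directory changes from /home/pi/msxpi to ~/msxpi (under the user's home directory).
- The RAM disk/temporary area changes from /media/ramdisk to /tmp/msxpi_ramdisk, with added handling that creates it if it does not exist.
- Reading and saving of the INI file is simplified (updateIniFile() and the set of initial values were reviewed).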
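A minimal sketch of the "create it if it does not exist" handling is shown below. prepare_workdirs() and demo() are hypothetical helpers introduced only for illustration; the demo deliberately works under a throwaway directory so that it does not touch the real ~/msxpi or /tmp/msxpi_ramdisk.

```python
import os
import tempfile


def prepare_workdirs(home, ramdisk):
    """Create both directories if missing and return the temp-file path inside ramdisk."""
    os.makedirs(home, exist_ok=True)      # no error if the directory already exists
    os.makedirs(ramdisk, exist_ok=True)
    return os.path.join(ramdisk, "msxpi.tmp")


def demo():
    """Exercise prepare_workdirs() under a throwaway directory instead of the real paths."""
    with tempfile.TemporaryDirectory() as base:
        home = os.path.join(base, "msxpi")
        ramdisk = os.path.join(base, "msxpi_ramdisk")
        tmpfile = prepare_workdirs(home, ramdisk)
        return os.path.isdir(home), os.path.isdir(ramdisk), os.path.basename(tmpfile)
```

With the values from the patch, the call would presumably look like prepare_workdirs(os.path.expanduser("~/msxpi"), "/tmp/msxpi_ramdisk").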
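4) Uniform handling of unsupported commands (platform differences made explicit)
- GPIO, audio, and reboot commands explicitly reply "not supported on this platform":
  - pplay(), pvol(), irc(), preboot(), pshut(), and similar functions are restructured to return an unsupported message.
- This lets the socket-only version concentrate on basic functions over the network: file operations, command execution, and data transfer.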
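One way to picture the uniform "not supported" reply is a small dispatcher like the one below. This is a sketch under assumptions: handle_command(), its handlers mapping, and the send callable are hypothetical and are not how the server actually dispatches commands; only the message text and the command names come from the patch and the summary.

```python
UNSUPPORTED_MESSAGE = "Command not supported in this platform"
UNSUPPORTED_COMMANDS = {"pplay", "pvol", "irc", "preboot", "pshut"}


def handle_command(name, handlers, send):
    """Run a supported handler, or send the fixed reply for an unsupported command.

    handlers maps command names to zero-argument callables (hypothetical);
    send is any callable that accepts the reply as bytes.
    """
    if name in UNSUPPORTED_COMMANDS:
        send(UNSUPPORTED_MESSAGE.encode())
        return False
    handlers[name]()
    return True
```

Keeping the reply in one place means the MSX side always sees the same text, whichever unsupported command was requested.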
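5) Simplified character encoding handling
- Much of the detailed encoding handling and debug output, including the Windows/Raspberry Pi specific cp932 and JIS (ISO-2022-JP) conversions, is removed.
  - Example: the elaborate Japanese-to-JIS conversion in pdir() is reduced to a simple decode().
- As a result, protection against garbled messages is kept to a minimum in favor of simpler implementation and maintenance.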
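A bare decode() raises UnicodeDecodeError when the command output is not valid UTF-8. If that turns out to be a problem, a tolerant variant could look like the sketch below. decode_output() and to_msx_bytes() are hypothetical helpers, not part of the patch; the "?" substitution only loosely mirrors the fallback used by the removed JIS conversion code.

```python
def decode_output(raw, encoding="utf-8"):
    """Decode command output, replacing undecodable bytes instead of raising."""
    return raw.decode(encoding, errors="replace")


def to_msx_bytes(text):
    """Encode text as ASCII for the MSX, turning every non-ASCII character into '?'."""
    return text.encode("ascii", errors="replace")
```

For example, decode_output() accepts cp932 bytes without raising (the result contains U+FFFD replacement characters), and to_msx_bytes() keeps the byte count predictable on the MSX side.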
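6) Stronger security notice (warnings specific to the socket version)
- At startup, the server prints a notice that spells out its risks and scope, such as "arbitrary command execution through PRUN" and "access to files on the host or the network through PDIR/PCOPY" (it also shows that this is the socket version).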
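7) Data transfer and storage: kept, with minor adjustments
- senddata(), recvdata(), and sendmultiblock() keep their checksum calculation and resend (retry) framework, while timeouts and exception handling are reorganized for socket operation.
- pcopy() keeps the flow of /z expansion (LZH and other archives) and direct writes to MSX-DOS images.
  - The choice of extraction command on Windows/Linux is simplified, and temporary paths are reorganized around /tmp/msxpi.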
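The checksum that senddata() and recvdata() compute in the patch adds the low byte of the running sum to the remaining high part and keeps the result modulo 256. The sketch below isolates that arithmetic and wraps it in a retry loop. fold_checksum() and send_with_retries() are hypothetical names, the send_once callable is a stand-in for the real byte exchange, and the default of three retries is an assumption (the value of GLOBALRETRIES does not appear in the diff).

```python
def fold_checksum(data):
    """Fold the byte sum into 8 bits: (sum >> 8) + (sum % 256), modulo 256."""
    total = sum(data)
    low = total % 256   # right 8 bits
    high = total >> 8   # everything above the right 8 bits
    return (high + low) % 256


def send_with_retries(block, send_once, retries=3):
    """Call send_once(block, checksum) until the peer echoes the same checksum."""
    expected = fold_checksum(block)
    for _ in range(retries):
        if send_once(block, expected) == expected:
            return True
    return False
```

Because the check is a simple sum, it catches corrupted bytes but not reordered ones, which is usually acceptable for a local link such as this.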
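8) Error and exception handling
- A dedicated socket timeout message (for example, "Send timeout: peer not responding.") is added, and on an exception the loop continues where possible.
- The existing GPIO cleanup and kill-script invocations are removed as no longer needed.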
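The timeout handling can be sketched as a receive helper that reports failure instead of raising, so that the caller's loop can carry on. recv_byte() is a hypothetical helper, not a function from the script. The 3.0 second default matches the conn.settimeout(3.0) call in the patch, and the extra check for an empty result (peer closed the connection) is an addition made here as an assumption about what a robust version would want.

```python
import socket


def recv_byte(conn, timeout=3.0):
    """Return (True, byte) on success, or (False, None) if the peer is silent or gone."""
    conn.settimeout(timeout)
    try:
        data = conn.recv(1)
    except socket.timeout:
        print("recv timed out")
        return False, None
    if not data:  # an empty result means the peer closed the connection
        return False, None
    return True, data[0]
```

A quick way to try it without openMSX is socket.socketpair(), which returns two already connected sockets in the same process.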
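9) Minimal OpenAI integration (chatgpt command)
- A simplified version that returns an explicit error when OPENAIKEY is not set.
- The request is sent with requests.post(), and choices[0].message.content from the response is returned (a lightly defensive implementation).
Note: behavior depends on the environment, because of network reachability and API specification changes.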
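The "lightly defensive" part can be illustrated without any network access by working on an already parsed response. In the sketch below, extract_reply() and chatgpt_reply() are hypothetical helpers and the error strings are made up for illustration; only the choices[0].message.content path and the OPENAIKEY check come from the summary above.

```python
def extract_reply(payload):
    """Return choices[0].message.content from a parsed response, or an error string."""
    try:
        return payload["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        return "Pi:Error - unexpected response format"


def chatgpt_reply(api_key, payload):
    """Refuse to continue without a key; otherwise extract the reply text."""
    if not api_key:
        return "Pi:Error - OPENAIKEY is not set"
    return extract_reply(payload)
```

In a real request, payload would be the JSON body returned by the HTTP call; catching the three exception types covers a missing key, an empty choices list, and a body that is not a dictionary at all.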
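10) Constants and messages
- HOST='0.0.0.0' and PORT=5000 are kept, and the peer address is logged right after the socket accepts a connection.
- GLOBALRETRIES and the RC (return code) values are carried over.
- Strings such as the build ID are tidied and updated for the socket version.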
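Summary (aim)
The main aim of this change is to remove the Raspberry Pi dependent GPIO/SPI implementation and reorganize the server into an OS-independent socket version.
As a result, development and testing become easier, and the server can talk to the MSX-side client over TCP with the same procedure on Windows, macOS, and Linux. On the other hand, functions that use GPIO (reboot, button interrupt, LED, and so on) and the volume/playback functions are no longer supported.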
Conclusion
I was able to set up an environment for checking MSXPi operation on Raspberry Pi OS.
The left window is the server, and the right window is openMSX.

"prun pwd" at the lower right of the screen is a command that displays the directory the server is running in.
A>prun pwd
/home/kazueda/MSX
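This shows that "/home/kazueda/MSX" is the directory the server is running in.
If you save MSX disk images in this directory, you can mount them from the openMSX side.
Also, if you register a ChatGPT API key on the server side, you can type "CHATGPT.COM" followed by a prompt on the MSX side and send it, and the ChatGPT answer is displayed.
If you have a token, please give it a try.
Bonus
This is the patch file that changes the server script to use socket communication on the Raspberry Pi.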
--- msxpi-server.py
+++ msxpi-server-socket.py
@@ -51,7 +51,14 @@
from io import StringIO
from contextlib import redirect_stdout
-version = "1.3"
+# GPIO library is not needed for socket communication,
+# but we'll keep the try-except block to avoid errors if other parts of the code reference it.
+try:
+ import RPi.GPIO as GPIO
+except ImportError:
+ GPIO = None
+
+version = "1.3-socket"
BuildId = "20251016.005"
CMDSIZE = 3 + 9
@@ -104,8 +111,9 @@
NoTimeOutCheck = False
TimeOutCheck = True
-MSXPIHOME = "/home/pi/msxpi"
-RAMDISK = "/media/ramdisk"
+# Adjusted for a more generic linux environment
+MSXPIHOME = os.path.expanduser("~/msxpi")
+RAMDISK = "/tmp/msxpi_ramdisk"
TMPFILE = RAMDISK + "/msxpi.tmp"
# irc
@@ -119,114 +127,37 @@
PORT = 5000 # Match this with serverPort in your C++ code
conn = None
-hostType = "RaspberryPi"
-RPI_SHUTDOWN = 26
-press_time = None
-
def detect_host():
+ # Force socket mode by returning a non-RaspberryPi host type
+ # This makes the server always use the socket communication logic.
system = platform.system()
- machine = platform.machine()
-
if system == "Windows":
return "Windows"
elif system == "Darwin":
return "MacOS"
- elif system == "Linux":
- # Check for Raspberry Pi
+ else:
+ return "Linux"
+
+def SPI_MASTER_transfer_byte(byte_out=None):
+ global conn
+ byte_in = 0
+ conn.settimeout(3.0)
+ if byte_out is not None:
+ conn.sendall(bytes([byte_out]))
+ else:
try:
- with open("/proc/cpuinfo", "r") as f:
- cpuinfo = f.read()
- if "Raspberry Pi" in cpuinfo or "BCM" in cpuinfo or "Raspberry" in platform.uname().node:
- return "RaspberryPi"
- except Exception:
- pass
- return "Linux"
- else:
- return system
-
-def init_spi_bitbang():
-# Pin Setup:
- GPIO.setmode(GPIO.BCM)
- GPIO.setup(SPI_CS, GPIO.IN, pull_up_down=GPIO.PUD_UP)
- GPIO.setup(SPI_SCLK, GPIO.OUT)
- GPIO.setup(SPI_MOSI, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
- GPIO.setup(SPI_MISO, GPIO.OUT)
- GPIO.setup(RPI_READY, GPIO.OUT)
- GPIO.setup(RPI_SHUTDOWN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
-
-def tick_sclk():
-
- global SPI_SCLK
- GPIO.output(SPI_SCLK, GPIO.HIGH)
- time.sleep(0.00001)
- GPIO.output(SPI_SCLK, GPIO.LOW)
-
-def SPI_MASTER_transfer_byte(byte_out=None):
-
- global conn, hostType
- byte_in = 0
- if hostType == "RaspberryPi":
- #print("SPI_MASTER_transfer_byte(): Raspberry Pi")
- tick_sclk()
-
- for bit in [0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01]:
- # Send bit if byte_out is provided
- if byte_out is not None:
- GPIO.output(SPI_MISO, GPIO.HIGH if (byte_out & bit) else GPIO.LOW)
- else:
- GPIO.output(SPI_MISO, GPIO.LOW) # Passive receive mode
-
- GPIO.output(SPI_SCLK, GPIO.HIGH)
-
- # Always read MOSI
- if GPIO.input(SPI_MOSI):
- byte_in |= bit
-
- GPIO.output(SPI_SCLK, GPIO.LOW)
-
- tick_sclk()
- else:
- conn.settimeout(3.0)
- if byte_out is not None:
- # print("SPI_MASTER_transfer_byte(): Non-Raspberry Pi conn.sendall")
- conn.sendall(bytes([byte_out]))
- else:
- # print("SPI_MASTER_transfer_byte(): Non-Raspberry Pi conn.recv")
- try:
- byte_in = conn.recv(1)[0] # Passive receive mode
- except socket.timeout:
- print("SPI_MASTER_transfer_byte(): recv timed out")
- return RC_FAILED,None
-
- #print(f"Received: {chr(byte_in)}")
+ byte_in = conn.recv(1)[0] # Passive receive mode
+ except socket.timeout:
+ print("SPI_MASTER_transfer_byte(): recv timed out")
+ return RC_FAILED,None
return RC_SUCCESS,byte_in
-
+
def piexchangebyte(byte_out=None):
"""
- Exchanges a byte with the MSXPi interface.
- If byte_out is provided, sends it and ignores the response.
- If byte_out is None, waits and reads a byte from MSX.
+ Exchanges a byte with the MSXPi interface over a socket.
"""
-
- global hostType
- if hostType == "RaspberryPi":
- #print("piexchange(): Raspberry Pi")
- # GPIO-based SPI emulation
- global SPI_CS, RPI_READY
-
- GPIO.output(RPI_READY, GPIO.HIGH)
- while GPIO.input(SPI_CS):
- #print("Waiting SPI_CS signal")
- pass
-
- rc, byte_in = SPI_MASTER_transfer_byte(byte_out)
- GPIO.output(RPI_READY, GPIO.LOW)
- else:
- #print("piexchange(): Non-Raspberry Pi")
- # Socket-based communication
- global conn
- rc, byte_in = SPI_MASTER_transfer_byte(byte_out)
-
+ global conn
+ rc, byte_in = SPI_MASTER_transfer_byte(byte_out)
return rc, byte_in
# Using CRC code from :
@@ -255,6 +186,7 @@
# create a subclass and override the handler methods
class MyHTMLParser(HTMLParser):
def __init__(self):
+ super().__init__()
self.reset()
self.NEWTAGS = []
self.NEWATTRS = []
@@ -270,14 +202,13 @@
self.HTMLDATA = []
def convert_charrefs(self, data):
print("MyHTMLParser: convert_charrefs found :", data)
-
+
def pathExpander(path, basepath = ''):
- #print(f"pathExpander()")
-
path=path.strip().rstrip(' \t\n\0')
if path.strip() == "..":
path = basepath.rsplit('/', 1)[0]
+ if not path: path = '/'
basepath = ''
if len(path) == 0 or path == '' or path.strip() == "." or path.strip() == "*":
path = basepath
@@ -302,8 +233,7 @@
newpath = path
elif basepath.startswith('/'):
urltype = 0 # this is a local path
- newpath = basepath + '/' + path
- newpath = newpath.replace('//','/')
+ newpath = os.path.join(basepath, path)
else:
urltype = 1 # this is a network path
newpath = basepath + "/" + path
@@ -348,10 +278,8 @@
drvletter = str(fpath[0]).upper()
msxdrive = ord(drvletter) - 64
- #convert filename to 8.3 format using all 11 positions required for the FCB
msxfcbfname = dos83format(msxfile)
- # send FCB structure to MSX
buf = bytearray()
buf.extend(msxdrive.to_bytes(1,'little'))
buf.extend(msxfcbfname.encode())
@@ -361,7 +289,7 @@
def prun(cmd = ''):
print(f"prun()")
- global hostType
+ hostType = detect_host()
if (cmd.strip() == '' or len(cmd.strip()) == 0):
rc, cmd = readParameters("Syntax: prun <command> <::> command. To pipe a command to other, use :: instead of |", True)
if rc != RC_SUCCESS:
@@ -374,13 +302,9 @@
cmd = cmd.replace("/", "\\")
p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
- if hostType == "Windows":
- buf = p.stdout.read().decode('cp932', errors='ignore') # Windowsではcp932でデコード
- err = p.stderr.read().decode('cp932', errors='ignore') # エラー出力も同様にデコード
- else:
- buf = p.stdout.read().decode('utf-8', errors='ignore') # Linux/Raspberry Piではutf-8でデコード
- err = p.stderr.read().decode('utf-8', errors='ignore')
- if len(err) > 0 and not ('0K ....' in err): # workaround for wget false positive
+ buf = p.stdout.read().decode()
+ err = (p.stderr.read().decode())
+ if len(err) > 0 and not ('0K ....' in err):
rc = RC_FAILED
buf = ("Pi:Error - " + str(err))
elif len(buf) == 0:
@@ -407,94 +331,18 @@
pathType, path = pathExpander(userPath, basepath)
try:
if pathType == 0:
- if hostType == "Windows":
- p = Popen(['cmd', '/c', 'dir', path], shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- raw_output = p.stdout.read()
- err_raw = p.stderr.read() # エラー出力も生のバイト列で取得
+ if detect_host() == "Windows":
+ prun('dir ' + path)
else:
- p = Popen(['ls', '-l', path], shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- raw_output = p.stdout.read()
- err_raw = p.stderr.read() # エラー出力も生のバイト列で取得
-
- # --- デバッグ情報追加ここから ---
- print(f"DEBUG: pdir - hostType: {hostType}")
- print(f"DEBUG: pdir - raw_output (bytes): {raw_output[:100]}...") # 先頭100バイトを表示
- print(f"DEBUG: pdir - err_raw (bytes): {err_raw[:100]}...") # 先頭100バイトを表示
- # --- デバッグ情報追加ここまで ---
-
- if hostType == "Windows":
- err = err_raw.decode('cp932', errors='ignore') # エラー出力もデコード
- else:
- err = err_raw.decode('utf-8', errors='ignore')
-
- if len(err) > 0:
- buf_to_send = ("Pi:Error - " + str(err)).encode()
- rc = RC_FAILED
- elif len(raw_output) == 0:
- buf_to_send = "Pi:Ok".encode()
- rc = RC_SUCCESS
- else:
- if hostType == "Windows":
- decoded_output = raw_output.decode('cp932', errors='ignore')
- else:
- decoded_output = raw_output.decode('utf-8', errors='ignore')
-
- decoded_output_lines = decoded_output.splitlines()
- converted_lines = []
- for line in decoded_output_lines:
- jis_encoded_line_parts = []
- for char in line:
- if ord(char) < 128:
- jis_encoded_line_parts.append(char.encode('ascii'))
- else:
- iso2022jp_bytes = char.encode('iso-2022-jp')
- if len(iso2022jp_bytes) == 4 and iso2022jp_bytes.startswith(b'\x1b$B') and iso2022jp_bytes.endswith(b'\x1b(B'):
- jis_encoded_line_parts.append(iso2022jp_bytes[2:4])
- else:
- jis_encoded_line_parts.append(b'?')
- converted_lines.append(b''.join(jis_encoded_line_parts))
-
- buf_to_send = b'\r\n'.join(converted_lines) + b'\r\n'
- rc = RC_SUCCESS
-
- sendmultiblock(buf_to_send, BLKSIZE, rc)
+ prun('ls -l ' + path)
else:
parser = MyHTMLParser()
htmldata = urlopen(path).read().decode()
- parser = MyHTMLParser()
parser.feed(htmldata)
buf = " ".join(parser.HTMLDATA)
- # ネットワークパスからのHTMLデータも日本語が含まれる可能性があるため、同様の変換が必要
- # ここもJIS変換を適用する
- jis_encoded_parts = []
- for char in buf:
- if ord(char) < 128:
- jis_encoded_parts.append(char.encode('ascii'))
- else:
- iso2022jp_bytes = char.encode('iso-2022-jp')
- if len(iso2022jp_bytes) == 4 and iso2022jp_bytes.startswith(b'\x1b$B') and iso2022jp_bytes.endswith(b'\x1b(B'):
- jis_encoded_parts.append(iso2022jp_bytes[2:4])
- else:
- jis_encoded_parts.append(b'?')
- buf_to_send_html = b''.join(jis_encoded_parts)
- sendmultiblock(buf_to_send_html, BLKSIZE, RC_SUCCESS)
-
+ rc = sendmultiblock(buf.encode(),BLKSIZE, RC_SUCCESS)
except Exception as e:
- error_message_str = 'Pi:Error - ' + str(e)
-
- # エラーメッセージもJIS変換して送信
- jis_encoded_error_parts = []
- for char in error_message_str:
- if ord(char) < 128:
- jis_encoded_error_parts.append(char.encode('ascii'))
- else:
- iso2022jp_bytes = char.encode('iso-2022-jp')
- if len(iso2022jp_bytes) == 4 and iso2022jp_bytes.startswith(b'\x1b$B') and iso2022jp_bytes.endswith(b'\x1b(B'):
- jis_encoded_error_parts.append(iso2022jp_bytes[2:4])
- else:
- jis_encoded_error_parts.append(b'?')
- buf_to_send_error = b''.join(jis_encoded_error_parts)
- sendmultiblock(buf_to_send_error, BLKSIZE, RC_SUCCESS)
+ sendmultiblock(('Pi:Error - ' + str(e)).encode(), BLKSIZE, RC_SUCCESS)
return RC_SUCCESS
@@ -539,11 +387,11 @@
def pcopy(msxcmd = "pcopy"):
print("pcopy()")
- global psetvar,GLOBALRETRIES,hostType
+ global psetvar,GLOBALRETRIES
+ hostType = detect_host()
basepath = getMSXPiVar('PATH')
rc = RC_SUCCESS
- # Receive parameters - but before, prepare help message to pass
errorMsg = 'Syntax:\n'
if msxcmd == "pcopy":
errorMsg = errorMsg + 'pcopy </z> remotefile <localfile>\n'
@@ -590,7 +438,6 @@
buf = f.read()
filesize = len(buf)
except Exception as e:
- err = 'Pi:Error - ' + str(e)
rc = sendmultiblock(('Pi:Error - ' + str(e)).encode(), BLKSIZE, RC_FAILED)
return RC_FAILED
else:
@@ -603,70 +450,66 @@
print(f"Pi:Error - {str(e)}")
rc = sendmultiblock(('Pi:Error - ' + str(e)).encode(), BLKSIZE, RC_FAILED)
return RC_FAILED
- # if /z passed, will uncompress the file
if rc == RC_SUCCESS:
if expand:
tmpfn0 = path.split('/')
tmpfn = tmpfn0[len(tmpfn0)-1]
+ tmppath = '/tmp/msxpi'
+ if not os.path.exists(tmppath):
+ os.makedirs(tmppath)
+
if hostType == "Windows":
- os.system('del /Q "C:\\tmp\\msxpi\\*"')
+ os.system(f'del /Q "{os.path.join(tmppath, "*")}"')
else:
- os.system('rm /tmp/msxpi/* 2>/dev/null')
- tmpfile = open('/tmp/' + tmpfn, 'wb')
+ os.system(f'rm {os.path.join(tmppath, "*")} 2>/dev/null')
+
+ tmpfile = open(os.path.join(tmppath, tmpfn), 'wb')
tmpfile.write(buf)
tmpfile.close()
- # If not windows, uses lha to extrac lzh files
+
if ".lzh" in tmpfn:
if hostType == "Windows":
- cmd = 'lha -xfiw=/tmp/msxpi /tmp/' + tmpfn
+ cmd = f'lha -xfiw={tmppath} {os.path.join(tmppath, tmpfn)}'
else:
- cmd = '/usr/bin/lhasa -xfiw=/tmp/msxpi /tmp/' + tmpfn
+ cmd = f'/usr/bin/lhasa -xfiw={tmppath} {os.path.join(tmppath, tmpfn)}'
p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
perror = (p.stderr.read().decode())
rc = p.poll()
if rc!=0 and rc != None:
rc = RC_FAILED
else:
- # Will use 7-Zip for any file type under Windows
if hostType == "Windows":
- cmd = '7z.exe e /tmp/' + tmpfn + ' -aoa -o/tmp/msxpi/'
+ cmd = f'7z.exe e {os.path.join(tmppath, tmpfn)} -aoa -o{tmppath}'
else:
- cmd = '/usr/bin/unar -f -o /tmp/msxpi /tmp/' + tmpfn
+ cmd = f'/usr/bin/unar -f -o {tmppath} {os.path.join(tmppath, tmpfn)}'
p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE, close_fds=True)
perror = (p.stderr.read().decode())
rc = p.poll()
if rc!=0:
rc = RC_FAILED
- romfiles = [f for f in os.listdir('/tmp/msxpi') if f.endswith(('.rom', '.ROM'))]
+
+ romfiles = [f for f in os.listdir(tmppath) if f.lower().endswith('.rom')]
if romfiles:
- fname1 = '/tmp/msxpi/' + romfiles[0]
-
+ fname1 = os.path.join(tmppath, romfiles[0])
try:
with open(fname1, mode='rb') as f:
buf = f.read()
-
filesize = len(buf)
rc = RC_SUCCESS
-
except Exception as e:
rc = sendmultiblock(('Pi:Error - ' + str(e)).encode(), BLKSIZE, RC_FAILED)
return RC_FAILED
-
else:
print(f"Pi:Error - {perror}")
rc = sendmultiblock(('Pi:Error - ' + perror).encode(), BLKSIZE, RC_FAILED)
return RC_FAILED
- # If all good so far (including eventual decompress if needed)
- # then send the file to MSX
if rc == RC_SUCCESS:
if filesize == 0:
rc = sendmultiblock("Pi:Error - File size is zero bytes".encode(), BLKSIZE, RC_FAILED)
return RC_FAILED
-
- else:
- # Did we boot from the MSXPi ROM or another external drive?
- if (not msxdos1boot) or msxcmd == "ploadr": # Boot was from an externdal drive OR it is PLOADR
+ else:
+ if (not msxdos1boot) or msxcmd == "ploadr":
if expand:
if fname2 == '':
rc = ini_fcb(expandedFn,filesize)
@@ -677,33 +520,29 @@
if rc != RC_SUCCESS:
print("pcopy: ini_fcb failed")
return rc
-
- # This will send the file to MSX, for pcopy to write it to disk
rc = sendmultiblock(buf,SECTORSIZE, rc)
-
- else:# Booted from MSXPi disk drive (disk images)
- # this routine will write the file directly to the disk image in RPi
+ else:
try:
drive = getMSXPiVar('DriveA')
- fatfsfname = "fat:///"+getMSXPiVar('DriveA') # Asumme Drive A:
+ fatfsfname = "fat:///"+getMSXPiVar('DriveA')
if fname2.upper().startswith("A:"):
fname2 = fname2.split(":")
if len(fname2[1]) > 0:
- fname2=fname2[1] # Remove "A:" from name
+ fname2=fname2[1]
elif expandedFn != '':
fname2 = expandedFn
else:
- fname2=path.split("/")[len(path.split("/"))-1] # Drive not passed in name
+ fname2=path.split("/")[len(path.split("/"))-1]
elif fname2.upper().startswith("B:"):
drive = getMSXPiVar('DriveB')
- fatfsfname = "fat:///"+getMSXPiVar('DriveB') # Is Drive B:
+ fatfsfname = "fat:///"+getMSXPiVar('DriveB')
fname2 = fname2.split(":")
if len(fname2[1]) > 0:
- fname2=fname2[1] # Remove "B:" from name
+ fname2=fname2[1]
elif expandedFn != '':
fname2 = expandedFn
else:
- fname2=path.split("/")[len(path.split("/"))-1] # Drive not passed in name
+ fname2=path.split("/")[len(path.split("/"))-1]
elif expandedFn != '':
fname2 = expandedFn
elif fname2 == '':
@@ -716,20 +555,10 @@
sendmultiblock("Pi:Ok".encode(), BLKSIZE, RC_TERMINATE)
except Exception as e:
rc = sendmultiblock(('Pi:Error - ' + str(e)).encode(), BLKSIZE, RC_FAILED)
-
return rc
-def formatrsp(rc,lsb,msb,msg,size=BLKSIZE):
- b = bytearray(size)
- b[0] = rc
- b[1] = lsb
- b[2] = msb
- b[3:len(msg)] = bytearray(msg.encode())
- return b
-
def pdate():
print("pdate()")
-
pdate = bytearray(8)
now = datetime.datetime.now()
pdate[0]=(now.year & 0xff)
@@ -746,41 +575,16 @@
def pplay():
print("pplay()")
rc, data = readParameters("Syntax:\npplay play|loop|pause|resume|stop|getids|getlids|list <filename|processid|directory|playlist|radio>\nExemple: pplay play music.mp3", True)
-
if rc != RC_SUCCESS:
return RC_SUCCESS
-
- if hostType != "RaspberryPi":
- sendmultiblock("Command not supported in this platform".encode(), BLKSIZE, RC_SUCCESS)
- return RC_SUCCESS
-
- parmslist = data.split(" ")
- cmd = parmslist[0]
- if len(parmslist) > 1:
- parms = data.split(" ")[1].split("\x00")[0]
- else:
- parms = ''
-
- buf = ''
- try:
- buf = subprocess.check_output(['/home/pi/msxpi/pplay.sh',getMSXPiVar('PATH'),cmd,parms])
- if buf == b'':
- buf = b'\x0a'
- sendmultiblock(buf, BLKSIZE, RC_SUCCESS)
- except subprocess.CalledProcessError as e:
- sendmultiblock(("Pi:Error - "+str(e)).encode(),BLKSIZE, RC_FAILED)
-
+ sendmultiblock("Command not supported in this platform".encode(), BLKSIZE, RC_SUCCESS)
return RC_SUCCESS
def pvol():
print("pvol()")
rc, data = readParameters("This command requires a parameter", True)
if rc == RC_SUCCESS:
- if hostType == "RaspberryPi":
- rc = prun("mixer set PCM -- " + data)
- return RC_SUCCESS
- else:
- sendmultiblock("Command not supported in this platform".encode(), BLKSIZE, RC_SUCCESS)
+ sendmultiblock("Command not supported in this platform".encode(), BLKSIZE, RC_SUCCESS)
return RC_SUCCESS
def pset(varn = '', varv = ''):
@@ -812,10 +616,10 @@
if rc == RC_SUCCESS:
if varname.upper() == 'DRIVEA':
rc,drive0Data = msxdos_inihrd(varvalue)
- updateIniFile(MSXPIHOME+'/msxpi.ini',psetvar)
+ updateIniFile(os.path.join(MSXPIHOME, 'msxpi.ini'),psetvar)
elif varname.upper() == 'DRIVEB':
rc,drive1Data = msxdos_inihrd(varvalue)
- updateIniFile(MSXPIHOME+'/msxpi.ini',psetvar)
+ updateIniFile(os.path.join(MSXPIHOME, 'msxpi.ini'),psetvar)
if varn == '':
rc = sendmultiblock("Pi:Ok".encode(), BLKSIZE, RC_SUCCESS)
@@ -830,24 +634,22 @@
index = 0
for index in range(0,len(psetvar)):
if (psetvar[index][0].upper() == pvar.upper()):
- if pvalue == '': #will erase / clean a variable
+ if pvalue == '':
psetvar[index][0] = 'free'
psetvar[index][1] = 'free'
- updateIniFile(MSXPIHOME+'/msxpi.ini',psetvar)
+ updateIniFile(os.path.join(MSXPIHOME, 'msxpi.ini'),psetvar)
return RC_SUCCESS
else:
psetvar[index][1] = pvalue
- updateIniFile(MSXPIHOME+'/msxpi.ini',psetvar)
+ updateIniFile(os.path.join(MSXPIHOME, 'msxpi.ini'),psetvar)
return RC_SUCCESS
index += 1
- # Did not find the Var - User is tryign to add a new one
- # Check if there is a slot, then add new variable
for index in range(0,len(psetvar)):
if (psetvar[index][0] == "free" and psetvar[index][1] == "free"):
psetvar[index][0] = pvar
psetvar[index][1] = pvalue
- updateIniFile(MSXPIHOME+'/msxpi.ini',psetvar)
+ updateIniFile(os.path.join(MSXPIHOME, 'msxpi.ini'),psetvar)
return RC_SUCCESS
return RC_FAILED
@@ -865,145 +667,23 @@
def pwifi():
print("pwifi()")
-
- global psetvar
- wifissid = getMSXPiVar('WIFISSID')
- wifipass = getMSXPiVar('WIFIPWD')
- wificountry = getMSXPiVar('WIFICOUNTRY')
-
rc,cmd = readParameters("", False)
if rc != RC_SUCCESS:
return RC_FAILED
-
- if (cmd[:2] == "/h"):
- sendmultiblock("Pi:Usage:\npwifi display | set".encode(), BLKSIZE, RC_FAILED)
- return RC_SUCCESS
-
- if (cmd[:1] == "s" or cmd[:1] == "S"):
- if hostType == "RaspberryPi":
- wifisetcmd = 'sudo nmcli device wifi connect "' + wifissid + '" password "' + wifipasss + '"'
- prun(wifisetcmd)
- else:
- sendmultiblock(b'Parameter not supported in this platform', BLKSIZE, RC_SUCCESS)
- else:
- if hostType == "RaspberryPi":
- prun("ip a | grep '^1\\|^2\\|^3\\|^4\\|inet'|grep -v inet6")
- else:
- prun("ipconfig")
-
+ prun("ipconfig" if detect_host() == "Windows" else "ip a")
return RC_SUCCESS
def pver():
print("pver()")
- global version,build
+ global version,BuildId
ver = "MSXPi Server Version "+version+" Build "+ BuildId
rc = sendmultiblock(ver.encode(), BLKSIZE, RC_SUCCESS)
return rc
def irc():
-
print("irc()")
-
- global allchann,psetvar,channel,ircsock
- ircserver = getMSXPiVar('IRCADDR')
- ircport = int(getMSXPiVar('IRCPORT'))
- msxpinick = getMSXPiVar('IRCNICK')
-
- rc,data = recvdata()
- if rc != RC_SUCCESS:
- return rc
- if data[0] == 0:
- cmd=''
- else:
- cmd = data.decode().split("\x00")[0].lower()
- rc = RC_SUCCNOSTD
- try:
- if cmd[:4] == 'conn':
- ircsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- jparm = cmd.split(' ')
- jnick = jparm[1]
- if (jnick == 'none'):
- jnick = msxpinick
- ircsock.connect((ircserver, ircport))
- buf = bytearray()
- buf.extend(("USER "+ jnick +" "+ jnick +" "+ jnick + " " + jnick + "\r\n").encode())
- ircsock.send(buf)
- buf = bytearray()
- buf.extend(("NICK "+ jnick +"\r\n").encode())
- ircsock.setblocking(0);
- ircsock.send(buf)
- ircmsg = 'Connected to ' + ircserver
- sendmultiblock(ircmsg.encode(), BLKSIZE, RC_SUCCESS)
- elif cmd[:3] == "msg":
- ircsock.setblocking(0);
- ircsock.send(("PRIVMSG "+cmd[4:] +"\r\n").encode())
- sendmultiblock("Pi:Ok\n".encode(), BLKSIZE, RC_SUCCNOSTD)
- elif cmd[:4] == 'join':
- jparm = cmd.split(' ')
- jchannel = jparm[1]
- if jchannel in allchann:
- ircmsg = 'Already joined - setting to current. List of channels:' + str(allchann).replace('bytearray(b','').replace(')','')
- channel = jchannel
- ircsock.setblocking(0);
- ircsock.send(("JOIN " + jchannel + "\r\n").encode())
- ircmsg = 'Pi:Ok\n'
- rc = RC_SUCCNOSTD
- ircsock.setblocking(0);
- sendmultiblock(ircmsg.encode(), BLKSIZE, rc)
- elif cmd[:4] == 'read':
- ircmsg = 'Pi:Error'
- try:
- ircmsg = ircsock.recv(2048).decode()
- if len(ircmsg)>1:
- ircmsg = ircmsg.strip('\n\r')
- if ircmsg.find("PING :") != -1:
- ircmsgList = ircmsg.split(":")
- idx=0
- pingReply = 'PONG'
- for msg in ircmsgList:
- if 'PING' in msg:
- pingReply = ircmsgList[idx + 1]
- idx += 1
- ircsock.setblocking(0);
- ircsock.send(("PONG :"+pingReply+"\r\n").encode())
- rc = RC_SUCCNOSTD
- if ircmsg.find("PRIVMSG") != -1:
- ircname = ircmsg.split('!',1)[0][1:]
- ircchidxs = ircmsg.find('PRIVMSG')+8
- ircchidxe = ircmsg[ircchidxs:].find(':')
- ircchann = ircmsg[ircchidxs:ircchidxs+ircchidxe-1]
- if msxpinick in ircchann:
- ircchann = 'private'
- ircremmsg = ircmsg[ircchidxs+ircchidxe+1:]
- ircmsg = '<' + ircchann + '> ' + ircname + ' -> ' + ircremmsg
- rc = RC_SUCCESS
- except socket.error as e:
- err = e.args[0]
- print("irc read exception:",err,str(e))
- ircmsg = 'Pi:Ok\n'
- rc = RC_SUCCNOSTD
- sendmultiblock(ircmsg.encode(), BLKSIZE, rc)
- elif cmd[:5] == 'names':
- ircsock.send((cmd+"\r\n").encode())
- ircmsg = ''
- ircmsg = ircmsg + ircsock.recv(2048).decode("UTF-8")
- ircmsg = ircmsg.strip('\n\r')
- ircmsg = "Users on channel " #+ ircmsg.split('=',1)[1]
- sendmultiblock(ircmsg.encode(), BLKSIZE, RC_SUCCESS)
- elif cmd[:4] == 'quit':
- ircsock.send(("/quit\r\n").encode())
- ircsock.close()
- sendmultiblock("Pi:leaving room\r\n".encode(),BLKSIZE, RC_SUCCESS)
- elif cmd[:4] == 'part':
- ircsock.send(("/part\r\n").encode())
- ircsock.close()
- sendmultiblock("Pi:leaving room\n".encode(),BLKSIZE, RC_SUCCESS)
- else:
- print("irc:no valid command received")
- sendmultiblock("Pi:No valid command received".encode(),BLKSIZE, rc)
- except Exception as e:
- print("irc:Caught exception"+str(e))
- sendmultiblock("Pi:"+str(e).encode(), BLKSIZE, rc)
+ sendmultiblock("Command not supported in this platform".encode(), BLKSIZE, RC_SUCCESS)
+ return RC_SUCCESS
def py():
print('python()')
@@ -1017,7 +697,7 @@
buf = f.getvalue()
sendmultiblock(buf.encode(), BLKSIZE, RC_SUCCESS)
except Exception as e:
- print("python:",str(e).encode())
+ print("python:", str(e))
sendmultiblock(("Pi:Error - "+str(e)).encode(), BLKSIZE, RC_FAILED)
else:
sendmultiblock('Pi:Error'.encode(), BLKSIZE, rc)
@@ -1025,7 +705,6 @@
def dosinit():
print("dosinit()")
global msxdos1boot
-
rc,data = recvdata(BLKSIZE)
if rc == RC_SUCCESS:
flag = data.decode().split("\x00")[0]
@@ -1033,79 +712,48 @@
dskioini()
else:
msxdos1boot = False
-
return rc
def dskioini():
print("dskioini()")
-
global msxdos1boot,sectorInfo,drive0Data,drive1Data
-
- # Initialize disk system parameters
msxdos1boot = True
sectorInfo = [0,0,0,0]
- # Load the disk images into a memory mapped variable
rc , drive0Data = msxdos_inihrd(getMSXPiVar('DriveA'))
rc , drive1Data = msxdos_inihrd(getMSXPiVar('DriveB'))
def dskiords():
print("dskiords()")
-
DOSSCTSZ = SECTORSIZE - 3
-
global msxdos1boot,sectorInfo,drive0Data,drive1Data
if not msxdos1boot:
dskioini()
-
initdataindex = sectorInfo[3]*DOSSCTSZ
numsectors = sectorInfo[1]
sectorcnt = 0
-
- #print("dskiords:deviceNumber=",sectorInfo[0])
- #print("dskiords:numsectors=",sectorInfo[1])
- #print("dskiords:mediaDescriptor=",sectorInfo[2])
- #print("dskiords:initialSector=",sectorInfo[3])
- #print("dskiords:blocksize=",DOSSCTSZ)
-
while sectorcnt < numsectors:
- #print("dskiords:",sectorcnt)
if sectorInfo[0] == 0:
- buf = drive0Data[initdataindex+(sectorcnt*DOSSCTSZ):initdataindex+DOSSCTSZ+(sectorcnt*DOSSCTSZ)]
+ buf = drive0Data[initdataindex+(sectorcnt*DOSSCTSZ):initdataindex+DOSSCTSZ+(sectorcnt*DOSSCTSZ)]
else:
buf = drive1Data[initdataindex+(sectorcnt*DOSSCTSZ):initdataindex+DOSSCTSZ+(sectorcnt*DOSSCTSZ)]
rc = senddata(buf,DOSSCTSZ)
sectorcnt += 1
-
- if rc == RC_SUCCESS:
- pass
- #print("dskiords: checksum is a match")
- else:
+ if rc != RC_SUCCESS:
print("dskiords: checksum error")
break
def dskiowrs():
print("dskiowrs()")
-
DOSSCTSZ = SECTORSIZE - 3
-
global msxdos1boot,sectorInfo,drive0Data,drive1Data
if not msxdos1boot:
dskioini()
-
initdataindex = sectorInfo[3]*DOSSCTSZ
numsectors = sectorInfo[1]
sectorcnt = 0
-
- #print("dskiowrs:deviceNumber=",sectorInfo[0])
- #print("dskiowrs:numsectors=",sectorInfo[1])
- #print("dskiowrs:mediaDescriptor=",sectorInfo[2])
- #print("dskiowrs:initialSector=",sectorInfo[3])
- #print("dskiowrs:blocksize=",DOSSCTSZ)
-
while sectorcnt < numsectors:
rc,buf = recvdata(DOSSCTSZ)
if rc == RC_SUCCESS:
- #print("dskiowrs: checksum is a match")
if sectorInfo[0] == 0:
drive0Data[initdataindex+(sectorcnt*DOSSCTSZ):initdataindex+DOSSCTSZ+(sectorcnt*DOSSCTSZ)] = buf
else:
@@ -1117,84 +765,30 @@
def dskiosct():
print("dskiosct()")
-
- DOSSCTSZ = SECTORSIZE - 3
-
- global msxdos1boot,sectorInfo,drive0Data,drive1Data
+ global msxdos1boot,sectorInfo
if not msxdos1boot:
dskioini()
-
- route = 1
-
- if route == 1:
- rc,buf = recvdata(5)
+ rc,buf = recvdata(5)
+ if rc == RC_SUCCESS:
sectorInfo[0] = buf[0]
sectorInfo[1] = buf[1]
sectorInfo[2] = buf[2]
byte_lsb = buf[3]
byte_msb = buf[4]
sectorInfo[3] = byte_lsb + 256 * byte_msb
- if rc == RC_SUCCESS:
- pass
- # print("dskiosct: checksum is a match")
- else:
- print("dskiosct: checksum error")
-
- else:
- # Syncronize with MSX
+ else:
+ print("dskiosct: checksum error")
+
+def recvdata(bytecounter = BLKSIZE):
+ retries = GLOBALRETRIES
+ while retries > 0:
+ retries -= 1
while True:
rc, pibyte = piexchangebyte()
if rc == RC_FAILED or pibyte == READY:
break
-
- if rc == RC_FAILED:
- return
-
- rc, sectorInfo[0] = piexchangebyte()
- rc, sectorInfo[1] = piexchangebyte()
- rc, sectorInfo[2] = piexchangebyte()
- rc, byte_lsb = piexchangebyte()
- rc, byte_msb = piexchangebyte()
- sectorInfo[3] = byte_lsb + 256 * byte_msb
- rc, msxcrc = piexchangebyte()
-
- crc = 0xFF
- crc = crc ^ (sectorInfo[0])
- crc = crc ^ (sectorInfo[1])
- crc = crc ^ (sectorInfo[2])
- crc = crc ^ (byte_lsb)
- crc = crc ^ (byte_msb)
- piexchangebyte(crc)
-
- if crc != msxcrc:
- print("dos_sct: crc error")
-
- #print("dskiosct:deviceNumber=",sectorInfo[0])
- #print("dskiosct:numsectors=",sectorInfo[1])
- #print("dskiosct:mediaDescriptor=",sectorInfo[2])
- #print("dskiosct:initialSector=",sectorInfo[3])
-
-def recvdata(bytecounter = BLKSIZE):
-
- global hostType
- print(f"recvdata()")
-
- if hostType == "RaspberryPi":
- th = threading.Timer(3.0, exitDueToSyncError)
-
- retries = GLOBALRETRIES
- while retries > 0:
- retries -= 1
-
- # Syncronize with MSX
- while True:
- rc, pibyte = piexchangebyte()
- if rc == RC_FAILED or pibyte == READY:
- break
-
if rc == RC_FAILED:
return RC_FAILED, None
-
data = bytearray()
chksum = 0
while(bytecounter > 0 ):
@@ -1204,317 +798,162 @@
data.append(msxbyte)
chksum += msxbyte
bytecounter -= 1
-
- # Receive the CRC
rc, msxsum = piexchangebyte()
-
- # Send local CRC - only 8 right bits
- thissum_r = (chksum % 256) # right 8 bits
- thissum_l = (chksum >> 8) # left 8 bits
+ thissum_r = (chksum % 256)
+ thissum_l = (chksum >> 8)
thissum = ((thissum_l + thissum_r) % 256)
piexchangebyte(thissum)
-
if (thissum == msxsum):
- rc = RC_SUCCESS
- #print("recvdata: checksum is a match")
- if hostType == "RaspberryPi":
- th.cancel()
- break
- else:
- rc = RC_TXERROR
+ return RC_SUCCESS,data
+ else:
print("recvdata: checksum error")
- if hostType == "RaspberryPi":
- th.start()
-
- print("exiting recvdata")
- return rc,data
+ return RC_TXERROR, None
def senddata(data, blocksize = BLKSIZE):
-
- global hostType
- print(f"senddata()")
-
- if hostType == "RaspberryPi":
- th = threading.Timer(3.0, exitDueToSyncError)
- th.start()
-
- rc = RC_SUCCESS
retries = GLOBALRETRIES
while retries > 0:
retries -= 1
- # Syncronize with MSX
while True:
rc, pibyte = piexchangebyte()
if rc == RC_FAILED or pibyte == READY:
break
-
if rc == RC_FAILED:
return RC_FAILED
-
- #print("senddata(): Sync acquired")
byteidx = 0
chksum = 0
-
while(byteidx < blocksize):
byte0 = data[byteidx]
- if type(byte0) is int:
- byte = byte0
- else:
- byte = ord(byte0)
-
+ byte = byte0 if isinstance(byte0, int) else ord(byte0)
chksum += byte
piexchangebyte(byte)
byteidx += 1
-
- #print("senddata(): calculating checksum")
- # Send local CRC - only 8 right bits
- thissum_r = (chksum % 256) # right 8 bits
- thissum_l = (chksum >> 8) # left 8 bits
+ thissum_r = (chksum % 256)
+ thissum_l = (chksum >> 8)
thissum = ((thissum_l + thissum_r) % 256)
piexchangebyte(thissum)
-
- # Receive the CRC
rc, msxsum = piexchangebyte()
-
if (thissum == msxsum):
- rc = RC_SUCCESS
- #print("senddata: checksum is a match")
- if hostType == "RaspberryPi":
- th.cancel()
- break
- else:
- rc = RC_TXERROR
+ return RC_SUCCESS
+ else:
print("senddata: checksum error")
-
- return rc
+ return RC_TXERROR
def sendmultiblock(buf, blocksize = BLKSIZE, rc = RC_SUCCESS):
-
- global hostType
-
- #print(f"sendmultiblock(): {buf}")
-
numblocks = math.ceil((len(buf)+3)/blocksize)
-
- # If buffer small or equal to BLKSIZE
- if numblocks == 1: # Only one block to transfer
- #print(f"1 block rc = {hex(rc)} , buf size = {len(buf)} blocksize = {blocksize}")
- #print(f"buf = {buf}")
+ if numblocks == 1:
data = bytearray(blocksize)
data[0] = rc
data[1] = int(len(buf) % 256)
data[2] = int(len(buf) >> 8)
- data[3:len(buf)] = buf
- rc = senddata(data[:blocksize],blocksize)
- else: # Multiple blocks to transfer
+ data[3:3+len(buf)] = buf
+ rc = senddata(data,blocksize)
+ else:
idx = 0
thisblk = 0
- print(f"sendmultiblock(): Blocks to send = {numblocks}")
while thisblk < numblocks:
- #print(f"sendmultiblock(): block {thisblk}")
data = bytearray(blocksize)
if thisblk + 1 == numblocks:
- data[0] = rc # Last block - send original RC
+ data[0] = rc
datasize = len(buf) - idx
- data[1] = datasize % 256
- data[2] = datasize >> 8
else:
- data[0] = RC_READY # This is not last block
+ data[0] = RC_READY
datasize = blocksize - 3
- data[1] = datasize % 256
- data[2] = datasize >> 8
- data[3:datasize] = buf[idx:idx + datasize]
+ data[1] = datasize % 256
+ data[2] = datasize >> 8
+ data[3:3+datasize] = buf[idx:idx + datasize]
- # monitor disconnections on non-Raspberry Pi platforms
-
- if hostType == "RaspberryPi":
- rc = senddata(data,blocksize)
- if rc == RC_FAILED:
+ try:
+ rc_send = senddata(data,blocksize)
+ if rc_send == RC_FAILED:
return RC_FAILED
- idx += (blocksize - 3)
- thisblk += 1
- else:
- conn.settimeout(5.0) # Set timeout before sending
- try:
- rc = senddata(data,blocksize)
- if rc == RC_FAILED:
- return RC_FAILED
- idx += (blocksize - 3)
- thisblk += 1
- conn.settimeout(None) # Optional: restore to blocking mode
- except socket.timeout:
- print("Send timeout: peer not responding.")
- break
- conn.settimeout(None) # Optional: restore to blocking mode
-
+ idx += datasize
+ thisblk += 1
+ except socket.timeout:
+ print("Send timeout: peer not responding.")
+ break
return rc
-# This is the function that read parameters for all commands
-# The parameters have following use:
-# errorMsg: this is the error message to return to MSX if there
-# was an error reading the parameter
-# needParm: This flag indicates if a parameters must have been
-# passed or if parameters are optional.
-# Some commands (such as chatgpt.com) must have a parameter,
-# therefore needParam must be True
def readParameters(errorMsg, needParm=False):
- #print("readparms():")
rc, data = recvdata(BLKSIZE)
-
if rc != RC_SUCCESS:
- print(f"Pi:Error reading parameters")
encodederrorMsg = ('Pi:Error reading parameters').encode()
sendmultiblock(encodederrorMsg, BLKSIZE, RC_FAILED)
return RC_FAILED, None
-
- parms = data.decode().split("\x00")[0].strip()
+ parms = data.decode(errors='ignore').split("\x00")[0].strip()
if needParm and not parms:
- print(f"Pi:Error - {errorMsg}")
encodederrorMsg = ('Pi:Error - ' + errorMsg).encode()
sendmultiblock(encodederrorMsg, BLKSIZE, RC_FAILED)
return RC_FAILED, None
-
- #print(f"Parameters:{parms}")
return RC_SUCCESS, parms
def prestart():
- print("prestart()")
- if hostType == "RaspberryPi":
- print("Restarting MSXPi Server")
- exitDueToSyncError()
- else:
- print("Command not supported in this platform")
-
+ print("Restarting MSXPi Server (Socket Mode)")
+ os.execv(sys.executable, ['python'] + sys.argv)
+
def preboot():
- print("preboot()")
- if hostType == "RaspberryPi":
- print("Rebooting Raspberry Pi")
- os.system("sudo reboot")
- else:
- print("Command not supported in this platform")
-
+ print("Reboot command not supported in socket mode")
+ sendmultiblock("Reboot command not supported in socket mode".encode(), BLKSIZE, RC_SUCCESS)
+
def pshut():
- print("pshut()")
- if hostType == "RaspberryPi":
- print("Shutting down Raspberry Pi")
- os.system("sudo shutdown -h now")
- else:
- print("Command not supported in this platform")
-
-def button_handler(channel):
- start = time.time()
- # Wait for release
- while GPIO.input(RPI_SHUTDOWN) == GPIO.LOW:
- time.sleep(0.01)
- duration = time.time() - start
-
- if duration >= 3:
- print("Shutdown triggered")
- os.system("sudo shutdown -h now")
- else:
- print("Reboot triggered")
- os.system("sudo reboot")
-
-
+ print("Shutdown command not supported in socket mode")
+ sendmultiblock("Shutdown command not supported in socket mode".encode(), BLKSIZE, RC_SUCCESS)
+
def exitDueToSyncError():
print("Sync error. Recycling MSXPi-Server")
- GPIO.cleanup() # cleanup all GPIO
- os.system("/home/pi/msxpi/kill.sh")
+ os.execv(sys.executable, ['python'] + sys.argv)
def updateIniFile(fname,memvar):
- f = open(fname, 'w')
- for v in memvar:
- f.writelines('var '+v[0]+'='+v[1]+'\n')
- f.close()
+ try:
+ with open(fname, 'w') as f:
+ for v in memvar:
+ f.writelines('var '+v[0]+'='+v[1]+'\n')
+ except IOError as e:
+ print(f"Error writing to ini file {fname}: {e}")
def apitest():
print("apitest")
-
- # This command requires parameter
rc, data = readParameters("This command requires a parameter", False)
-
- # Stops if MSX did not send the query for OpenAI
- if rc == RC_FAILED:
- return RC_FAILED
-
- # Send response to CALL MSXPI - It will always expect a response
+ if rc == RC_FAILED: return RC_FAILED
+ buf1 = data  # readParameters() already returns a decoded, NUL-trimmed str
rc = sendmultiblock(('Pi:CALL MSXPI parameters:' + buf1).encode(), BLKSIZE, RC_SUCCESS)
-
- # Now Receive additional data sent with CALL MSXPISEND
rc,data = recvdata(BLKSIZE)
- #print("Additional data sent by CALL MSXPISEND:",data)
-
- #print("Extracting only ascii bytes and setting reponse...")
- buf2 = data.decode().split("\x00")[0]
-
- #print("Sending response: ",buf2)
+ buf2 = data.decode(errors='ignore').split("\x00")[0]
rc = sendmultiblock(('Pi:CALL MSXPISEND data:' + buf2).encode(), BLKSIZE, RC_SUCCESS)
def chatgpt():
print("chatgpt()")
api_key = getMSXPiVar('OPENAIKEY')
if not api_key or api_key == "Your OpenAI API Key":
- print('Pi:Error - OPENAIKEY is not defined. Define your key with PSET or add to msxpi.ini')
- sendmultiblock(b'Pi:Error - OPENAIKEY is not defined. Define your key with PSET or add to msxpi.ini', BLKSIZE, RC_FAILED)
+ sendmultiblock(b'Pi:Error - OPENAIKEY is not defined.', BLKSIZE, RC_FAILED)
return RC_FAILED
-
- # This command requires parameter
- rc, query = readParameters("This command requires a query", False)
-
- # Stops if MSX did not send the query for OpenAI
- if rc == RC_FAILED:
- return RC_FAILED
-
- model_engine = "gpt-3.5-turbo"
+ rc, query = readParameters("This command requires a query", True)
+ if rc == RC_FAILED: return RC_FAILED
+
url = "https://api.openai.com/v1/chat/completions"
-
try:
- headers = {
- "Authorization": f"Bearer {api_key}",
- "Content-Type": "application/json"
- }
-
- payload = {
- "model": model_engine,
- "messages": [
- {"role": "user", "content": query}
- ]
- }
-
+ headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
+ payload = {"model": "gpt-3.5-turbo", "messages": [{"role": "user", "content": query}]}
response = requests.post(url, headers=headers, json=payload)
openai_response = response.json()
if "choices" in openai_response:
response_text = openai_response["choices"][0]["message"]["content"]
sendmultiblock(response_text.encode(), BLKSIZE, RC_SUCCESS)
else:
- sendmultiblock(openai_response.encode(), BLKSIZE, RC_FAILED)
+ sendmultiblock(str(openai_response).encode(), BLKSIZE, RC_FAILED)
except Exception as e:
- error_msg = f"Pi:Error - {str(e)}"
- print(error_msg)
- sendmultiblock(error_msg.encode(), BLKSIZE, RC_FAILED)
+ sendmultiblock(f"Pi:Error - {str(e)}".encode(), BLKSIZE, RC_FAILED)
def initialize_connection():
- if hostType == "RaspberryPi":
- init_spi_bitbang()
- # Blink LED to show that Raspberry PI is now On-Line
- GPIO.output(RPI_READY, GPIO.HIGH)
- time.sleep(0.2) # 0.2 seconds = 200 milliseconds
- GPIO.output(RPI_READY, GPIO.LOW)
-
- # Add falling edge detection on GPIO 26 - Shutdown request via MSXPi push button
- GPIO.add_event_detect(RPI_SHUTDOWN, GPIO.FALLING, callback=button_handler, bouncetime=200)
- print(f"[MSXPi Server on {hostType}] Listening on GPIOs:\n ** CS={SPI_CS}, CLK={SPI_SCLK}, MOSI={SPI_MOSI}, MISO={SPI_MISO}, PI_READY={RPI_READY} **\n")
- return None
- else:
- """Set up the server socket and wait for a client connection."""
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.bind((HOST, PORT))
- s.listen(1)
- print(f"[MSXPi Server on {hostType}] Listening on {HOST}:{PORT}...")
- conn, addr = s.accept()
- print(f" ** MSX Connected to {addr} **\n")
- return conn
+ """Set up the server socket and wait for a client connection."""
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind((HOST, PORT))
+ s.listen(1)
+ hostType = detect_host()
+ print(f"[MSXPi Server on {hostType}] Listening on {HOST}:{PORT}...")
+ new_conn, addr = s.accept()
+ print(f" ** MSX Connected from {addr} **\n")
+ return new_conn
def ShowSecurityDisclaimer():
print("\n=====================================================================================")
@@ -1522,89 +961,54 @@
print("It allows the MSX to:\n")
print(" * List/read (any) file from this computer or network (via the the PDIR/PCOPY commands).\n")
print(" * Execute arbitrary(!) shell commands (via the PRUN command).\n")
- if hostType == "RaspberryPi":
- print(" * Configure the WiFi settings (via the PSET/PWIFI commands).\n")
- print("Some very few commands designed specifically for Raspberry Pi requires elevation of")
- print("privileges using sudo - these commands will not be executed in the PC platforms and")
- print("when possible, a message will be returned to the MSX informing that the command is")
- print("not supported.")
- print("However notice that using PRUN, the MSX user can execute any commands in the host,")
- print("bypassing the controls in the native MSXPi commands.")
+ print("This version runs in SOCKET-ONLY mode. GPIO-specific commands are disabled.")
print("=======================================================================================\n")
""" ============================================================================
MSXPi Server (msxpi-server.py) main program starts here
============================================================================
"""
-
-# This section reads the persistent user configuration from msxpi.ini configuration file.
-# When msxpi.ini does not exist, it populates the memory variables with default values.
-if exists(MSXPIHOME+'/msxpi.ini'):
- f = open(MSXPIHOME+'/msxpi.ini','r')
- idx = 0
+if not os.path.exists(MSXPIHOME):
+ os.makedirs(MSXPIHOME)
+if not os.path.exists(RAMDISK):
+ os.makedirs(RAMDISK)
+
+ini_file_path = os.path.join(MSXPIHOME, 'msxpi.ini')
+
+if exists(ini_file_path):
psetvar = []
- while True:
- line = f.readline()
- if not line:
- break
-
- if line.startswith('var'):
- var = line.split(' ')[1].split('=')[0].strip()
- value = line.replace('var ','',1).replace(var,'',1).split('=')[1].strip()
- psetvar.append([var,value])
- idx += 1
- f.close()
- if 'SPI_CS' not in str(psetvar):
- psetvar.append(["SPI_HW","False"])
- psetvar.append(["SPI_CS","21"])
- psetvar.append(["SPI_SCLK","20"])
- psetvar.append(["SPI_MOSI","16"])
- psetvar.append(["SPI_MISO","12"])
- psetvar.append(["RPI_READY","25"])
- if 'free' not in str(psetvar):
- psetvar.append(["free","free"])
-
+ with open(ini_file_path,'r') as f:
+ for line in f:
+ if line.startswith('var'):
+ parts = line.strip().split('=', 1)
+ var_part = parts[0].split(' ', 1)[1]
+ value = parts[1]
+ psetvar.append([var_part, value])
else:
- psetvar = [['PATH','/home/pi/msxpi'], \
- ['DriveA','/home/pi/msxpi/disks/msxpiboot.dsk'], \
- ['DriveB','/home/pi/msxpi/disks/tools.dsk'], \
- ['DriveM','https://github.com/costarc/MSXPi/raw/master/software/target'], \
- ['DriveR1','https://www.msxarchive.nl/pub/msx/games/roms/msx1'], \
- ['DriveR2','https://www.msxarchive.nl/pub/msx/games/roms/msx2'], \
- ['WIDTH','80'], \
- ['WIFISSID','MYWIFI'], \
- ['WIFIPWD','MYWFIPASSWORD'], \
- ['WIFICOUNTRY','GB'], \
- ['DSKTMPL','/home/pi/msxpi/disks/blank.dsk'], \
- ['IRCNICK','msxpi'], \
- ['IRCADDR','chat.freenode.net'], \
- ['IRCPORT','6667'], \
- ['SPI_HW','False'], \
- ['SPI_CS','21'], \
- ['SPI_SCLK','20'], \
- ['SPI_MOSI','16'], \
- ['SPI_MISO','12'], \
- ['RPI_READY','25'], \
- ['OPENAIKEY','']]
-
-print(f"\n** Starting MSXPi Server Version {version} Build {BuildId} **\n")
-
-# Initialize the server
+ psetvar = [
+ ['PATH', MSXPIHOME],
+ ['DriveA', os.path.join(MSXPIHOME, 'disks/msxpiboot.dsk')],
+ ['DriveB', os.path.join(MSXPIHOME, 'disks/tools.dsk')],
+ ['DriveM', 'https://github.com/costarc/MSXPi/raw/master/software/target'],
+ ['DriveR1', 'https://www.msxarchive.nl/pub/msx/games/roms/msx1'],
+ ['DriveR2', 'https://www.msxarchive.nl/pub/msx/games/roms/msx2'],
+ ['WIDTH', '80'],
+ ['WIFISSID', 'MYWIFI'],
+ ['WIFIPWD', 'MYWFIPASSWORD'],
+ ['WIFICOUNTRY', 'GB'],
+ ['DSKTMPL', os.path.join(MSXPIHOME, 'disks/blank.dsk')],
+ ['IRCNICK', 'msxpi'],
+ ['IRCADDR', 'chat.freenode.net'],
+ ['IRCPORT', '6667'],
+ ['OPENAIKEY', '']
+ ]
+
+print(f"\n** Starting MSXPi Server Version {version} Build {BuildId} (Socket Mode) **\n")
+
hostType = detect_host()
ShowSecurityDisclaimer()
-if hostType == "RaspberryPi":
- import RPi.GPIO as GPIO
-# GPIO Pins is now defined by the user
-SPI_CS = int(getMSXPiVar("SPI_CS"))
-SPI_SCLK = int(getMSXPiVar("SPI_SCLK"))
-SPI_MOSI = int(getMSXPiVar("SPI_MOSI"))
-SPI_MISO = int(getMSXPiVar("SPI_MISO"))
-RPI_READY = int(getMSXPiVar("RPI_READY"))
conn = initialize_connection()
-# Start MSXPi Server main loop - wait command and execute.
-# Set a interrupt for Control+C to exit the program gracefully and cleaning GPIO
-# status (on Raspberry PI)
try:
while True:
try:
@@ -1615,20 +1019,27 @@
if buf[0] == 0:
fullcmd=''
else:
- fullcmd = buf.decode().split("\x00")[0]
- cmd = fullcmd.split()[0].lower()
-
- parms = fullcmd[len(cmd)+1:]
- # Executes the command (first word in the string)
- # And passes the whole string (including command name) to the function
- # globals()['use_variable_as_function_name']()
- globals()[cmd.strip()]()
+ fullcmd = buf.decode(errors='ignore').split("\x00")[0]
+
+ if fullcmd:
+ cmd = fullcmd.split()[0].lower()
+ parms = fullcmd[len(cmd)+1:]
+ if cmd in globals():
+ globals()[cmd.strip()]()
+ else:
+ print(f"Unknown command: {cmd}")
+ sendmultiblock(f"Pi:Error - Unknown command '{cmd}'".encode(), BLKSIZE, RC_INVALIDCOMMAND)
+ else:
+ # Empty command received, just loop back
+ pass
except Exception as e:
errcount += 1
- print(f"MSXPi Server: {str(e)}")
- recvdata(BLKSIZE) # Read & discard parameters to avoid sync errors
- sendmultiblock(("Pi:Error - "+str(e)).encode(),BLKSIZE, RC_FAILED)
+ print(f"MSXPi Server Exception: {str(e)}")
+ # Attempt to gracefully ignore the error and continue
+ # recvdata(BLKSIZE) # This might hang if client is disconnected
+ # sendmultiblock(("Pi:Error - "+str(e)).encode(),BLKSIZE, RC_FAILED)
except KeyboardInterrupt:
- if detect_host() == "RaspberryPi":
- GPIO.cleanup() # cleanup all GPIO
- print(f"MSXPi Server: Terminating")
+ print(f"\nMSXPi Server: Terminating")
+finally:
+ if conn:
+ conn.close()
+
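The block framing and checksum shared by `senddata()`, `recvdata()` and `sendmultiblock()` in the diff above can be tried without a socket. The following is a minimal sketch under several assumptions. The names `fold_checksum`, `frame_blocks` and `unframe` are hypothetical helpers that do not exist in msxpi-server. The values of `BLKSIZE`, `RC_SUCCESS` and `RC_READY` are placeholders, since the real constants are defined outside the portion of the patch shown here. The sketch also counts blocks by payload capacity (`blocksize - 3`), which is a choice of the sketch and differs from the `(len(buf)+3)/blocksize` expression in the diff.

```python
import math

# Placeholder values for illustration only (assumptions); the real
# constants live elsewhere in msxpi-server-socket.py.
BLKSIZE = 256
RC_SUCCESS = 0xE0
RC_READY = 0xE9


def fold_checksum(payload: bytes) -> int:
    """Fold the byte sum to 8 bits, as thissum is computed in the diff."""
    total = sum(payload)
    return ((total >> 8) + (total % 256)) % 256


def frame_blocks(buf: bytes, blocksize: int = BLKSIZE, rc: int = RC_SUCCESS) -> list:
    """Split buf into fixed-size blocks: [rc, len_lsb, len_msb, payload...]."""
    capacity = blocksize - 3  # three header bytes per block
    numblocks = max(1, math.ceil(len(buf) / capacity))
    blocks = []
    idx = 0
    for thisblk in range(numblocks):
        last = thisblk + 1 == numblocks
        datasize = len(buf) - idx if last else capacity
        block = bytearray(blocksize)  # zero-padded to the full block size
        block[0] = rc if last else RC_READY  # only the last block carries the real rc
        block[1] = datasize % 256  # payload length, low byte
        block[2] = datasize >> 8   # payload length, high byte
        block[3:3 + datasize] = buf[idx:idx + datasize]
        blocks.append(block)
        idx += datasize
    return blocks


def unframe(blocks: list) -> bytes:
    """Reassemble the payload using the length stored in each header."""
    out = bytearray()
    for block in blocks:
        datasize = block[1] + 256 * block[2]
        out += block[3:3 + datasize]
    return bytes(out)


if __name__ == "__main__":
    blocks = frame_blocks(b"A" * 300)
    print(len(blocks), [hex(b[0]) for b in blocks])
    print(fold_checksum(bytes([255, 255])))
```

Each block produced this way is what `senddata()` would push byte by byte through `piexchangebyte()`, followed by the folded checksum, which the MSX side echoes back for comparison.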