はじめに
- 最近Azure Data Lake Storage Gen2上でファイル作成などを実施する必要が出てきたのでその時の実装のメモ
- ファイル移動などの処理はSynapse AnalyticsにPySparkの処理をデプロイし実行する
概要
- 定期的にファイルを作成する処理を実装した
- すでにファイルが存在する場合は上書きしたい
- ファイルを作成するときは勝手にファイル名が割り当てられる(出力先ディレクトリしか指定できない)
- そのため一度作成したファイルを希望するファイル名にするため、ファイル移動する必要がある
- PySparkではファイル移動で既存ファイルの上書きはできないよう
- 一度削除してから移動する必要があった
- 削除も存在確認が必要
詳細
-
ドキュメントを見ると、
mssparkutils.fs.mv
について、移動先にファイルがある場合の挙動が記載されていない - 実際に試したところ、移動先に既に同名称のファイルがある場合はエラー
py4j.protocol.Py4JJavaError: An error occurred while calling z:mssparkutils.fs.mv.
: org.apache.hadoop.fs.PathExistsException: `abfss://***/somefile': File exists
at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.checkDest(MSFsUtilsImpl.scala:99)
at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.mvWithinFileSystem(MSFsUtilsImpl.scala:129)
at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.mv(MSFsUtilsImpl.scala:340)
at mssparkutils.fs$.mv(fs.scala:22)
at mssparkutils.fs.mv(fs.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
- mssparkutils.fs.rm については削除失敗した場合の挙動について明言されていない
- 試したところ、存在しないファイルを削除しようとすると以下のエラー
2024-02-16 11:31:58,519 INFO UserConsole [redirect output]: : An error occurred while calling z:mssparkutils.fs.rm.
: org.apache.hadoop.fs.PathNotFoundException: `abfss://***/somefile': No such file or directory
at com.microsoft.spark.notebook.msutils.impl.MSFsUtilsImpl.rm(MSFsUtilsImpl.scala:626)
at mssparkutils.fs$.rm(fs.scala:31)
at mssparkutils.fs.rm(fs.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForC
2024-02-16 11:31:58,521 INFO UserConsole [redirect output]: ommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
- そのためファイルがある場合にのみ削除する必要がある
- 例えば以下のようにする
# if target file path already existed, delete
if mssparkutils.fs.exists(target) :
mssparkutils.fs.rm(target)
print("delete " + target)
# move from object to target
mssparkutils.fs.mv(object, target, False)