0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Rust】【Java】非同期のユースケースとコード

Last updated at Posted at 2025-04-12

非同期が必要なシーン:分類ごとに整理

カテゴリ ユースケース(代表) 非同期理由
🌐 ネットワーク HTTPサーバ、WebSocket、gRPC 接続数が多い・IOが遅い
🧵 並行タスク 同時リクエスト処理、並列計算 複数の仕事を止めずにやりたい
🗄 DB/外部通信 PostgreSQL、API通信 IO待ちが重い
📁 ファイルIO 非同期ログ、CSV書き出し 同期I/Oはブロッキング
⏰ 時間操作 タイマー、バックグラウンド処理 sleepを止めずに実行
🧭 ストリーミング イベント監視、リアルタイム受信 永続接続+イベント駆動
📶 クライアント側HTTP CLI・ツールから非同期取得 バックグラウンドで取得できる
📡 メッセージング MQTT/Kafka/AMQPなどの購読 常時接続+再接続対応
👥 並行処理(非IO) タスク並列分散、Webクローラ spawn + joinで仕事分散
🛎️ 通知/ジョブスケジューラ リマインダー、定時処理 async sleep/trigger
💬 チャット/ゲーム通信 双方向イベント処理 イベント+IO駆動型
🧪 テスト実行ツール 並列テスト/非同期セットアップ タスクを並行に流す
🧰 CLIツール curl的動き、複数ファイル取得 複数の待ちを止めずに処理
🛰 IoT通信 センサーデータの逐次受信 接続・ストリーム型データ
📤 大量バッチ送信 メール・通知・Slack投稿など レート制限回避しつつ非同期送信

コード

rust
// ✅ 非同期が必要な15の代表ユースケース + 最小コード例

// 1. Web API ハンドラー(axum)
use axum::{routing::get, Router};
async fn api_handler() -> &'static str {
    "Hello, async API"
}
fn app() -> Router {
    Router::new().route("/", get(api_handler))
}

// 2. 複数リクエストを同時処理(tokio::spawn)
#[tokio::main]
async fn main_spawn() {
    for i in 0..3 {
        tokio::spawn(async move {
            println!("Task {} start", i);
        });
    }
}

// 3. 外部API呼び出し(reqwest)
#[tokio::main]
async fn fetch_google() {
    let body = reqwest::get("https://www.google.com")
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    println!("Body: {}", &body[..50]);
}

// 4. PostgreSQL接続(sqlx)
#[tokio::main]
async fn db_query() -> Result<(), sqlx::Error> {
    let pool = sqlx::PgPool::connect("postgres://user:pass@localhost/db").await?;
    let row: (i32,) = sqlx::query_as("SELECT 1")
        .fetch_one(&pool)
        .await?;
    println!("Result: {}", row.0);
    Ok(())
}

// 5. ファイル読み書き(tokio::fs)
#[tokio::main]
async fn file_io() {
    use tokio::fs;
    fs::write("file.txt", "hello").await.unwrap();
    let content = fs::read_to_string("file.txt").await.unwrap();
    println!("{}", content);
}

// 6. タイマーで遅延処理(tokio::time::sleep)
#[tokio::main]
async fn delayed_task() {
    use tokio::time::{sleep, Duration};
    println!("Start");
    sleep(Duration::from_secs(2)).await;
    println!("After 2 seconds");
}

// 7. WebSocketチャット(tokio-tungstenite)
// 簡略例(実装は要ライブラリ)
// 通常は tokio_tungstenite::accept_async + stream 処理

// 8. fetcher CLI(非同期リクエスト)
#[tokio::main]
async fn cli_fetch() {
    let args: Vec<String> = std::env::args().collect();
    let url = &args[1];
    let text = reqwest::get(url).await.unwrap().text().await.unwrap();
    println!("{}", &text[..100]);
}

// 9. Kafka/MQTT購読(非同期ストリーム)
// tokio_stream or rumqttc 等と併用(要依存追加)

// 10. バッチ送信処理
async fn send_batch() {
    let handles: Vec<_> = (0..5)
        .map(|i| tokio::spawn(async move {
            println!("Sending email #{}", i);
        }))
        .collect();
    for h in handles { h.await.unwrap(); }
}

// 11. Webクローラで複数ページ同時取得
async fn crawl_sites() {
    let urls = vec!["https://a.com", "https://b.com"];
    let futures = urls.iter().map(|url| reqwest::get(*url));
    let results = futures::future::join_all(futures).await;
    println!("{} sites fetched", results.len());
}

// 12. 並列テストランナー風(spawn + join)
async fn run_tests() {
    let tests = vec!["test_a", "test_b"];
    let handles = tests.into_iter().map(|t| tokio::spawn(async move {
        println!("Running {}", t);
    }));
    for h in handles { h.await.unwrap(); }
}

// 13. IoT センサー監視(常時受信)
// UDP or MQTTで受信、イベントループ維持

// 14. ログのバックグラウンド送信
async fn log_sender() {
    let tx = tokio::spawn(async move {
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
            println!("Sending logs...");
        }
    });
    tx.await.unwrap();
}

// 15. チャットなど双方向通信
// WebSocketベース、stream + sink で読み書き(省略)

// 実用的な非同期ユースケースを体験しながら学べる構成です。
// 必要に応じて個別のユースケースを拡張・実装可能。

java
// Java の場合は ExecutorService や CompletableFuture を使って非同期処理を実現します。

// 1. Web API Handler (Spring Boot)
@RestController
public class ApiController {
    @GetMapping("/hello")
    public CompletableFuture<String> hello() {
        return CompletableFuture.supplyAsync(() -> "Hello, async API");
    }
}

// 2. 複数リクエストを同時処理
public static void main(String[] args) {
    ExecutorService exec = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {
        int id = i;
        exec.submit(() -> System.out.println("Task " + id + " start"));
    }
    exec.shutdown();
}

// 3. 外部API呼び出し(HttpClient)
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("https://www.google.com"))
    .build();
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
    .thenAccept(resp -> System.out.println(resp.body().substring(0, 50)));

// 4. DB接続(Spring + Async)
@Async
public CompletableFuture<User> findUserAsync(Long id) {
    return CompletableFuture.completedFuture(userRepository.findById(id).get());
}

// 5. ファイルIO(非同期ではNIO)
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, 0, buffer, new CompletionHandler<>() {
    public void completed(Integer result, ByteBuffer attachment) {
        attachment.flip();
        System.out.println(StandardCharsets.UTF_8.decode(attachment).toString());
    }
    public void failed(Throwable exc, ByteBuffer attachment) {
        exc.printStackTrace();
    }
});

// 6. タイマーで遅延処理(ScheduledExecutorService)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> System.out.println("After 2 seconds"), 2, TimeUnit.SECONDS);

// 7. WebSocketチャット(Java WebSocket API)
@ClientEndpoint
public class ChatClient {
    @OnMessage
    public void onMessage(String message) {
        System.out.println("Received: " + message);
    }
}

// 8. fetcher CLI(非同期リクエスト)
public static void fetchUrl(String url) {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
    client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
          .thenAccept(resp -> System.out.println(resp.body()));
}

// 9. Kafka/MQTT購読(Kafka Consumer)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("topic"));
new Thread(() -> {
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            System.out.println(record.value());
        }
    }
}).start();

// 10. バッチ送信処理
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
    final int id = i;
    pool.submit(() -> System.out.println("Sending email #" + id));
}

// 11. Webクローラで複数ページ同時取得
List<String> urls = List.of("https://a.com", "https://b.com");
urls.forEach(url -> {
    HttpRequest req = HttpRequest.newBuilder().uri(URI.create(url)).build();
    HttpClient.newHttpClient()
        .sendAsync(req, HttpResponse.BodyHandlers.ofString())
        .thenAccept(r -> System.out.println("Fetched: " + url));
});

// 12. 並列テストランナー風(Executor)
ExecutorService testPool = Executors.newFixedThreadPool(2);
List<String> tests = List.of("testA", "testB");
tests.forEach(t -> testPool.submit(() -> System.out.println("Running " + t)));

// 13. IoT センサー監視(UDP非同期)
new Thread(() -> {
    try (DatagramSocket socket = new DatagramSocket(9999)) {
        byte[] buf = new byte[1024];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        while (true) {
            socket.receive(packet);
            System.out.println("Sensor data: " + new String(packet.getData()));
        }
    } catch (IOException e) { e.printStackTrace(); }
}).start();

// 14. ログのバックグラウンド送信
ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor();
logger.scheduleAtFixedRate(() -> System.out.println("Flushing logs..."), 0, 10, TimeUnit.SECONDS);

// 15. チャットなど双方向通信(WebSocket)
@ServerEndpoint("/chat")
public class ChatEndpoint {
    @OnMessage
    public void onMessage(Session session, String msg) throws IOException {
        session.getBasicRemote().sendText("Echo: " + msg);
    }
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?