非同期が必要なシーン:分類ごとに整理
カテゴリ | ユースケース(代表) | 非同期理由 |
---|---|---|
🌐 ネットワーク | HTTPサーバ、WebSocket、gRPC | 接続数が多い・IOが遅い |
🧵 並行タスク | 同時リクエスト処理、並列計算 | 複数の仕事を止めずにやりたい |
🗄 DB/外部通信 | PostgreSQL、API通信 | IO待ちが重い |
📁 ファイルIO | 非同期ログ、CSV書き出し | 同期I/Oはブロッキング |
⏰ 時間操作 | タイマー、バックグラウンド処理 |
sleep を止めずに実行 |
🧭 ストリーミング | イベント監視、リアルタイム受信 | 永続接続+イベント駆動 |
📶 クライアント側HTTP | CLI・ツールから非同期取得 | バックグラウンドで取得できる |
📡 メッセージング | MQTT/Kafka/AMQPなどの購読 | 常時接続+再接続対応 |
👥 並行処理(非IO) | タスク並列分散、Webクローラ | spawn + joinで仕事分散 |
🛎️ 通知/ジョブスケジューラ | リマインダー、定時処理 | async sleep/trigger |
💬 チャット/ゲーム通信 | 双方向イベント処理 | イベント+IO駆動型 |
🧪 テスト実行ツール | 並列テスト/非同期セットアップ | タスクを並行に流す |
🧰 CLIツール | curl的動き、複数ファイル取得 | 複数の待ちを止めずに処理 |
🛰 IoT通信 | センサーデータの逐次受信 | 接続・ストリーム型データ |
📤 大量バッチ送信 | メール・通知・Slack投稿など | レート制限回避しつつ非同期送信 |
コード
rust
// ✅ 非同期が必要な15の代表ユースケース + 最小コード例
// 1. Web API ハンドラー(axum)
use axum::{routing::get, Router};
async fn api_handler() -> &'static str {
"Hello, async API"
}
fn app() -> Router {
Router::new().route("/", get(api_handler))
}
// 2. 複数リクエストを同時処理(tokio::spawn)
#[tokio::main]
async fn main_spawn() {
for i in 0..3 {
tokio::spawn(async move {
println!("Task {} start", i);
});
}
}
// 3. 外部API呼び出し(reqwest)
#[tokio::main]
async fn fetch_google() {
let body = reqwest::get("https://www.google.com")
.await
.unwrap()
.text()
.await
.unwrap();
println!("Body: {}", &body[..50]);
}
// 4. PostgreSQL接続(sqlx)
#[tokio::main]
async fn db_query() -> Result<(), sqlx::Error> {
let pool = sqlx::PgPool::connect("postgres://user:pass@localhost/db").await?;
let row: (i32,) = sqlx::query_as("SELECT 1")
.fetch_one(&pool)
.await?;
println!("Result: {}", row.0);
Ok(())
}
// 5. ファイル読み書き(tokio::fs)
#[tokio::main]
async fn file_io() {
use tokio::fs;
fs::write("file.txt", "hello").await.unwrap();
let content = fs::read_to_string("file.txt").await.unwrap();
println!("{}", content);
}
// 6. タイマーで遅延処理(tokio::time::sleep)
#[tokio::main]
async fn delayed_task() {
use tokio::time::{sleep, Duration};
println!("Start");
sleep(Duration::from_secs(2)).await;
println!("After 2 seconds");
}
// 7. WebSocketチャット(tokio-tungstenite)
// 簡略例(実装は要ライブラリ)
// 通常は tokio_tungstenite::accept_async + stream 処理
// 8. fetcher CLI(非同期リクエスト)
#[tokio::main]
async fn cli_fetch() {
let args: Vec<String> = std::env::args().collect();
let url = &args[1];
let text = reqwest::get(url).await.unwrap().text().await.unwrap();
println!("{}", &text[..100]);
}
// 9. Kafka/MQTT購読(非同期ストリーム)
// tokio_stream or rumqttc 等と併用(要依存追加)
// 10. バッチ送信処理
async fn send_batch() {
let handles: Vec<_> = (0..5)
.map(|i| tokio::spawn(async move {
println!("Sending email #{}", i);
}))
.collect();
for h in handles { h.await.unwrap(); }
}
// 11. Webクローラで複数ページ同時取得
async fn crawl_sites() {
let urls = vec!["https://a.com", "https://b.com"];
let futures = urls.iter().map(|url| reqwest::get(*url));
let results = futures::future::join_all(futures).await;
println!("{} sites fetched", results.len());
}
// 12. 並列テストランナー風(spawn + join)
async fn run_tests() {
let tests = vec!["test_a", "test_b"];
let handles = tests.into_iter().map(|t| tokio::spawn(async move {
println!("Running {}", t);
}));
for h in handles { h.await.unwrap(); }
}
// 13. IoT センサー監視(常時受信)
// UDP or MQTTで受信、イベントループ維持
// 14. ログのバックグラウンド送信
async fn log_sender() {
let tx = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
println!("Sending logs...");
}
});
tx.await.unwrap();
}
// 15. チャットなど双方向通信
// WebSocketベース、stream + sink で読み書き(省略)
// 実用的な非同期ユースケースを体験しながら学べる構成です。
// 必要に応じて個別のユースケースを拡張・実装可能。
java
// Java の場合は ExecutorService や CompletableFuture を使って非同期処理を実現します。
// 1. Web API Handler (Spring Boot)
@RestController
public class ApiController {
@GetMapping("/hello")
public CompletableFuture<String> hello() {
return CompletableFuture.supplyAsync(() -> "Hello, async API");
}
}
// 2. 複数リクエストを同時処理
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
int id = i;
exec.submit(() -> System.out.println("Task " + id + " start"));
}
exec.shutdown();
}
// 3. 外部API呼び出し(HttpClient)
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://www.google.com"))
.build();
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenAccept(resp -> System.out.println(resp.body().substring(0, 50)));
// 4. DB接続(Spring + Async)
@Async
public CompletableFuture<User> findUserAsync(Long id) {
return CompletableFuture.completedFuture(userRepository.findById(id).get());
}
// 5. ファイルIO(非同期ではNIO)
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("file.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, 0, buffer, new CompletionHandler<>() {
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println(StandardCharsets.UTF_8.decode(attachment).toString());
}
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
// 6. タイマーで遅延処理(ScheduledExecutorService)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> System.out.println("After 2 seconds"), 2, TimeUnit.SECONDS);
// 7. WebSocketチャット(Java WebSocket API)
@ClientEndpoint
public class ChatClient {
@OnMessage
public void onMessage(String message) {
System.out.println("Received: " + message);
}
}
// 8. fetcher CLI(非同期リクエスト)
public static void fetchUrl(String url) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenAccept(resp -> System.out.println(resp.body()));
}
// 9. Kafka/MQTT購読(Kafka Consumer)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("topic"));
new Thread(() -> {
while (true) {
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
System.out.println(record.value());
}
}
}).start();
// 10. バッチ送信処理
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int id = i;
pool.submit(() -> System.out.println("Sending email #" + id));
}
// 11. Webクローラで複数ページ同時取得
List<String> urls = List.of("https://a.com", "https://b.com");
urls.forEach(url -> {
HttpRequest req = HttpRequest.newBuilder().uri(URI.create(url)).build();
HttpClient.newHttpClient()
.sendAsync(req, HttpResponse.BodyHandlers.ofString())
.thenAccept(r -> System.out.println("Fetched: " + url));
});
// 12. 並列テストランナー風(Executor)
ExecutorService testPool = Executors.newFixedThreadPool(2);
List<String> tests = List.of("testA", "testB");
tests.forEach(t -> testPool.submit(() -> System.out.println("Running " + t)));
// 13. IoT センサー監視(UDP非同期)
new Thread(() -> {
try (DatagramSocket socket = new DatagramSocket(9999)) {
byte[] buf = new byte[1024];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
while (true) {
socket.receive(packet);
System.out.println("Sensor data: " + new String(packet.getData()));
}
} catch (IOException e) { e.printStackTrace(); }
}).start();
// 14. ログのバックグラウンド送信
ScheduledExecutorService logger = Executors.newSingleThreadScheduledExecutor();
logger.scheduleAtFixedRate(() -> System.out.println("Flushing logs..."), 0, 10, TimeUnit.SECONDS);
// 15. チャットなど双方向通信(WebSocket)
@ServerEndpoint("/chat")
public class ChatEndpoint {
@OnMessage
public void onMessage(Session session, String msg) throws IOException {
session.getBasicRemote().sendText("Echo: " + msg);
}
}