2
1

More than 3 years have passed since last update.

Java AtomicLongを使ったシングルスレッド保証(kafka参考)

Posted at

KafkaConsumer の poll()辺りののソースコード見ていたら、AtomicLongを使ってシングルスレッド保証をしていた。
考え方が綺麗だったので、自分でも試したやつを乗っけておく。

(参考:KafkaConsumerのpoll()部分

block-release.java

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;


public class Main {


    private static final Long NO_CURRENT_THREAD = -1L;
    private static final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    private static final AtomicInteger refcount = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {

        refcount.incrementAndGet();

        currentThread.incrementAndGet();

        Thread.sleep(199);

        release();

        Thread.sleep(199);

        acquire();    
    }

     public static void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            System.out.println("this is not safe for multi-threaded access");

        refcount.incrementAndGet();

        System.out.println(threadId);
        System.out.println(currentThread);
    }

    private static void release() {

        if(refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);    
    }

}

KafkaConsumerのソースコード

KafkaConsumer.java


    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            do {
                client.maybeTriggerWakeup();

                if (includeMetadataInTimeout) {
                    if (!updateAssignmentMetadataIfNeeded(timer)) {
                        return ConsumerRecords.empty();
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }

                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.pollNoWakeup();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }
            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1