LoginSignup
0
0

More than 5 years have passed since last update.

golang balance threads

Posted at

image.png

// job describes one unit of work tracked by a worker.
type job struct {
    conn     net.Conn        // network connection associated with the job
    opcode   int             // operation code — NOTE(review): value meanings are not defined in this file
    data     []byte          // raw payload bytes for the job
    result   chan ResultType // used to pass the result on to another channel
}
// jobPair bundles a job with the key it is stored under, so that an insert
// request can travel through a single channel.
type jobPair struct {
    key   string // map key for the job (the user name, per worker.jobqueue)
    value *job   // job to register under key
}


// worker owns a set of jobs and serves them from a single goroutine started
// by Run. Additions and removals are requested through channels, so jobqueue
// itself is only touched by insertJob and deleteJob.
type worker struct {
    jobqueue  map[string]*job // jobs keyed by user name
    broadcast chan DataType   // data received here is logged once per queued job in Run
    jobadd    chan *jobPair   // pending insert requests, consumed by Run
    jobdel    chan string     // pending delete requests (keys), consumed by Run
    pending   safepending     // counter incremented on insert and decremented on delete
    index     int             // worker index, used in log messages
    done      chan struct{}   // receives a value when the worker is asked to stop
}

// NewWorker builds a worker with the given index and channel/map capacities.
//
//   - idx is stored as the worker index (used in log output).
//   - queue_limit is the initial capacity hint for the job map.
//   - source_limit is the buffer size of the broadcast channel.
//   - jobreq_limit is the buffer size of both the jobadd and jobdel channels.
//
// The done channel is unbuffered. The returned worker is idle until Run is
// called.
func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
    return &worker{
        jobqueue:  make(map[string]*job, queue_limit),
        broadcast: make(chan DataType, source_limit),
        // The element type must be *jobPair to match the worker.jobadd field.
        jobadd:  make(chan *jobPair, jobreq_limit),
        jobdel:  make(chan string, jobreq_limit),
        pending: safepending{0, sync.RWMutex{}},
        index:   idx,
        done:    make(chan struct{}),
    }
}

// PushJob requests that job be registered under user on this worker.
//
// The request is sent on the jobadd channel and is applied by the goroutine
// started in Run. The call blocks while the jobadd channel cannot accept
// another request.
func (w *worker) PushJob(user string, job *job) {
    // jobadd carries *jobPair, so send a pointer rather than a value.
    w.jobadd <- &jobPair{
        key:   user,
        value: job,
    }
}

// RemoveJob requests removal of the job stored under user by sending the key
// on the jobdel channel. The removal itself is carried out by the goroutine
// started in Run.
func (w *worker) RemoveJob(user string) {
    key := user
    w.jobdel <- key
}

// Run starts the worker's event loop in a new goroutine and registers it with
// wg. wg.Done is called when the goroutine exits.
//
// The loop reacts to:
//   - broadcast data: logged once for every queued job;
//   - jobadd / jobdel requests: applied via insertJob / deleteJob;
//   - a 60-second ticker: calls w.loadInfo (defined outside this view);
//   - a value on done: logs the exit and terminates the goroutine.
func (w *worker) Run(wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        log.Println("new goroutine, worker index:", w.index)

        ticker := time.NewTicker(time.Second * 60)
        // Release the ticker's resources once the loop ends.
        defer ticker.Stop()

        for {
            select {
            case data := <-w.broadcast:
                for _, j := range w.jobqueue {
                    log.Println(j, data)
                }
            case jobpair := <-w.jobadd:
                w.insertJob(jobpair.key, jobpair.value)
            case delkey := <-w.jobdel:
                w.deleteJob(delkey)
            case <-ticker.C:
                w.loadInfo()
            case <-w.done:
                log.Println("worker", w.index, "exit")
                // A bare break would only leave the select and the loop would
                // keep running; return ends the goroutine.
                return
            }
        }
    }()
}

// Stop asks the worker to shut down without blocking the caller: the stop
// signal is sent on the done channel from a separate goroutine.
func (w *worker) Stop() {
    var signal struct{}
    go func(target *worker) {
        target.done <- signal
    }(w)
}
// insertJob stores value under key in the job map, replacing any job already
// stored under that key. The pending counter is incremented only when the key
// is new, so replacing an existing job does not inflate the count.
//
// It always returns nil.
func (w *worker) insertJob(key string, value *job) error {
    if _, exists := w.jobqueue[key]; !exists {
        w.pending.Inc()
    }
    w.jobqueue[key] = value
    return nil
}

// deleteJob removes the job stored under key and decrements the pending
// counter. If no job is stored under key it does nothing, so a repeated or
// unknown delete request cannot push the counter below the real job count.
func (w *worker) deleteJob(key string) {
    if _, exists := w.jobqueue[key]; !exists {
        return
    }
    delete(w.jobqueue, key)
    w.pending.Dec()
}

作者:https://www.jianshu.com/p/215510810c59

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0