type job struct {
conn net.Conn
opcode int
data []byte
result chan ResultType //結果を他のchannelに渡すため
}
type jobPair struct {
key string
value *job
}
type worker struct {
jobqueue map[string]*job // key:UserName
broadcast chan DataType
jobadd chan *jobPair
jobdel chan string
pending safepending
index int
done chan struct{}
}
func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
return &worker{
jobqueue: make(map[string]*job, queue_limit),
broadcast: make(chan DataType, source_limit),
jobadd: make(chan jobPair, jobreq_limit),
jobdel: make(chan string, jobreq_limit),
pending: safepending{0, sync.RWMutex{}},
index: idx,
done: make(chan struct{}),
}
}
func (w *worker) PushJob(user string, job *job) {
pair := jobPair{
key: user,
value: job,
}
w.jobadd <- pair
}
func (w *worker) RemoveJob(user string) {
w.jobdel <- user
}
func (w *worker) Run(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
log.Println("new goroutine, worker index:", w.index)
defer wg.Done()
ticker := time.NewTicker(time.Second * 60)
for {
select {
case data := <-w.broadcast:
for _, job := range w.jobqueue {
log.Println(job, data)
}
case jobpair := <-w.jobadd:
w.insertJob(jobpair.key, jobpair.value)
case delkey := <-w.jobdel:
w.deleteJob(delkey)
case <-ticker.C:
w.loadInfo()
case <-w.done:
log.Println("worker", w.index, "exit")
break
}
}
}()
}
func (w *worker) Stop() {
go func() {
w.done <- struct{}{}
}()
}
func (w *worker) insertJob(key string, value *job) error {
w.jobqueue[key] = value
w.pending.Inc()
return nil
}
func (w *worker) deleteJob(key string) {
delete(w.jobqueue, key)
w.pending.Dec()
}
More than 5 years have passed since last update.
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme