ref: a1bbf268d91af2fe34cc8e247f0e617ba102c578
dir: /dispatch.go/
package main
import (
"context"
"fmt"
"sync"
"time"
mastodon "codeberg.org/penny64/hellclient-go-mastodon"
)
const ()
type Job interface {
Do()
Init()
Wait()
}
type StatusJob struct {
context context.Context
status *mastodon.Status
ID *mastodon.ID
jobfunc func(context.Context, *mastodon.ID)
err error
}
type AnonJob struct {
jobfunc func()
wg sync.WaitGroup
}
func (job *AnonJob) Do() {
job.jobfunc()
job.wg.Done()
}
func (job *AnonJob) Init() {
job.wg.Add(1)
}
func (job *AnonJob) Wait() {
job.wg.Wait()
}
type GenericJob struct {
jobfunc func(job *GenericJob)
result string
err error
wg sync.WaitGroup
}
func (job *GenericJob) Wait() {
job.wg.Wait()
}
func (job *GenericJob) Init() {
job.wg.Add(1)
}
func (job *GenericJob) Do() {
job.jobfunc(job)
job.wg.Done()
}
func (hc *Hellclient) clientDispatch() {
var tootQueue []*mastodon.Toot
var jobQueue []Job
//Last time we sent an API call
var lastfire time.Time
//Last time the user sent us a line
var lastreceive time.Time
var comingfast bool
receiveStatus := func(statustoot *mastodon.Toot) {
//Got multiple lines within a second
if time.Since(lastreceive) < time.Second {
comingfast = true
}
lastreceive = time.Now()
tootQueue = append(tootQueue, statustoot)
}
receiveJob := func(job Job) {
jobQueue = append(jobQueue, job)
}
for {
select {
case job := <-hc.jobdispatch:
receiveJob(job)
case statustoot := <-hc.dispatch:
receiveStatus(statustoot)
//API delay needs to be tracked without being reset by new inputs
case <-time.After(hc.preferences.ApiDelayTime() - time.Since(lastfire)):
if 1 > len(tootQueue) && 1 > len(jobQueue) {
//Queues are empty, block the loop until we get a new input
select {
case statustoot := <-hc.dispatch:
receiveStatus(statustoot)
case job := <-hc.jobdispatch:
receiveJob(job)
}
break
}
//User is sending lines faster than one a second, flood control
if comingfast {
//Take over the loop until we stop getting messages fast
for comingfast {
select {
case statustoot := <-hc.dispatch:
receiveStatus(statustoot)
//We already know we're flooding so we can probably be a little less sensitive
case <-time.After(time.Second / 5):
fmt.Printf("Got %v lines fast!\n", len(tootQueue))
tootQueue = nil
comingfast = false
}
}
//Nothing to send
break
}
hc.stats.slock.Lock()
hc.stats.APICalls++
hc.stats.slock.Unlock()
if len(tootQueue) > 0 {
toot := tootQueue[0]
lastfire = time.Now()
var err error
status, err := postStatusDetailed(*hc.client, *toot)
if err != nil {
fmt.Println(err)
}
hc.lock()
hc.recentpost = status
hc.unlock()
tootQueue = tootQueue[1:]
break
}
//We wouldn't be here if there wasn't something in some queue
job := jobQueue[0]
jobQueue = jobQueue[1:]
job.Do()
lastfire = time.Now()
}
}
}
func (hc *Hellclient) dispatchStatus(poststring string, visibility string) {
status := postStatus(poststring, visibility)
for _, item := range hc.attacher.consumeAttachments() {
status.MediaIDs = append(status.MediaIDs, item.ID)
}
hc.prompt.UpdatePrompt()
hc.dispatch <- status
}
func (hc *Hellclient) dispatchReply(posttext string, replyto mastodon.ID, postItem *mastodon.Status) {
hc.dispatch <- postReply(posttext, replyto, hc.currentuser.ID, postItem)
}
func (hc *Hellclient) dispatchJob(job Job) {
job.Init()
hc.jobdispatch <- job
}
func (hc *Hellclient) dispatchFunc(enclosure func(*GenericJob)) Job {
noticeJob := &GenericJob{
jobfunc: enclosure,
}
noticeJob.Init()
hc.jobdispatch <- noticeJob
return noticeJob
}
func (hc *Hellclient) dispatchAnon(enclosure func()) Job {
noticeJob := &AnonJob{
jobfunc: enclosure,
}
noticeJob.Init()
hc.jobdispatch <- noticeJob
return noticeJob
}