shithub: hell

ref: ac46bfd916704894c6f4e961eaaf2775b02d18d9
dir: /dispatch.go/

View raw version
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	mastodon "codeberg.org/penny64/hellclient-go-mastodon"
)

// No package-level constants are declared yet; the empty block is a placeholder.
const ()

// Job is a unit of work that is handed to the dispatcher loop. Its methods
// are listed in the order they are used: Init when the job is queued, Do when
// the dispatcher runs it, and Wait for anyone who needs to block on it.
type Job interface {
	// Init is called once, before the job is sent to the dispatcher.
	Init()
	// Do performs the work; the dispatcher loop calls it.
	Do()
	// Wait lets a caller block on the job. For the implementations in this
	// file it returns once Do has finished.
	Wait()
}

// StatusJob groups the pieces of a status-related job: a context, a status,
// an ID, the function to run, and an error slot.
//
// NOTE(review): no methods are defined on StatusJob in the visible part of
// this file, so it does not satisfy Job here, and nothing visible constructs
// it — confirm whether it is still in use. The signature of jobfunc suggests
// it is meant to be called with the context and ID fields; verify against
// callers.
type StatusJob struct {
	context context.Context
	status  *mastodon.Status
	ID      *mastodon.ID
	jobfunc func(context.Context, *mastodon.ID)
	err     error
}

// AnonJob is a Job that runs a plain closure and lets callers block until the
// closure has finished.
type AnonJob struct {
	jobfunc func()         // closure executed by Do
	wg      sync.WaitGroup // one count per Init, released by Do
}

// Do runs the closure and releases anyone blocked in Wait. Init must have
// been called beforehand, otherwise the WaitGroup counter goes negative.
func (job *AnonJob) Do() {
	// Deferred so that waiters are released even when jobfunc panics or
	// exits the goroutine; otherwise Wait would block forever.
	defer job.wg.Done()
	job.jobfunc()
}

// Init registers one pending run of the job. Call it before queueing the job.
func (job *AnonJob) Init() {
	job.wg.Add(1)
}

// Wait blocks until every run registered with Init has been completed by Do.
func (job *AnonJob) Wait() {
	job.wg.Wait()
}

// GenericJob is a Job whose closure receives the job itself, so the closure
// can read or write the job's fields while it runs.
type GenericJob struct {
	jobfunc func(job *GenericJob) // closure executed by Do, passed this job
	result  string                // not touched by the methods here; presumably set by jobfunc — verify against callers
	err     error                 // not touched by the methods here; presumably set by jobfunc — verify against callers
	wg      sync.WaitGroup        // one count per Init, released by Do
}

// Wait blocks until every run registered with Init has been completed by Do.
func (job *GenericJob) Wait() {
	job.wg.Wait()
}

// Init registers one pending run of the job. Call it before queueing the job.
func (job *GenericJob) Init() {
	job.wg.Add(1)
}

// Do runs the closure with the job as its argument and releases anyone
// blocked in Wait. Init must have been called beforehand, otherwise the
// WaitGroup counter goes negative.
func (job *GenericJob) Do() {
	// Deferred so that waiters are released even when jobfunc panics or
	// exits the goroutine; otherwise Wait would block forever.
	defer job.wg.Done()
	job.jobfunc(job)
}

// clientDispatch is the dispatcher loop. It collects statuses from
// hc.dispatch and jobs from hc.jobdispatch into in-memory queues and fires at
// most one queued item each time the API delay (hc.preferences.ApiDelayTime,
// measured from the last fire) has elapsed. Queued statuses are sent before
// queued jobs. The loop never returns.
//
// Flood control: when two status lines arrive less than a second apart, the
// loop keeps draining hc.dispatch until no line has arrived for a fifth of a
// second, then discards every queued status instead of posting it.
func (hc *Hellclient) clientDispatch() {
	var tootQueue []*mastodon.Toot
	var jobQueue []Job
	//Last time we sent an API call
	var lastfire time.Time
	//Last time the user sent us a line
	var lastreceive time.Time
	//Set when two lines arrive within a second of each other
	var comingfast bool

	receiveStatus := func(statustoot *mastodon.Toot) {
		//Got multiple lines within a second
		if time.Since(lastreceive) < time.Second {
			comingfast = true
		}
		lastreceive = time.Now()
		tootQueue = append(tootQueue, statustoot)
	}

	receiveJob := func(job Job) {
		jobQueue = append(jobQueue, job)
	}

	for {
		select {
		case job := <-hc.jobdispatch:
			receiveJob(job)
		case statustoot := <-hc.dispatch:
			receiveStatus(statustoot)
		//API delay needs to be tracked without being reset by new inputs.
		//The timer is re-armed with only the remaining delay on every pass;
		//a non-positive remainder fires immediately.
		case <-time.After(hc.preferences.ApiDelayTime() - time.Since(lastfire)):
			if len(tootQueue) == 0 && len(jobQueue) == 0 {
				//Queues are empty, block the loop until we get a new input
				select {
				case statustoot := <-hc.dispatch:
					receiveStatus(statustoot)
				case job := <-hc.jobdispatch:
					receiveJob(job)
				}
				//Leaves the outer select; the for loop goes around again
				break
			}
			//User is sending lines faster than one a second, flood control
			if comingfast {
				//Take over the loop until we stop getting messages fast.
				//Note that hc.jobdispatch is not serviced while this runs.
				for comingfast {
					select {
					case statustoot := <-hc.dispatch:
						receiveStatus(statustoot)
					//We already know we're flooding so we can probably be a little less sensitive
					case <-time.After(time.Second / 5):
						fmt.Printf("Got %v lines fast!\n", len(tootQueue))
						//The flooded lines are dropped, not posted
						tootQueue = nil
						comingfast = false
					}
				}
				//Nothing to send
				break
			}

			hc.stats.slock.Lock()
			hc.stats.APICalls++
			hc.stats.slock.Unlock()
			if len(tootQueue) > 0 {
				toot := tootQueue[0]
				tootQueue = tootQueue[1:]
				lastfire = time.Now()
				status, err := postStatusDetailed(*hc.client, *toot)
				if err != nil {
					//The toot is dropped rather than retried. Leave
					//recentpost alone: the value returned alongside an
					//error is not a usable status.
					fmt.Println(err)
					break
				}
				hc.lock()
				hc.recentpost = status
				hc.unlock()
				break
			}
			//We wouldn't be here if there wasn't something in some queue
			job := jobQueue[0]
			jobQueue = jobQueue[1:]
			job.Do()
			lastfire = time.Now()
		}
	}
}

// dispatchStatus builds a status from poststring and visibility, adds the ID
// of every attachment returned by hc.attacher.consumeAttachments, refreshes
// the prompt and finally sends the status on hc.dispatch.
func (hc *Hellclient) dispatchStatus(poststring string, visibility string) {
	toot := postStatus(poststring, visibility)

	pending := hc.attacher.consumeAttachments()
	for _, attachment := range pending {
		toot.MediaIDs = append(toot.MediaIDs, attachment.ID)
	}

	hc.prompt.UpdatePrompt()
	hc.dispatch <- toot
}

// dispatchReply builds a reply via postReply (passing the current user's ID
// along with the caller's arguments), adds the ID of every attachment
// returned by hc.attacher.consumeAttachments, refreshes the prompt and
// finally sends the reply on hc.dispatch.
func (hc *Hellclient) dispatchReply(posttext string, subject string, replyto mastodon.ID, postItem *mastodon.Status) {
	toot := postReply(posttext, subject, replyto, hc.currentuser.ID, postItem)

	pending := hc.attacher.consumeAttachments()
	for _, attachment := range pending {
		toot.MediaIDs = append(toot.MediaIDs, attachment.ID)
	}

	hc.prompt.UpdatePrompt()
	hc.dispatch <- toot
}

// dispatchJob queues job for the dispatcher loop. Init is called before the
// send so that the job is already marked pending by the time the dispatcher
// can pick it up and call Do.
func (hc *Hellclient) dispatchJob(job Job) {
	job.Init()
	hc.jobdispatch <- job
}

// dispatchFunc wraps enclosure in a GenericJob, queues it for the dispatcher
// loop and returns the job so the caller can Wait on it.
func (hc *Hellclient) dispatchFunc(enclosure func(*GenericJob)) Job {
	queued := &GenericJob{jobfunc: enclosure}
	// dispatchJob performs the same Init-then-send sequence.
	hc.dispatchJob(queued)
	return queued
}

// dispatchAnon wraps enclosure in an AnonJob, queues it for the dispatcher
// loop and returns the job so the caller can Wait on it.
func (hc *Hellclient) dispatchAnon(enclosure func()) Job {
	queued := &AnonJob{jobfunc: enclosure}
	// dispatchJob performs the same Init-then-send sequence.
	hc.dispatchJob(queued)
	return queued
}