mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2024-12-13 10:30:45 +01:00
7cc40302a5
* add miekg/dns dependency * set/validate accountDomain * move finger to dereferencer * totally break GetRemoteAccount * start reworking finger func a bit * start reworking getRemoteAccount a bit * move mention parts to namestring * rework webfingerget * use util function to extract webfinger parts * use accountDomain * rework finger again, final form * just a real nasty commit, the worst * remove refresh from account * use new ASRepToAccount signature * fix incorrect debug call * fix for new getRemoteAccount * rework GetRemoteAccount * start updating tests to remove repetition * break a lot of tests Move shared test logic into the testrig, rather than having it scattered all over the place. This allows us to just mock the transport controller once, and have all tests use it (unless they need not to for some other reason). * fix up tests to use main mock httpclient * webfinger only if necessary * cheeky linting with the lads * update mentionName regex recognize instance accounts * don't finger instance accounts * test webfinger part extraction * increase default worker count to 4 per cpu * don't repeat regex parsing * final search for discovered accountDomain * be more permissive in namestring lookup * add more extraction tests * simplify GetParseMentionFunc * skip long search if local account * fix broken test * consolidate to all use same caching libraries Signed-off-by: kim <grufwub@gmail.com> * perform more caching in the database layer Signed-off-by: kim <grufwub@gmail.com> * remove ASNote cache Signed-off-by: kim <grufwub@gmail.com> * update cache library, improve db tracing hooks Signed-off-by: kim <grufwub@gmail.com> * return ErrNoEntries if no account status IDs found, small formatting changes Signed-off-by: kim <grufwub@gmail.com> * fix tests, thanks tobi! Signed-off-by: kim <grufwub@gmail.com> Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
241 lines
5.3 KiB
Go
241 lines
5.3 KiB
Go
package sched
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"time"
|
|
|
|
"codeberg.org/gruf/go-atomics"
|
|
"codeberg.org/gruf/go-runners"
|
|
)
|
|
|
|
var (
|
|
// neverticks is a timer channel that never ticks (it's starved).
|
|
neverticks = make(chan time.Time)
|
|
|
|
// alwaysticks is a timer channel that always ticks (it's closed).
|
|
alwaysticks = func() chan time.Time {
|
|
ch := make(chan time.Time)
|
|
close(ch)
|
|
return ch
|
|
}()
|
|
)
|
|
|
|
// Scheduler provides a means of running jobs at specific times and
|
|
// regular intervals, all while sharing a single underlying timer.
|
|
type Scheduler struct {
|
|
jobs []*Job // jobs is a list of tracked Jobs to be executed
|
|
jch chan interface{} // jch accepts either Jobs or job IDs to notify new/removed jobs
|
|
svc runners.Service // svc manages the main scheduler routine
|
|
jid atomics.Uint64 // jid is used to iteratively generate unique IDs for jobs
|
|
}
|
|
|
|
// New returns a new Scheduler instance with given job change queue size.
|
|
func NewScheduler(queue int) Scheduler {
|
|
if queue < 0 {
|
|
queue = 10
|
|
}
|
|
return Scheduler{jch: make(chan interface{}, queue)}
|
|
}
|
|
|
|
// Start will attempt to start the Scheduler. Immediately returns false if the Service is already running, and true after completed run.
|
|
func (sch *Scheduler) Start() bool {
|
|
return sch.svc.Run(sch.run)
|
|
}
|
|
|
|
// Stop will attempt to stop the Scheduler. Immediately returns false if not running, and true only after Scheduler is fully stopped.
|
|
func (sch *Scheduler) Stop() bool {
|
|
return sch.svc.Stop()
|
|
}
|
|
|
|
// Running will return whether Scheduler is running.
|
|
func (sch *Scheduler) Running() bool {
|
|
return sch.svc.Running()
|
|
}
|
|
|
|
// Schedule will add provided Job to the Scheduler, returning a cancel function.
|
|
func (sch *Scheduler) Schedule(job *Job) (cancel func()) {
|
|
if job == nil {
|
|
// Ensure there's a job!
|
|
panic("nil job")
|
|
}
|
|
|
|
// Get last known job ID
|
|
last := sch.jid.Load()
|
|
|
|
// Give this job an ID and check overflow
|
|
if job.id = sch.jid.Add(1); job.id < last {
|
|
panic("scheduler job id overflow")
|
|
}
|
|
|
|
// Pass job to scheduler
|
|
sch.jch <- job
|
|
|
|
// Return cancel function for job ID
|
|
return func() { sch.jch <- job.id }
|
|
}
|
|
|
|
// run is the main scheduler run routine, which runs for as long as ctx is valid.
|
|
func (sch *Scheduler) run(ctx context.Context) {
|
|
var (
|
|
// timerset represents whether timer was running
|
|
// for a particular run of the loop. false means
|
|
// that tch == neverticks || tch == alwaysticks
|
|
timerset bool
|
|
|
|
// timer tick channel (or a never-tick channel)
|
|
tch <-chan time.Time
|
|
|
|
// timer notifies this main routine to wake when
|
|
// the job queued needs to be checked for executions
|
|
timer *time.Timer
|
|
|
|
// stopdrain will stop and drain the timer
|
|
// if it has been running (i.e. timerset == true)
|
|
stopdrain = func() {
|
|
if timerset && !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
}
|
|
)
|
|
|
|
for {
|
|
select {
|
|
// Handle received job/id
|
|
case v := <-sch.jch:
|
|
sch.handle(v)
|
|
continue
|
|
|
|
// No more
|
|
default:
|
|
}
|
|
|
|
// Done
|
|
break
|
|
}
|
|
|
|
// Create a stopped timer
|
|
timer = time.NewTimer(1)
|
|
<-timer.C
|
|
|
|
for {
|
|
// Reset timer state
|
|
timerset = false
|
|
|
|
if len(sch.jobs) > 0 {
|
|
// Sort jobs by next occurring
|
|
sort.Sort(byNext(sch.jobs))
|
|
|
|
// Get execution time
|
|
now := time.Now()
|
|
|
|
// Get next job time
|
|
next := sch.jobs[0].Next()
|
|
|
|
if until := next.Sub(now); until <= 0 {
|
|
// This job is behind schedule,
|
|
// set timer to always tick
|
|
tch = alwaysticks
|
|
} else {
|
|
// Reset timer to period
|
|
timer.Reset(until)
|
|
tch = timer.C
|
|
timerset = true
|
|
}
|
|
} else {
|
|
// Unset timer
|
|
tch = neverticks
|
|
}
|
|
|
|
select {
|
|
// Scheduler stopped
|
|
case <-ctx.Done():
|
|
stopdrain()
|
|
return
|
|
|
|
// Timer ticked, run scheduled
|
|
case now := <-tch:
|
|
sch.schedule(now)
|
|
|
|
// Received update, handle job/id
|
|
case v := <-sch.jch:
|
|
sch.handle(v)
|
|
stopdrain()
|
|
}
|
|
}
|
|
}
|
|
|
|
// handle takes an interfaces received from Scheduler.jch and handles either:
|
|
// - Job --> new job to add.
|
|
// - uint64 --> job ID to remove.
|
|
func (sch *Scheduler) handle(v interface{}) {
|
|
switch v := v.(type) {
|
|
// New job added
|
|
case *Job:
|
|
// Get current time
|
|
now := time.Now()
|
|
|
|
// Update the next call time
|
|
next := v.timing.Next(now)
|
|
v.next.Store(next)
|
|
|
|
// Append this job to queued
|
|
sch.jobs = append(sch.jobs, v)
|
|
|
|
// Job removed
|
|
case uint64:
|
|
for i := 0; i < len(sch.jobs); i++ {
|
|
if sch.jobs[i].id == v {
|
|
// This is the job we're looking for! Drop this
|
|
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// schedule will iterate through the scheduler jobs and execute those necessary, updating their next call time.
|
|
func (sch *Scheduler) schedule(now time.Time) {
|
|
for i := 0; i < len(sch.jobs); {
|
|
// Scope our own var
|
|
job := sch.jobs[i]
|
|
|
|
// We know these jobs are ordered by .Next(), so as soon
|
|
// as we reach one with .Next() after now, we can return
|
|
if job.Next().After(now) {
|
|
return
|
|
}
|
|
|
|
// Update the next call time
|
|
next := job.timing.Next(now)
|
|
job.next.Store(next)
|
|
|
|
// Run this job async!
|
|
go job.Run(now)
|
|
|
|
if job.Next().IsZero() {
|
|
// Zero time, this job is done and can be dropped
|
|
sch.jobs = append(sch.jobs[:i], sch.jobs[i+1:]...)
|
|
continue
|
|
}
|
|
|
|
// Iter
|
|
i++
|
|
}
|
|
}
|
|
|
|
// byNext is an implementation of sort.Interface to sort Jobs by their .Next() time.
|
|
type byNext []*Job
|
|
|
|
func (by byNext) Len() int {
|
|
return len(by)
|
|
}
|
|
|
|
func (by byNext) Less(i int, j int) bool {
|
|
return by[i].Next().Before(by[j].Next())
|
|
}
|
|
|
|
func (by byNext) Swap(i int, j int) {
|
|
by[i], by[j] = by[j], by[i]
|
|
}
|