mirror of
https://github.com/superseriousbusiness/gotosocial.git
synced 2025-01-12 01:08:43 +01:00
acc95923da
* replace media workers with just runners.WorkerPool, move to state structure, use go-sched for global task scheduling * improved code comment * fix worker tryUntil function, update go-runners/go-sched * make preprocess functions package public, use these where possible to stop doubled up processing * remove separate emoji worker pool * limit calls to time.Now() during media preprocessing * use Processor{} to manage singular runtime of processing media * ensure workers get started when media manager is used * improved error setting in processing media, fix media test * port changes from processingmedia to processing emoji * finish code commenting * finish code commenting and comment-out client API + federator worker pools until concurrency worker pools replaced * linterrrrrrrrrrrrrrrr --------- Signed-off-by: kim <grufwub@gmail.com>
218 lines
4.6 KiB
Go
218 lines
4.6 KiB
Go
package runners
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// Service tracks a single long-running function. It guards state changes so
// that only one instance can run at a time, and exposes the current state
// (stopped / running / stopping) to callers.
type Service struct {
	// state is the current run state: 0=stopped, 1=running, 2=stopping.
	state uint32

	// mutex guards every change to (and read of) the fields in this struct.
	mutex sync.Mutex

	// wait acts as a wait-group for exactly one entity: it is locked when
	// the service starts and unlocked when the run function has returned.
	// It is only ever locked while 'mutex' is held.
	wait sync.Mutex

	// ctx is the cancel channel for the running function. It is reset to nil
	// once the service has fully stopped, and is allocated again on the next
	// start (or earlier, if Done is called while stopped).
	ctx chan struct{}
}
|
|
|
|
// Run will run the supplied function until completion, using given context to propagate cancel.
|
|
// Immediately returns false if the Service is already running, and true after completed run.
|
|
func (svc *Service) Run(fn func(context.Context)) bool {
|
|
// Attempt to start the svc
|
|
ctx, ok := svc.doStart()
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
defer func() {
|
|
// unlock single wait
|
|
svc.wait.Unlock()
|
|
|
|
// ensure stopped
|
|
_ = svc.Stop()
|
|
}()
|
|
|
|
// Run with context.
|
|
fn(CancelCtx(ctx))
|
|
|
|
return true
|
|
}
|
|
|
|
// GoRun will run the supplied function until completion in a goroutine, using given context to
|
|
// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
|
|
func (svc *Service) GoRun(fn func(context.Context)) bool {
|
|
// Attempt to start the svc
|
|
ctx, ok := svc.doStart()
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
go func() {
|
|
defer func() {
|
|
// unlock single wait
|
|
svc.wait.Unlock()
|
|
|
|
// ensure stopped
|
|
_ = svc.Stop()
|
|
}()
|
|
|
|
// Run with context.
|
|
fn(CancelCtx(ctx))
|
|
}()
|
|
|
|
return true
|
|
}
|
|
|
|
// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
|
|
func (svc *Service) RunWait(fn func(context.Context)) bool {
|
|
// Attempt to start the svc
|
|
ctx, ok := svc.doStart()
|
|
if !ok {
|
|
<-ctx // block
|
|
return false
|
|
}
|
|
|
|
defer func() {
|
|
// unlock single wait
|
|
svc.wait.Unlock()
|
|
|
|
// ensure stopped
|
|
_ = svc.Stop()
|
|
}()
|
|
|
|
// Run with context.
|
|
fn(CancelCtx(ctx))
|
|
|
|
return true
|
|
}
|
|
|
|
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
|
|
// returns false if not running, and true only after Service is fully stopped.
|
|
func (svc *Service) Stop() bool {
|
|
// Attempt to stop the svc
|
|
ctx, ok := svc.doStop()
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
defer func() {
|
|
// Get svc lock
|
|
svc.mutex.Lock()
|
|
|
|
// Wait until stopped
|
|
svc.wait.Lock()
|
|
svc.wait.Unlock()
|
|
|
|
// Reset the svc
|
|
svc.ctx = nil
|
|
svc.state = 0
|
|
svc.mutex.Unlock()
|
|
}()
|
|
|
|
// Cancel ctx
|
|
close(ctx)
|
|
|
|
return true
|
|
}
|
|
|
|
// While allows you to execute given function guaranteed within current
|
|
// service state. Please note that this will hold the underlying service
|
|
// state change mutex open while executing the function.
|
|
func (svc *Service) While(fn func()) {
|
|
// Protect state change
|
|
svc.mutex.Lock()
|
|
defer svc.mutex.Unlock()
|
|
|
|
// Run
|
|
fn()
|
|
}
|
|
|
|
// doStart will safely set Service state to started, returning a ptr to this context insance.
|
|
func (svc *Service) doStart() (chan struct{}, bool) {
|
|
// Protect startup
|
|
svc.mutex.Lock()
|
|
|
|
if svc.ctx == nil {
|
|
// this will only have been allocated
|
|
// if svc.Done() was already called.
|
|
svc.ctx = make(chan struct{})
|
|
}
|
|
|
|
// Take our own ptr
|
|
ctx := svc.ctx
|
|
|
|
if svc.state != 0 {
|
|
// State was not stopped.
|
|
svc.mutex.Unlock()
|
|
return ctx, false
|
|
}
|
|
|
|
// Set started.
|
|
svc.state = 1
|
|
|
|
// Start waiter.
|
|
svc.wait.Lock()
|
|
|
|
// Unlock and return
|
|
svc.mutex.Unlock()
|
|
return ctx, true
|
|
}
|
|
|
|
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
|
|
func (svc *Service) doStop() (chan struct{}, bool) {
|
|
// Protect stop
|
|
svc.mutex.Lock()
|
|
|
|
if svc.state != 1 /* not started */ {
|
|
svc.mutex.Unlock()
|
|
return nil, false
|
|
}
|
|
|
|
// state stopping
|
|
svc.state = 2
|
|
|
|
// Take our own ptr
|
|
// and unlock state
|
|
ctx := svc.ctx
|
|
svc.mutex.Unlock()
|
|
|
|
return ctx, true
|
|
}
|
|
|
|
// Running returns if Service is running (i.e. state NOT stopped / stopping).
|
|
func (svc *Service) Running() bool {
|
|
svc.mutex.Lock()
|
|
state := svc.state
|
|
svc.mutex.Unlock()
|
|
return (state == 1)
|
|
}
|
|
|
|
// Done returns a channel that's closed when Service.Stop() is called. It is
|
|
// the same channel provided to the currently running service function.
|
|
func (svc *Service) Done() <-chan struct{} {
|
|
var done <-chan struct{}
|
|
|
|
svc.mutex.Lock()
|
|
switch svc.state {
|
|
// stopped
|
|
case 0:
|
|
if svc.ctx == nil {
|
|
// here we create a new context so that the
|
|
// returned 'done' channel here will still
|
|
// be valid for when Service is next started.
|
|
svc.ctx = make(chan struct{})
|
|
}
|
|
done = svc.ctx
|
|
|
|
// started
|
|
case 1:
|
|
done = svc.ctx
|
|
|
|
// stopping
|
|
case 2:
|
|
done = svc.ctx
|
|
}
|
|
svc.mutex.Unlock()
|
|
|
|
return done
|
|
}
|