Move RequestProcessor to a struct

This commit is contained in:
Igor Chubin 2022-11-27 15:51:41 +01:00
parent b6687ee037
commit 762e0fe8f0
3 changed files with 79 additions and 58 deletions

View File

@ -9,28 +9,31 @@ import (
"github.com/robfig/cron"
)
var peakRequest30 sync.Map
var peakRequest60 sync.Map
func initPeakHandling() {
func (rp *RequestProcessor) startPeakHandling() {
c := cron.New()
// cronTime := fmt.Sprintf("%d,%d * * * *", 30-prefetchInterval/60, 60-prefetchInterval/60)
c.AddFunc("24 * * * *", prefetchPeakRequests30)
c.AddFunc("54 * * * *", prefetchPeakRequests60)
c.AddFunc(
"24 * * * *",
func() { rp.prefetchPeakRequests(&rp.peakRequest30) },
)
c.AddFunc(
"54 * * * *",
func() { rp.prefetchPeakRequests(&rp.peakRequest60) },
)
c.Start()
}
func savePeakRequest(cacheDigest string, r *http.Request) {
func (rp *RequestProcessor) savePeakRequest(cacheDigest string, r *http.Request) {
_, min, _ := time.Now().Clock()
if min == 30 {
peakRequest30.Store(cacheDigest, *r)
rp.peakRequest30.Store(cacheDigest, *r)
} else if min == 0 {
peakRequest60.Store(cacheDigest, *r)
rp.peakRequest60.Store(cacheDigest, *r)
}
}
func prefetchRequest(r *http.Request) {
processRequest(r)
func (rp *RequestProcessor) prefetchRequest(r *http.Request) {
rp.ProcessRequest(r)
}
func syncMapLen(sm *sync.Map) int {
@ -53,7 +56,7 @@ func syncMapLen(sm *sync.Map) int {
return count
}
func prefetchPeakRequests(peakRequestMap *sync.Map) {
func (rp *RequestProcessor) prefetchPeakRequests(peakRequestMap *sync.Map) {
peakRequestLen := syncMapLen(peakRequestMap)
if peakRequestLen == 0 {
return
@ -62,18 +65,10 @@ func prefetchPeakRequests(peakRequestMap *sync.Map) {
sleepBetweenRequests := time.Duration(prefetchInterval*1000/peakRequestLen) * time.Millisecond
peakRequestMap.Range(func(key interface{}, value interface{}) bool {
go func(r http.Request) {
prefetchRequest(&r)
rp.prefetchRequest(&r)
}(value.(http.Request))
peakRequestMap.Delete(key)
time.Sleep(sleepBetweenRequests)
return true
})
}
func prefetchPeakRequests30() {
prefetchPeakRequests(&peakRequest30)
}
func prefetchPeakRequests60() {
prefetchPeakRequests(&peakRequest60)
}

View File

@ -8,12 +8,48 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
)
func processRequest(r *http.Request) (*responseWithHeader, error) {
type ResponseWithHeader struct {
InProgress bool // true if the request is being processed
Expires time.Time // expiration time of the cache entry
Body []byte
Header http.Header
StatusCode int // e.g. 200
}
// RequestProcessor handles incoming requests.
type RequestProcessor struct {
peakRequest30 sync.Map
peakRequest60 sync.Map
lruCache *lru.Cache
}
// NewRequestProcessor returns new RequestProcessor.
func NewRequestProcessor() (*RequestProcessor, error) {
lruCache, err := lru.New(lruCacheSize)
if err != nil {
return nil, err
}
return &RequestProcessor{
lruCache: lruCache,
}, nil
}
// Start starts async request processor jobs, such as peak handling.
func (rp *RequestProcessor) Start() {
rp.startPeakHandling()
}
func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader, error) {
var (
response *responseWithHeader
response *ResponseWithHeader
err error
)
@ -29,11 +65,11 @@ func processRequest(r *http.Request) (*responseWithHeader, error) {
foundInCache := false
savePeakRequest(cacheDigest, r)
rp.savePeakRequest(cacheDigest, r)
cacheBody, ok := lruCache.Get(cacheDigest)
cacheBody, ok := rp.lruCache.Get(cacheDigest)
if ok {
cacheEntry := cacheBody.(responseWithHeader)
cacheEntry := cacheBody.(ResponseWithHeader)
// if after all attempts we still have no answer,
// we try to make the query on our own
@ -42,9 +78,9 @@ func processRequest(r *http.Request) (*responseWithHeader, error) {
break
}
time.Sleep(30 * time.Millisecond)
cacheBody, ok = lruCache.Get(cacheDigest)
cacheBody, ok = rp.lruCache.Get(cacheDigest)
if ok && cacheBody != nil {
cacheEntry = cacheBody.(responseWithHeader)
cacheEntry = cacheBody.(ResponseWithHeader)
}
}
if cacheEntry.InProgress {
@ -57,22 +93,22 @@ func processRequest(r *http.Request) (*responseWithHeader, error) {
}
if !foundInCache {
lruCache.Add(cacheDigest, responseWithHeader{InProgress: true})
rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true})
response, err = get(r)
if err != nil {
return nil, err
}
if response.StatusCode == 200 || response.StatusCode == 304 || response.StatusCode == 404 {
lruCache.Add(cacheDigest, *response)
rp.lruCache.Add(cacheDigest, *response)
} else {
log.Printf("REMOVE: %d response for %s from cache\n", response.StatusCode, cacheDigest)
lruCache.Remove(cacheDigest)
rp.lruCache.Remove(cacheDigest)
}
}
return response, nil
}
func get(req *http.Request) (*responseWithHeader, error) {
func get(req *http.Request) (*ResponseWithHeader, error) {
client := &http.Client{}
@ -106,7 +142,7 @@ func get(req *http.Request) (*responseWithHeader, error) {
return nil, err
}
return &responseWithHeader{
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,
@ -147,7 +183,7 @@ func dontCache(req *http.Request) bool {
// proxy_set_header X-Forwarded-Proto $scheme;
//
//
func redirectInsecure(req *http.Request) (*responseWithHeader, bool) {
func redirectInsecure(req *http.Request) (*ResponseWithHeader, bool) {
if isPlainTextAgent(req.Header.Get("User-Agent")) {
return nil, false
}
@ -169,7 +205,7 @@ The document has moved
</BODY></HTML>
`, target))
return &responseWithHeader{
return &ResponseWithHeader{
InProgress: false,
Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second),
Body: body,

View File

@ -9,8 +9,6 @@ import (
"net"
"net/http"
"time"
lru "github.com/hashicorp/golang-lru"
)
const uplinkSrvAddr = "127.0.0.1:9002"
@ -35,24 +33,7 @@ var plainTextAgents = []string{
"xh",
}
var lruCache *lru.Cache
type responseWithHeader struct {
InProgress bool // true if the request is being processed
Expires time.Time // expiration time of the cache entry
Body []byte
Header http.Header
StatusCode int // e.g. 200
}
func init() {
var err error
lruCache, err = lru.New(lruCacheSize)
if err != nil {
panic(err)
}
dialer := &net.Dialer{
Timeout: uplinkTimeout * time.Second,
KeepAlive: uplinkTimeout * time.Second,
@ -62,8 +43,6 @@ func init() {
http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, _ string) (net.Conn, error) {
return dialer.DialContext(ctx, network, uplinkSrvAddr)
}
initPeakHandling()
}
func copyHeader(dst, src http.Header) {
@ -117,6 +96,8 @@ func main() {
Conf.Logging.AccessLog,
time.Duration(Conf.Logging.Interval)*time.Second)
rp *RequestProcessor
// errs is the servers errors channel.
errs chan error = make(chan error, 1)
@ -131,19 +112,28 @@ func main() {
},
logLineStart,
)
err error
)
err := errorsLog.Open()
rp, err = NewRequestProcessor()
if err != nil {
log.Fatalln("log processor initialization:", err)
}
err = errorsLog.Open()
if err != nil {
log.Fatalln("errors log:", err)
}
rp.Start()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if err := logger.Log(r); err != nil {
log.Println(err)
}
// printStat()
response, err := processRequest(r)
response, err := rp.ProcessRequest(r)
if err != nil {
log.Println(err)
return