From 762e0fe8f0c7f0d02341d963ce206f986d66c8d4 Mon Sep 17 00:00:00 2001 From: Igor Chubin Date: Sun, 27 Nov 2022 15:51:41 +0100 Subject: [PATCH] Move RequestProcessor to a struct --- cmd/peakHandling.go | 37 +++++++++++-------------- cmd/processRequest.go | 64 +++++++++++++++++++++++++++++++++---------- cmd/srv.go | 36 +++++++++--------------- 3 files changed, 79 insertions(+), 58 deletions(-) diff --git a/cmd/peakHandling.go b/cmd/peakHandling.go index d279564..461f7ff 100644 --- a/cmd/peakHandling.go +++ b/cmd/peakHandling.go @@ -9,28 +9,31 @@ import ( "github.com/robfig/cron" ) -var peakRequest30 sync.Map -var peakRequest60 sync.Map - -func initPeakHandling() { +func (rp *RequestProcessor) startPeakHandling() { c := cron.New() // cronTime := fmt.Sprintf("%d,%d * * * *", 30-prefetchInterval/60, 60-prefetchInterval/60) - c.AddFunc("24 * * * *", prefetchPeakRequests30) - c.AddFunc("54 * * * *", prefetchPeakRequests60) + c.AddFunc( + "24 * * * *", + func() { rp.prefetchPeakRequests(&rp.peakRequest30) }, + ) + c.AddFunc( + "54 * * * *", + func() { rp.prefetchPeakRequests(&rp.peakRequest60) }, + ) c.Start() } -func savePeakRequest(cacheDigest string, r *http.Request) { +func (rp *RequestProcessor) savePeakRequest(cacheDigest string, r *http.Request) { _, min, _ := time.Now().Clock() if min == 30 { - peakRequest30.Store(cacheDigest, *r) + rp.peakRequest30.Store(cacheDigest, *r) } else if min == 0 { - peakRequest60.Store(cacheDigest, *r) + rp.peakRequest60.Store(cacheDigest, *r) } } -func prefetchRequest(r *http.Request) { - processRequest(r) +func (rp *RequestProcessor) prefetchRequest(r *http.Request) { + rp.ProcessRequest(r) } func syncMapLen(sm *sync.Map) int { @@ -53,7 +56,7 @@ func syncMapLen(sm *sync.Map) int { return count } -func prefetchPeakRequests(peakRequestMap *sync.Map) { +func (rp *RequestProcessor) prefetchPeakRequests(peakRequestMap *sync.Map) { peakRequestLen := syncMapLen(peakRequestMap) if peakRequestLen == 0 { return @@ -62,18 +65,10 @@ func prefetchPeakRequests(peakRequestMap *sync.Map) { sleepBetweenRequests := time.Duration(prefetchInterval*1000/peakRequestLen) * time.Millisecond peakRequestMap.Range(func(key interface{}, value interface{}) bool { go func(r http.Request) { - prefetchRequest(&r) + rp.prefetchRequest(&r) }(value.(http.Request)) peakRequestMap.Delete(key) time.Sleep(sleepBetweenRequests) return true }) } - -func prefetchPeakRequests30() { - prefetchPeakRequests(&peakRequest30) -} - -func prefetchPeakRequests60() { - prefetchPeakRequests(&peakRequest60) -} diff --git a/cmd/processRequest.go b/cmd/processRequest.go index 1739fea..da03f34 100644 --- a/cmd/processRequest.go +++ b/cmd/processRequest.go @@ -8,12 +8,48 @@ import ( "net" "net/http" "strings" + "sync" "time" + + lru "github.com/hashicorp/golang-lru" ) -func processRequest(r *http.Request) (*responseWithHeader, error) { +type ResponseWithHeader struct { + InProgress bool // true if the request is being processed + Expires time.Time // expiration time of the cache entry + + Body []byte + Header http.Header + StatusCode int // e.g. 200 +} + +// RequestProcessor handles incoming requests. +type RequestProcessor struct { + peakRequest30 sync.Map + peakRequest60 sync.Map + lruCache *lru.Cache +} + +// NewRequestProcessor returns new RequestProcessor. +func NewRequestProcessor() (*RequestProcessor, error) { + lruCache, err := lru.New(lruCacheSize) + if err != nil { + return nil, err + } + + return &RequestProcessor{ + lruCache: lruCache, + }, nil +} + +// Start starts async request processor jobs, such as peak handling. +func (rp *RequestProcessor) Start() { + rp.startPeakHandling() +} + +func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader, error) { var ( - response *responseWithHeader + response *ResponseWithHeader err error ) @@ -29,11 +65,11 @@ func processRequest(r *http.Request) (*responseWithHeader, error) { foundInCache := false - savePeakRequest(cacheDigest, r) + rp.savePeakRequest(cacheDigest, r) - cacheBody, ok := lruCache.Get(cacheDigest) + cacheBody, ok := rp.lruCache.Get(cacheDigest) if ok { - cacheEntry := cacheBody.(responseWithHeader) + cacheEntry := cacheBody.(ResponseWithHeader) // if after all attempts we still have no answer, // we try to make the query on our own @@ -42,9 +78,9 @@ func processRequest(r *http.Request) (*responseWithHeader, error) { break } time.Sleep(30 * time.Millisecond) - cacheBody, ok = lruCache.Get(cacheDigest) + cacheBody, ok = rp.lruCache.Get(cacheDigest) if ok && cacheBody != nil { - cacheEntry = cacheBody.(responseWithHeader) + cacheEntry = cacheBody.(ResponseWithHeader) } } if cacheEntry.InProgress { @@ -57,22 +93,22 @@ func processRequest(r *http.Request) (*responseWithHeader, error) { } if !foundInCache { - lruCache.Add(cacheDigest, responseWithHeader{InProgress: true}) + rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true}) response, err = get(r) if err != nil { return nil, err } if response.StatusCode == 200 || response.StatusCode == 304 || response.StatusCode == 404 { - lruCache.Add(cacheDigest, *response) + rp.lruCache.Add(cacheDigest, *response) } else { log.Printf("REMOVE: %d response for %s from cache\n", response.StatusCode, cacheDigest) - lruCache.Remove(cacheDigest) + rp.lruCache.Remove(cacheDigest) } } return response, nil } -func get(req *http.Request) (*responseWithHeader, error) { +func get(req *http.Request) (*ResponseWithHeader, error) { client := &http.Client{} @@ -106,7 +142,7 @@ func get(req *http.Request) (*responseWithHeader, error) { return nil, err } - return &responseWithHeader{ + return &ResponseWithHeader{ InProgress: false, Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), Body: body, @@ -147,7 +183,7 @@ func dontCache(req *http.Request) bool { // proxy_set_header X-Forwarded-Proto $scheme; // // -func redirectInsecure(req *http.Request) (*responseWithHeader, bool) { +func redirectInsecure(req *http.Request) (*ResponseWithHeader, bool) { if isPlainTextAgent(req.Header.Get("User-Agent")) { return nil, false } @@ -169,7 +205,7 @@ The document has moved `, target)) - return &responseWithHeader{ + return &ResponseWithHeader{ InProgress: false, Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), Body: body, diff --git a/cmd/srv.go b/cmd/srv.go index 0b10e2d..e1d0955 100644 --- a/cmd/srv.go +++ b/cmd/srv.go @@ -9,8 +9,6 @@ import ( "net" "net/http" "time" - - lru "github.com/hashicorp/golang-lru" ) const uplinkSrvAddr = "127.0.0.1:9002" @@ -35,24 +33,7 @@ var plainTextAgents = []string{ "xh", } -var lruCache *lru.Cache - -type responseWithHeader struct { - InProgress bool // true if the request is being processed - Expires time.Time // expiration time of the cache entry - - Body []byte - Header http.Header - StatusCode int // e.g. 200 -} - func init() { - var err error - lruCache, err = lru.New(lruCacheSize) - if err != nil { - panic(err) - } - dialer := &net.Dialer{ Timeout: uplinkTimeout * time.Second, KeepAlive: uplinkTimeout * time.Second, @@ -62,8 +43,6 @@ func init() { http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, _ string) (net.Conn, error) { return dialer.DialContext(ctx, network, uplinkSrvAddr) } - - initPeakHandling() } func copyHeader(dst, src http.Header) { @@ -117,6 +96,8 @@ func main() { Conf.Logging.AccessLog, time.Duration(Conf.Logging.Interval)*time.Second) + rp *RequestProcessor + // errs is the servers errors channel. errs chan error = make(chan error, 1) @@ -131,19 +112,28 @@ func main() { }, logLineStart, ) + + err error ) - err := errorsLog.Open() + rp, err = NewRequestProcessor() + if err != nil { + log.Fatalln("log processor initialization:", err) + } + + err = errorsLog.Open() if err != nil { log.Fatalln("errors log:", err) } + rp.Start() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if err := logger.Log(r); err != nil { log.Println(err) } // printStat() - response, err := processRequest(r) + response, err := rp.ProcessRequest(r) if err != nil { log.Println(err) return