diff --git a/internal/config/config.go b/internal/config/config.go index 23c90d1..f96df4a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -50,8 +50,17 @@ type Server struct { // Uplink configuration. type Uplink struct { - // Address contains address of the uplink server in form IP:PORT. - Address string `yaml:"address,omitempty"` + // Address1 contains address of the uplink server in form IP:PORT + // for format=j1 queries. + Address1 string `yaml:"address1,omitempty"` + + // Address2 contains address of the uplink server in form IP:PORT + // for format=* queries. + Address2 string `yaml:"address2,omitempty"` + + // Address3 contains address of the uplink server in form IP:PORT + // for all other queries. + Address3 string `yaml:"address3,omitempty"` // Timeout for upstream queries. Timeout int `yaml:"timeout,omitempty"` @@ -85,7 +94,7 @@ type Geo struct { LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"` - Nominatim []Nominatim + Nominatim []Nominatim `yaml:"nominatim"` } type Nominatim struct { @@ -140,7 +149,9 @@ func Default() *Config { TLSKeyFile: "/wttr.in/etc/privkey.pem", }, Uplink{ - Address: "127.0.0.1:9002", + Address1: "127.0.0.1:9002", + Address2: "127.0.0.1:9002", + Address3: "127.0.0.1:9002", Timeout: 30, PrefetchInterval: 300, }, diff --git a/internal/geo/ip/ip.go b/internal/geo/ip/ip.go index c494bd4..c098f5d 100644 --- a/internal/geo/ip/ip.go +++ b/internal/geo/ip/ip.go @@ -10,12 +10,13 @@ import ( "strconv" "strings" + "github.com/samonzeweb/godb" + "github.com/samonzeweb/godb/adapters/sqlite" + "github.com/chubin/wttr.in/internal/config" "github.com/chubin/wttr.in/internal/routing" "github.com/chubin/wttr.in/internal/types" "github.com/chubin/wttr.in/internal/util" - "github.com/samonzeweb/godb" - "github.com/samonzeweb/godb/adapters/sqlite" ) // Address information. diff --git a/internal/logging/logging.go b/internal/logging/logging.go index 992c479..467b4f1 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -56,6 +56,11 @@ func (rl *RequestLogger) Log(r *http.Request) error { le.Proto = "https" } + // Do not log 127.0.0.1 connections + if le.IP == "127.0.0.1" { + return nil + } + rl.m.Lock() rl.buf[le]++ rl.m.Unlock() diff --git a/internal/processor/j1.go b/internal/processor/j1.go new file mode 100644 index 0000000..f6afce7 --- /dev/null +++ b/internal/processor/j1.go @@ -0,0 +1,89 @@ +package processor + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" +) + +func getAny(req *http.Request, tr1, tr2, tr3 *http.Transport) (*ResponseWithHeader, error) { + uri := strings.ReplaceAll(req.URL.RequestURI(), "%", "%25") + + u, err := url.Parse(uri) + if err != nil { + return nil, err + } + + format := u.Query().Get("format") + + if format == "j1" { + return getJ1(req, tr1) + } else if format != "" { + return getFormat(req, tr2) + } + + // log.Println(req.URL.Query()) + // log.Println() + + return getDefault(req, tr3) +} + +func getJ1(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { + return getUpstream(req, transport) +} + +func getFormat(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { + return getUpstream(req, transport) +} + +func getDefault(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { + return getUpstream(req, transport) +} + +func getUpstream(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { + client := &http.Client{ + Transport: transport, + } + + queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI) + + proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body) + if err != nil { + return nil, err + } + + // proxyReq.Header.Set("Host", req.Host) + // proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr) + + for header, values := range req.Header { + for _, value := range values { + proxyReq.Header.Add(header, value) + } + } + + if proxyReq.Header.Get("X-Forwarded-For") == "" { + proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr)) + } + + res, err := client.Do(proxyReq) + if err != nil { + return nil, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + return &ResponseWithHeader{ + InProgress: false, + Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), + Body: body, + Header: res.Header, + StatusCode: res.StatusCode, + }, nil +} diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 613e5b8..44ff69e 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -3,7 +3,6 @@ package processor import ( "context" "fmt" - "io/ioutil" "log" "math/rand" "net" @@ -52,15 +51,17 @@ type ResponseWithHeader struct { // RequestProcessor handles incoming requests. type RequestProcessor struct { - peakRequest30 sync.Map - peakRequest60 sync.Map - lruCache *lru.Cache - stats *stats.Stats - router routing.Router - upstreamTransport *http.Transport - config *config.Config - geoIPCache *geoip.Cache - geoLocation *geoloc.Cache + peakRequest30 sync.Map + peakRequest60 sync.Map + lruCache *lru.Cache + stats *stats.Stats + router routing.Router + upstreamTransport1 *http.Transport + upstreamTransport2 *http.Transport + upstreamTransport3 *http.Transport + config *config.Config + geoIPCache *geoip.Cache + geoLocation *geoloc.Cache } // NewRequestProcessor returns new RequestProcessor. @@ -76,9 +77,19 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) { DualStack: true, } - transport := &http.Transport{ + transport1 := &http.Transport{ DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { - return dialer.DialContext(ctx, network, config.Uplink.Address) + return dialer.DialContext(ctx, network, config.Uplink.Address1) + }, + } + transport2 := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, network, config.Uplink.Address2) + }, + } + transport3 := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, network, config.Uplink.Address3) }, } @@ -93,12 +104,14 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) { } rp := &RequestProcessor{ - lruCache: lruCache, - stats: stats.New(), - upstreamTransport: transport, - config: config, - geoIPCache: geoCache, - geoLocation: geoLocation, + lruCache: lruCache, + stats: stats.New(), + upstreamTransport1: transport1, + upstreamTransport2: transport2, + upstreamTransport3: transport3, + config: config, + geoIPCache: geoCache, + geoLocation: geoLocation, } // Initialize routes. @@ -142,7 +155,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader if dontCache(r) { rp.stats.Inc("uncached") - return get(r, rp.upstreamTransport) + return getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3) } // processing cached request @@ -173,8 +186,9 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi return nil } - // if after all attempts we still have no answer, - // we try to make the query on our own + // If after all attempts we still have no answer, + // respond with an error message. + // (WAS: we try to make the query on our own) for attempts := 0; attempts < 300; attempts++ { if !ok || !cacheEntry.InProgress { break @@ -187,7 +201,13 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi } } if cacheEntry.InProgress { - log.Printf("TIMEOUT: %s\n", cacheDigest) + // log.Printf("TIMEOUT: %s\n", cacheDigest) + return &ResponseWithHeader{ + InProgress: false, + Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), + Body: []byte("This query is already being processed"), + StatusCode: 200, + } } if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) { rp.stats.Inc("cache1") @@ -207,6 +227,9 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi err error ) + // Indicate, that the request is being handled. + rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true}) + // Response was not found in cache. // Starting real handling. format := r.URL.Query().Get("format") @@ -223,10 +246,7 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi rp.stats.Inc("geoip") } - // Indicate, that the request is being handled. - rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true}) - - response, err = get(r, rp.upstreamTransport) + response, err = getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3) if err != nil { return nil, err } @@ -240,51 +260,6 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi return response, nil } -func get(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { - client := &http.Client{ - Transport: transport, - } - - queryURL := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI) - - proxyReq, err := http.NewRequest(req.Method, queryURL, req.Body) - if err != nil { - return nil, err - } - - // proxyReq.Header.Set("Host", req.Host) - // proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr) - - for header, values := range req.Header { - for _, value := range values { - proxyReq.Header.Add(header, value) - } - } - - if proxyReq.Header.Get("X-Forwarded-For") == "" { - proxyReq.Header.Set("X-Forwarded-For", ipFromAddr(req.RemoteAddr)) - } - - res, err := client.Do(proxyReq) - if err != nil { - return nil, err - } - defer res.Body.Close() - - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, err - } - - return &ResponseWithHeader{ - InProgress: false, - Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), - Body: body, - Header: res.Header, - StatusCode: res.StatusCode, - }, nil -} - // getCacheDigest is an implementation of the cache.get_signature of original wttr.in. func getCacheDigest(req *http.Request) string { userAgent := req.Header.Get("User-Agent") diff --git a/lib/location.py b/lib/location.py index 77207bd..7224d77 100644 --- a/lib/location.py +++ b/lib/location.py @@ -102,7 +102,7 @@ def _geolocator(location): if random.random() < 0: geo = requests.get('%s/%s' % (GEOLOCATOR_SERVICE, location)).text else: - geo = requests.get("http://127.0.0.1:8083/:geo-location?location=%s" % location).text + geo = requests.get("http://127.0.0.1:8085/:geo-location?location=%s" % location).text except requests.exceptions.ConnectionError as exception: print("ERROR: %s" % exception) return None @@ -152,7 +152,7 @@ def _ipcache(ip_addr): """ ## Use Geo IP service when available - r = requests.get("http://127.0.0.1:8083/:geo-ip-get?ip=%s" % ip_addr) + r = requests.get("http://127.0.0.1:8085/:geo-ip-get?ip=%s" % ip_addr) if r.status_code == 200 and ";" in r.text: _, country, region, city, *_ = r.text.split(';') return city, region, country