From 6da093a4028eb5b6ef1d5cc6d0f98f047557d8a1 Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 3 Nov 2019 01:20:49 +0800 Subject: [PATCH 1/2] support bandwith limit for one proxy --- client/proxy/proxy.go | 42 ++++++++++++----- conf/frpc_full.ini | 2 + go.mod | 1 + go.sum | 2 + models/config/proxy.go | 17 +++++-- models/config/types.go | 100 +++++++++++++++++++++++++++++++++++++++++ utils/limit/reader.go | 51 +++++++++++++++++++++ utils/limit/writer.go | 60 +++++++++++++++++++++++++ 8 files changed, 262 insertions(+), 13 deletions(-) create mode 100644 models/config/types.go create mode 100644 utils/limit/reader.go create mode 100644 utils/limit/writer.go diff --git a/client/proxy/proxy.go b/client/proxy/proxy.go index 2da68ce8..aca1b94e 100644 --- a/client/proxy/proxy.go +++ b/client/proxy/proxy.go @@ -30,6 +30,7 @@ import ( "github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/plugin" "github.com/fatedier/frp/models/proto/udp" + "github.com/fatedier/frp/utils/limit" frpNet "github.com/fatedier/frp/utils/net" "github.com/fatedier/frp/utils/xlog" @@ -38,6 +39,7 @@ import ( "github.com/fatedier/golib/pool" fmux "github.com/hashicorp/yamux" pp "github.com/pires/go-proxyproto" + "golang.org/x/time/rate" ) // Proxy defines how to handle work connections for different proxy type. @@ -51,9 +53,16 @@ type Proxy interface { } func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) { + var limiter *rate.Limiter + limitBytes := pxyConf.GetBaseInfo().BandwithLimit.Bytes() + if limitBytes > 0 { + limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes)) + } + baseProxy := BaseProxy{ clientCfg: clientCfg, serverUDPPort: serverUDPPort, + limiter: limiter, xl: xlog.FromContextSafe(ctx), ctx: ctx, } @@ -96,6 +105,7 @@ type BaseProxy struct { closed bool clientCfg config.ClientCommonConf serverUDPPort int + limiter *rate.Limiter mu sync.RWMutex xl *xlog.Logger @@ -127,8 +137,8 @@ func (pxy *TcpProxy) Close() { } func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // HTTP @@ -156,8 +166,8 @@ func (pxy *HttpProxy) Close() { } func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // HTTPS @@ -185,8 +195,8 @@ func (pxy *HttpsProxy) Close() { } func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // STCP @@ -214,8 +224,8 @@ func (pxy *StcpProxy) Close() { } func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn, - []byte(pxy.clientCfg.Token), m) + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, + conn, []byte(pxy.clientCfg.Token), m) } // XTCP @@ -360,7 +370,7 @@ func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { return } - HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, + HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter, muxConn, []byte(pxy.cfg.Sk), m) } @@ -429,6 +439,13 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // close resources releated with old workConn pxy.Close() + if pxy.limiter != nil { + rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error { + return conn.Close() + }) + conn = frpNet.WrapReadWriteCloserToConn(rwc, conn) + } + pxy.mu.Lock() pxy.workConn = conn pxy.readCh = make(chan *msg.UdpPacket, 1024) @@ -491,13 +508,18 @@ func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) { // Common handler for tcp work connections. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin, - baseInfo *config.BaseProxyConf, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { + baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) { xl := xlog.FromContextSafe(ctx) var ( remote io.ReadWriteCloser err error ) remote = workConn + if limiter != nil { + remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error { + return workConn.Close() + }) + } xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t", baseInfo.UseEncryption, baseInfo.UseCompression) diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 158ff23f..afa07079 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -71,6 +71,8 @@ tls_enable = true type = tcp local_ip = 127.0.0.1 local_port = 22 +# limit bandwith for this proxy, unit is KB and MB +bandwith_limit = 1MB # true or false, if true, messages between frps and frpc will be encrypted, default is false use_encryption = false # if true, message will be compressed diff --git a/go.mod b/go.mod index 81de8abe..a71a97c0 100644 --- a/go.mod +++ b/go.mod @@ -29,4 +29,5 @@ require ( github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 golang.org/x/text v0.3.2 // indirect + golang.org/x/time v0.0.0-20191024005414-555d28b269f0 ) diff --git a/go.sum b/go.sum index b9bdf2a0..26c9004f 100644 --- a/go.sum +++ b/go.sum @@ -46,4 +46,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/models/config/proxy.go b/models/config/proxy.go index 9ab1ef88..2b129cfa 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -125,6 +125,11 @@ type BaseProxyConf struct { // values include "v1", "v2", and "". If the value is "", a protocol // version will be automatically selected. By default, this value is "". ProxyProtocolVersion string `json:"proxy_protocol_version"` + + // BandwithLimit limit the proxy bandwith + // 0 means no limit + BandwithLimit BandwithQuantity `json:"bandwith_limit"` + LocalSvrConf HealthCheckConf } @@ -140,7 +145,8 @@ func (cfg *BaseProxyConf) compare(cmp *BaseProxyConf) bool { cfg.UseCompression != cmp.UseCompression || cfg.Group != cmp.Group || cfg.GroupKey != cmp.GroupKey || - cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion { + cfg.ProxyProtocolVersion != cmp.ProxyProtocolVersion || + cfg.BandwithLimit.Equal(&cmp.BandwithLimit) { return false } if !cfg.LocalSvrConf.compare(&cmp.LocalSvrConf) { @@ -165,6 +171,7 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i var ( tmpStr string ok bool + err error ) cfg.ProxyName = prefix + name cfg.ProxyType = section["type"] @@ -183,11 +190,15 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i cfg.GroupKey = section["group_key"] cfg.ProxyProtocolVersion = section["proxy_protocol_version"] - if err := cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + if cfg.BandwithLimit, err = NewBandwithQuantity(section["bandwidth_limit"]); err != nil { return err } - if err := cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { + if err = cfg.LocalSvrConf.UnmarshalFromIni(prefix, name, section); err != nil { + return err + } + + if err = cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil { return err } diff --git a/models/config/types.go b/models/config/types.go new file mode 100644 index 00000000..9f3de661 --- /dev/null +++ b/models/config/types.go @@ -0,0 +1,100 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "errors" + "strconv" + "strings" +) + +const ( + MB = 1024 * 1024 + KB = 1024 +) + +type BandwithQuantity struct { + s string // MB or KB + + i int64 // bytes +} + +func NewBandwithQuantity(s string) (BandwithQuantity, error) { + q := BandwithQuantity{} + err := q.UnmarshalString(s) + if err != nil { + return q, err + } + return q, nil +} + +func (q *BandwithQuantity) Equal(u *BandwithQuantity) bool { + if q == nil && u == nil { + return true + } + if q != nil && u != nil { + return q.i == u.i + } + return false +} + +func (q *BandwithQuantity) String() string { + return q.s +} + +func (q *BandwithQuantity) UnmarshalString(s string) error { + q.s = strings.TrimSpace(s) + if q.s == "" { + return nil + } + + var ( + base int64 + f float64 + err error + ) + if strings.HasSuffix(s, "MB") { + base = MB + s = strings.TrimSuffix(s, "MB") + f, err = strconv.ParseFloat(s, 64) + if err != nil { + return err + } + } else if strings.HasSuffix(s, "KB") { + base = KB + s = strings.TrimSuffix(s, "KB") + f, err = strconv.ParseFloat(s, 64) + if err != nil { + return err + } + } else { + return errors.New("unit not support") + } + + q.i = int64(f * float64(base)) + return nil +} + +func (q *BandwithQuantity) UnmarshalJSON(b []byte) error { + return q.UnmarshalString(string(b)) +} + +func (q *BandwithQuantity) MarshalJSON() ([]byte, error) { + return []byte(q.s), nil +} + +func (q *BandwithQuantity) Bytes() int64 { + return q.i +} diff --git a/utils/limit/reader.go b/utils/limit/reader.go new file mode 100644 index 00000000..efa828f4 --- /dev/null +++ b/utils/limit/reader.go @@ -0,0 +1,51 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "context" + "io" + + "golang.org/x/time/rate" +) + +type Reader struct { + r io.Reader + limiter *rate.Limiter +} + +func NewReader(r io.Reader, limiter *rate.Limiter) *Reader { + return &Reader{ + r: r, + limiter: limiter, + } +} + +func (r *Reader) Read(p []byte) (n int, err error) { + b := r.limiter.Burst() + if b < len(p) { + p = p[:b] + } + n, err = r.r.Read(p) + if err != nil { + return + } + + err = r.limiter.WaitN(context.Background(), n) + if err != nil { + return + } + return +} diff --git a/utils/limit/writer.go b/utils/limit/writer.go new file mode 100644 index 00000000..5256d1e2 --- /dev/null +++ b/utils/limit/writer.go @@ -0,0 +1,60 @@ +// Copyright 2019 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package limit + +import ( + "context" + "io" + + "golang.org/x/time/rate" +) + +type Writer struct { + w io.Writer + limiter *rate.Limiter +} + +func NewWriter(w io.Writer, limiter *rate.Limiter) *Writer { + return &Writer{ + w: w, + limiter: limiter, + } +} + +func (w *Writer) Write(p []byte) (n int, err error) { + var nn int + b := w.limiter.Burst() + for { + end := len(p) + if end == 0 { + break + } + if b < len(p) { + end = b + } + err = w.limiter.WaitN(context.Background(), end) + if err != nil { + return + } + + nn, err = w.w.Write(p[:end]) + n += nn + if err != nil { + return + } + p = p[end:] + } + return +} From 42425d8218b215d8ee941a0364eb72ffd6dd839b Mon Sep 17 00:00:00 2001 From: fatedier Date: Sun, 3 Nov 2019 01:21:47 +0800 Subject: [PATCH 2/2] update vendor files --- vendor/golang.org/x/time/AUTHORS | 3 + vendor/golang.org/x/time/CONTRIBUTORS | 3 + vendor/golang.org/x/time/LICENSE | 27 ++ vendor/golang.org/x/time/PATENTS | 22 ++ vendor/golang.org/x/time/rate/rate.go | 400 ++++++++++++++++++++++++++ vendor/modules.txt | 2 + 6 files changed, 457 insertions(+) create mode 100644 vendor/golang.org/x/time/AUTHORS create mode 100644 vendor/golang.org/x/time/CONTRIBUTORS create mode 100644 vendor/golang.org/x/time/LICENSE create mode 100644 vendor/golang.org/x/time/PATENTS create mode 100644 vendor/golang.org/x/time/rate/rate.go diff --git a/vendor/golang.org/x/time/AUTHORS b/vendor/golang.org/x/time/AUTHORS new file mode 100644 index 00000000..15167cd7 --- /dev/null +++ b/vendor/golang.org/x/time/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/time/CONTRIBUTORS b/vendor/golang.org/x/time/CONTRIBUTORS new file mode 100644 index 00000000..1c4577e9 --- /dev/null +++ b/vendor/golang.org/x/time/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 00000000..6a66aea5 --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 00000000..73309904 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 00000000..563f7042 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,400 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "context" + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + lim.mu.Lock() + burst := lim.burst + limit := lim.limit + lim.mu.Unlock() + + if n > burst && limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait if necessary + delay := r.DelayFrom(now) + if delay == 0 { + return nil + } + t := time.NewTimer(delay) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). +func (lim *Limiter) SetBurst(newBurst int) { + lim.SetBurstAt(time.Now(), newBurst) +} + +// SetBurstAt sets a new burst size for the limiter. +func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.burst = newBurst +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + // Split the integer and fractional parts ourself to minimize rounding errors. + // See golang.org/issues/34861. + sec := float64(d/time.Second) * float64(limit) + nsec := float64(d%time.Second) * float64(limit) + return sec + nsec/1e9 +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 79bbd1c7..079212db 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -83,3 +83,5 @@ golang.org/x/text/secure/bidirule golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm +# golang.org/x/time v0.0.0-20191024005414-555d28b269f0 +golang.org/x/time/rate