// Copyright 2016 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pubsub import ( "log" "sync" "time" "golang.org/x/net/context" "google.golang.org/api/iterator" "google.golang.org/api/support/bundler" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" ) type messageIterator struct { impl interface { next() (*Message, error) stop() } } type pollingMessageIterator struct { // kaTicker controls how often we send an ack deadline extension request. kaTicker *time.Ticker // ackTicker controls how often we acknowledge a batch of messages. ackTicker *time.Ticker ka *keepAlive acker *acker nacker *bundler.Bundler puller *puller // mu ensures that cleanup only happens once, and concurrent Stop // invocations block until cleanup completes. mu sync.Mutex // closed is used to signal that Stop has been called. closed chan struct{} } var useStreamingPull = false // newMessageIterator starts a new messageIterator. Stop must be called on the messageIterator // when it is no longer needed. // subName is the full name of the subscription to pull messages from. // ctx is the context to use for acking messages and extending message deadlines. func newMessageIterator(ctx context.Context, s service, subName string, po *pullOptions) *messageIterator { if !useStreamingPull { return &messageIterator{ impl: newPollingMessageIterator(ctx, s, subName, po), } } sp := s.newStreamingPuller(ctx, subName, int32(po.ackDeadline.Seconds())) err := sp.open() if grpc.Code(err) == codes.Unimplemented { log.Println("pubsub: streaming pull unimplemented; falling back to legacy pull") return &messageIterator{ impl: newPollingMessageIterator(ctx, s, subName, po), } } // TODO(jba): handle other non-nil error? log.Println("using streaming pull") return &messageIterator{ impl: newStreamingMessageIterator(ctx, sp, po), } } func newPollingMessageIterator(ctx context.Context, s service, subName string, po *pullOptions) *pollingMessageIterator { // TODO: make kaTicker frequency more configurable. // (ackDeadline - 5s) is a reasonable default for now, because the minimum ack period is 10s. This gives us 5s grace. keepAlivePeriod := po.ackDeadline - 5*time.Second kaTicker := time.NewTicker(keepAlivePeriod) // Stopped in it.Stop // TODO: make ackTicker more configurable. Something less than // kaTicker is a reasonable default (there's no point extending // messages when they could be acked instead). ackTicker := time.NewTicker(keepAlivePeriod / 2) // Stopped in it.Stop ka := &keepAlive{ s: s, Ctx: ctx, Sub: subName, ExtensionTick: kaTicker.C, Deadline: po.ackDeadline, MaxExtension: po.maxExtension, } ack := &acker{ s: s, Ctx: ctx, Sub: subName, AckTick: ackTicker.C, Notify: ka.Remove, } nacker := bundler.NewBundler("", func(ackIDs interface{}) { // NACK by setting the ack deadline to zero, to make the message // immediately available for redelivery. // // If the RPC fails, nothing we can do about it. In the worst case, the // deadline for these messages will expire and they will still get // redelivered. _ = s.modifyAckDeadline(ctx, subName, 0, ackIDs.([]string)) }) nacker.DelayThreshold = keepAlivePeriod / 10 // nack promptly nacker.BundleCountThreshold = 10 pull := newPuller(s, subName, ctx, po.maxPrefetch, ka.Add, ka.Remove) ka.Start() ack.Start() return &pollingMessageIterator{ kaTicker: kaTicker, ackTicker: ackTicker, ka: ka, acker: ack, nacker: nacker, puller: pull, closed: make(chan struct{}), } } // Next returns the next Message to be processed. The caller must call // Message.Done when finished with it. // Once Stop has been called, calls to Next will return iterator.Done. func (it *messageIterator) Next() (*Message, error) { return it.impl.next() } func (it *pollingMessageIterator) next() (*Message, error) { m, err := it.puller.Next() if err == nil { m.doneFunc = it.done return m, nil } select { // If Stop has been called, we return Done regardless the value of err. case <-it.closed: return nil, iterator.Done default: return nil, err } } // Client code must call Stop on a messageIterator when finished with it. // Stop will block until Done has been called on all Messages that have been // returned by Next, or until the context with which the messageIterator was created // is cancelled or exceeds its deadline. // Stop need only be called once, but may be called multiple times from // multiple goroutines. func (it *messageIterator) Stop() { it.impl.stop() } func (it *pollingMessageIterator) stop() { it.mu.Lock() defer it.mu.Unlock() select { case <-it.closed: // Cleanup has already been performed. return default: } // We close this channel before calling it.puller.Stop to ensure that we // reliably return iterator.Done from Next. close(it.closed) // Stop the puller. Once this completes, no more messages will be added // to it.ka. it.puller.Stop() // Start acking messages as they arrive, ignoring ackTicker. This will // result in it.ka.Stop, below, returning as soon as possible. it.acker.FastMode() // This will block until // (a) it.ka.Ctx is done, or // (b) all messages have been removed from keepAlive. // (b) will happen once all outstanding messages have been either ACKed or NACKed. it.ka.Stop() // There are no more live messages, so kill off the acker. it.acker.Stop() it.nacker.Flush() it.kaTicker.Stop() it.ackTicker.Stop() } func (it *pollingMessageIterator) done(ackID string, ack bool) { if ack { it.acker.Ack(ackID) // There's no need to call it.ka.Remove here, as acker will // call it via its Notify function. } else { it.ka.Remove(ackID) _ = it.nacker.Add(ackID, len(ackID)) // ignore error; this is just an optimization } } type streamingMessageIterator struct { ctx context.Context po *pullOptions sp *streamingPuller kaTicker *time.Ticker // keep-alive (deadline extensions) ackTicker *time.Ticker // message acks nackTicker *time.Ticker // message nacks (more frequent than acks) failed chan struct{} // closed on stream error stopped chan struct{} // closed when Stop is called drained chan struct{} // closed when stopped && no more pending messages msgc chan *Message wg sync.WaitGroup mu sync.Mutex keepAliveDeadlines map[string]time.Time pendingReq *pb.StreamingPullRequest err error // error from stream failure } func newStreamingMessageIterator(ctx context.Context, sp *streamingPuller, po *pullOptions) *streamingMessageIterator { // TODO: make kaTicker frequency more configurable. (ackDeadline - 5s) is a // reasonable default for now, because the minimum ack period is 10s. This // gives us 5s grace. keepAlivePeriod := po.ackDeadline - 5*time.Second kaTicker := time.NewTicker(keepAlivePeriod) // TODO: make ackTicker more configurable. Something less than // kaTicker is a reasonable default (there's no point extending // messages when they could be acked instead). ackTicker := time.NewTicker(keepAlivePeriod / 2) nackTicker := time.NewTicker(keepAlivePeriod / 10) it := &streamingMessageIterator{ ctx: ctx, sp: sp, po: po, kaTicker: kaTicker, ackTicker: ackTicker, nackTicker: nackTicker, failed: make(chan struct{}), stopped: make(chan struct{}), drained: make(chan struct{}), // use maxPrefetch as the channel's buffer size. msgc: make(chan *Message, po.maxPrefetch), keepAliveDeadlines: map[string]time.Time{}, pendingReq: &pb.StreamingPullRequest{}, } it.wg.Add(2) go it.receiver() go it.sender() return it } func (it *streamingMessageIterator) next() (*Message, error) { // If ctx has been cancelled or the iterator is done, return straight // away (even if there are buffered messages available). select { case <-it.ctx.Done(): return nil, it.ctx.Err() case <-it.failed: break case <-it.stopped: break default: // Wait for a message, but also for one of the above conditions. select { case msg := <-it.msgc: // Since active select cases are chosen at random, this can return // nil (from the channel close) even if it.failed or it.stopped is // closed. if msg == nil { break } msg.doneFunc = it.done return msg, nil case <-it.ctx.Done(): return nil, it.ctx.Err() case <-it.failed: break case <-it.stopped: break } } // Here if the iterator is done. it.mu.Lock() defer it.mu.Unlock() return nil, it.err } func (it *streamingMessageIterator) stop() { it.mu.Lock() select { case <-it.stopped: it.mu.Unlock() it.wg.Wait() return default: close(it.stopped) } if it.err == nil { it.err = iterator.Done } // Before reading from the channel, see if we're already drained. it.checkDrained() it.mu.Unlock() // Nack all the pending messages. // Grab the lock separately for each message to allow the receiver // and sender goroutines to make progress. // Why this will eventually terminate: // - If the receiver is not blocked on a stream Recv, then // it will write all the messages it has received to the channel, // then exit, closing the channel. // - If the receiver is blocked, then this loop will eventually // nack all the messages in the channel. Once done is called // on the remaining messages, the iterator will be marked as drained, // which will trigger the sender to terminate. When it does, it // performs a CloseSend on the stream, which will result in the blocked // stream Recv returning. for m := range it.msgc { it.mu.Lock() delete(it.keepAliveDeadlines, m.ackID) it.addDeadlineMod(m.ackID, 0) it.checkDrained() it.mu.Unlock() } it.wg.Wait() } // checkDrained closes the drained channel if the iterator has been stopped and all // pending messages have either been n/acked or expired. // // Called with the lock held. func (it *streamingMessageIterator) checkDrained() { select { case <-it.drained: return default: } select { case <-it.stopped: if len(it.keepAliveDeadlines) == 0 { close(it.drained) } default: } } // Called when a message is acked/nacked. func (it *streamingMessageIterator) done(ackID string, ack bool) { it.mu.Lock() defer it.mu.Unlock() delete(it.keepAliveDeadlines, ackID) if ack { it.pendingReq.AckIds = append(it.pendingReq.AckIds, ackID) } else { it.addDeadlineMod(ackID, 0) // Nack indicated by modifying the deadline to zero. } it.checkDrained() } // addDeadlineMod adds the ack ID to the pending request with the given deadline. // // Called with the lock held. func (it *streamingMessageIterator) addDeadlineMod(ackID string, deadlineSecs int32) { pr := it.pendingReq pr.ModifyDeadlineAckIds = append(pr.ModifyDeadlineAckIds, ackID) pr.ModifyDeadlineSeconds = append(pr.ModifyDeadlineSeconds, deadlineSecs) } // fail is called when a stream method returns a permanent error. func (it *streamingMessageIterator) fail(err error) { it.mu.Lock() if it.err == nil { it.err = err close(it.failed) } it.mu.Unlock() } // receiver runs in a goroutine and handles all receives from the stream. func (it *streamingMessageIterator) receiver() { defer it.wg.Done() defer close(it.msgc) for { // Stop retrieving messages if the context is done, the stream // failed, or the iterator's Stop method was called. select { case <-it.ctx.Done(): return case <-it.failed: return case <-it.stopped: return default: } // Receive messages from stream. This may block indefinitely. msgs, err := it.sp.fetchMessages() // The streamingPuller handles retries, so any error here // is fatal to the iterator. if err != nil { it.fail(err) return } // We received some messages. Remember them so we can // keep them alive. deadline := time.Now().Add(it.po.maxExtension) it.mu.Lock() for _, m := range msgs { it.keepAliveDeadlines[m.ackID] = deadline } it.mu.Unlock() // Deliver the messages to the channel. for _, m := range msgs { select { case <-it.ctx.Done(): return case <-it.failed: return // Don't return if stopped. We want to send the remaining // messages on the channel, where they will be nacked. case it.msgc <- m: } } } } // sender runs in a goroutine and handles all sends to the stream. func (it *streamingMessageIterator) sender() { defer it.wg.Done() defer it.kaTicker.Stop() defer it.ackTicker.Stop() defer it.nackTicker.Stop() defer it.sp.closeSend() done := false for !done { send := false select { case <-it.ctx.Done(): // Context canceled or timed out: stop immediately, without // another RPC. return case <-it.failed: // Stream failed: nothing to do, so stop immediately. return case <-it.drained: // All outstanding messages have been marked done: // nothing left to do except send the final request. it.mu.Lock() send = (len(it.pendingReq.AckIds) > 0 || len(it.pendingReq.ModifyDeadlineAckIds) > 0) done = true case <-it.kaTicker.C: it.mu.Lock() send = it.handleKeepAlives() case <-it.nackTicker.C: it.mu.Lock() send = (len(it.pendingReq.ModifyDeadlineAckIds) > 0) case <-it.ackTicker.C: it.mu.Lock() send = (len(it.pendingReq.AckIds) > 0) } // Lock is held here. if send { req := it.pendingReq it.pendingReq = &pb.StreamingPullRequest{} it.mu.Unlock() err := it.sp.send(req) if err != nil { // The streamingPuller handles retries, so any error here // is fatal to the iterator. it.fail(err) return } } else { it.mu.Unlock() } } } // handleKeepAlives modifies the pending request to include deadline extensions // for live messages. It also purges expired messages. It reports whether // there were any live messages. // // Called with the lock held. func (it *streamingMessageIterator) handleKeepAlives() bool { live, expired := getKeepAliveAckIDs(it.keepAliveDeadlines) for _, e := range expired { delete(it.keepAliveDeadlines, e) } dl := trunc32(int64(it.po.ackDeadline.Seconds())) for _, m := range live { it.addDeadlineMod(m, dl) } it.checkDrained() return len(live) > 0 }