mirror of
https://github.com/rclone/rclone.git
synced 2024-12-27 09:28:58 +01:00
431 lines
12 KiB
Go
431 lines
12 KiB
Go
// Copyright 2015 Google Inc. All Rights Reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package bigquery
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"cloud.google.com/go/internal"
|
|
gax "github.com/googleapis/gax-go"
|
|
"golang.org/x/net/context"
|
|
bq "google.golang.org/api/bigquery/v2"
|
|
"google.golang.org/api/iterator"
|
|
)
|
|
|
|
// A Job represents an operation which has been submitted to BigQuery for processing.
|
|
type Job struct {
|
|
c *Client
|
|
projectID string
|
|
jobID string
|
|
|
|
isQuery bool
|
|
destinationTable *bq.TableReference // table to read query results from
|
|
}
|
|
|
|
// JobFromID creates a Job which refers to an existing BigQuery job. The job
|
|
// need not have been created by this package. For example, the job may have
|
|
// been created in the BigQuery console.
|
|
func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
|
|
job, err := c.service.getJob(ctx, c.projectID, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
job.c = c
|
|
return job, nil
|
|
}
|
|
|
|
// ID returns the job's ID.
|
|
func (j *Job) ID() string {
|
|
return j.jobID
|
|
}
|
|
|
|
// State is one of a sequence of states that a Job progresses through as it is processed.
type State int

const (
	// StateUnspecified is the zero State. It is used only as a default in
	// JobIterator, where it means that jobs are not filtered by state.
	StateUnspecified State = iota

	// Pending is sent to the service as "pending" when used as a
	// JobIterator filter.
	Pending

	// Running is sent to the service as "running" when used as a
	// JobIterator filter.
	Running

	// Done is sent to the service as "done" when used as a JobIterator
	// filter. JobStatus.Done reports true for this state.
	Done
)
|
|
|
|
// JobStatus contains the current State of a job, and errors encountered while processing that job.
|
|
type JobStatus struct {
|
|
State State
|
|
|
|
err error
|
|
|
|
// All errors encountered during the running of the job.
|
|
// Not all Errors are fatal, so errors here do not necessarily mean that the job has completed or was unsuccessful.
|
|
Errors []*Error
|
|
|
|
// Statistics about the job.
|
|
Statistics *JobStatistics
|
|
}
|
|
|
|
// createJobRef creates a JobReference.
|
|
// projectID must be non-empty.
|
|
func createJobRef(jobID string, addJobIDSuffix bool, projectID string) *bq.JobReference {
|
|
if jobID == "" {
|
|
jobID = randomJobIDFn()
|
|
} else if addJobIDSuffix {
|
|
jobID += "-" + randomJobIDFn()
|
|
}
|
|
// We don't check whether projectID is empty; the server will return an
|
|
// error when it encounters the resulting JobReference.
|
|
return &bq.JobReference{
|
|
JobId: jobID,
|
|
ProjectId: projectID,
|
|
}
|
|
}
|
|
|
|
// alphanum is the alphabet random job IDs are drawn from.
const alphanum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

var (
	// rngMu serializes access to rng.
	rngMu sync.Mutex
	// rng is seeded from the current time mixed with the process ID.
	rng = rand.New(rand.NewSource(time.Now().UnixNano() ^ int64(os.Getpid())))
)

// For testing.
var randomJobIDFn = randomJobID

// randomJobID returns a random string of characters taken from alphanum.
func randomJobID() string {
	// As of August 2017, the BigQuery service uses 27 alphanumeric characters for suffixes.
	const idLen = 27

	id := make([]byte, idLen)
	rngMu.Lock()
	defer rngMu.Unlock()
	for i := range id {
		id[i] = alphanum[rng.Intn(len(alphanum))]
	}
	return string(id)
}
|
|
|
|
// Done reports whether the job has completed.
|
|
// After Done returns true, the Err method will return an error if the job completed unsuccesfully.
|
|
func (s *JobStatus) Done() bool {
|
|
return s.State == Done
|
|
}
|
|
|
|
// Err returns the error that caused the job to complete unsuccesfully (if any).
|
|
func (s *JobStatus) Err() error {
|
|
return s.err
|
|
}
|
|
|
|
// Fill in the client field of Tables in the statistics.
|
|
func (s *JobStatus) setClient(c *Client) {
|
|
if s.Statistics == nil {
|
|
return
|
|
}
|
|
if qs, ok := s.Statistics.Details.(*QueryStatistics); ok {
|
|
for _, t := range qs.ReferencedTables {
|
|
t.c = c
|
|
}
|
|
}
|
|
}
|
|
|
|
// Status returns the current status of the job. It fails if the Status could not be determined.
|
|
func (j *Job) Status(ctx context.Context) (*JobStatus, error) {
|
|
js, err := j.c.service.jobStatus(ctx, j.projectID, j.jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
js.setClient(j.c)
|
|
return js, nil
|
|
}
|
|
|
|
// Cancel requests that a job be cancelled. This method returns without waiting for
|
|
// cancellation to take effect. To check whether the job has terminated, use Job.Status.
|
|
// Cancelled jobs may still incur costs.
|
|
func (j *Job) Cancel(ctx context.Context) error {
|
|
return j.c.service.jobCancel(ctx, j.projectID, j.jobID)
|
|
}
|
|
|
|
// Wait blocks until the job or the context is done. It returns the final status
|
|
// of the job.
|
|
// If an error occurs while retrieving the status, Wait returns that error. But
|
|
// Wait returns nil if the status was retrieved successfully, even if
|
|
// status.Err() != nil. So callers must check both errors. See the example.
|
|
func (j *Job) Wait(ctx context.Context) (*JobStatus, error) {
|
|
if j.isQuery {
|
|
// We can avoid polling for query jobs.
|
|
if _, err := j.c.service.waitForQuery(ctx, j.projectID, j.jobID); err != nil {
|
|
return nil, err
|
|
}
|
|
// Note: extra RPC even if you just want to wait for the query to finish.
|
|
js, err := j.Status(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return js, nil
|
|
}
|
|
// Non-query jobs must poll.
|
|
var js *JobStatus
|
|
err := internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
|
|
js, err = j.Status(ctx)
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
if js.Done() {
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return js, nil
|
|
}
|
|
|
|
// Read fetches the results of a query job.
|
|
// If j is not a query job, Read returns an error.
|
|
func (j *Job) Read(ctx context.Context) (*RowIterator, error) {
|
|
if !j.isQuery {
|
|
return nil, errors.New("bigquery: cannot read from a non-query job")
|
|
}
|
|
var projectID string
|
|
if j.destinationTable != nil {
|
|
projectID = j.destinationTable.ProjectId
|
|
} else {
|
|
projectID = j.c.projectID
|
|
}
|
|
|
|
schema, err := j.c.service.waitForQuery(ctx, projectID, j.jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// The destination table should only be nil if there was a query error.
|
|
if j.destinationTable == nil {
|
|
return nil, errors.New("bigquery: query job missing destination table")
|
|
}
|
|
return newRowIterator(ctx, j.c.service, &readTableConf{
|
|
projectID: j.destinationTable.ProjectId,
|
|
datasetID: j.destinationTable.DatasetId,
|
|
tableID: j.destinationTable.TableId,
|
|
schema: schema,
|
|
}), nil
|
|
}
|
|
|
|
// JobStatistics contains statistics about a job.
|
|
type JobStatistics struct {
|
|
CreationTime time.Time
|
|
StartTime time.Time
|
|
EndTime time.Time
|
|
TotalBytesProcessed int64
|
|
|
|
Details Statistics
|
|
}
|
|
|
|
// Statistics is one of ExtractStatistics, LoadStatistics or QueryStatistics.
type Statistics interface {
	// implementsStatistics is unexported, so only types declared in this
	// package can satisfy the interface.
	implementsStatistics()
}
|
|
|
|
// ExtractStatistics contains statistics about an extract job.
type ExtractStatistics struct {
	// DestinationURIFileCounts gives the number of files written per
	// destination URI or URI pattern named in the extract configuration.
	// The counts appear in the same order as the URIs in the
	// 'destinationUris' field.
	DestinationURIFileCounts []int64
}
|
|
|
|
// LoadStatistics contains statistics about a load job.
type LoadStatistics struct {
	// InputFileBytes is the number of bytes of source data in the load job.
	InputFileBytes int64

	// InputFiles is the number of source files in the load job.
	InputFiles int64

	// OutputBytes is the size of the loaded data in bytes. The value may
	// change while the load job is in the running state.
	OutputBytes int64

	// OutputRows is the number of rows imported by the load job. The value
	// may change while the job is in the running state.
	OutputRows int64
}
|
|
|
|
// QueryStatistics contains statistics about a query job.
|
|
type QueryStatistics struct {
|
|
// Billing tier for the job.
|
|
BillingTier int64
|
|
|
|
// Whether the query result was fetched from the query cache.
|
|
CacheHit bool
|
|
|
|
// The type of query statement, if valid.
|
|
StatementType string
|
|
|
|
// Total bytes billed for the job.
|
|
TotalBytesBilled int64
|
|
|
|
// Total bytes processed for the job.
|
|
TotalBytesProcessed int64
|
|
|
|
// Describes execution plan for the query.
|
|
QueryPlan []*ExplainQueryStage
|
|
|
|
// The number of rows affected by a DML statement. Present only for DML
|
|
// statements INSERT, UPDATE or DELETE.
|
|
NumDMLAffectedRows int64
|
|
|
|
// ReferencedTables: [Output-only, Experimental] Referenced tables for
|
|
// the job. Queries that reference more than 50 tables will not have a
|
|
// complete list.
|
|
ReferencedTables []*Table
|
|
|
|
// The schema of the results. Present only for successful dry run of
|
|
// non-legacy SQL queries.
|
|
Schema Schema
|
|
|
|
// Standard SQL: list of undeclared query parameter names detected during a
|
|
// dry run validation.
|
|
UndeclaredQueryParameterNames []string
|
|
}
|
|
|
|
// ExplainQueryStage describes one stage of a query.
|
|
type ExplainQueryStage struct {
|
|
// Relative amount of the total time the average shard spent on CPU-bound tasks.
|
|
ComputeRatioAvg float64
|
|
|
|
// Relative amount of the total time the slowest shard spent on CPU-bound tasks.
|
|
ComputeRatioMax float64
|
|
|
|
// Unique ID for stage within plan.
|
|
ID int64
|
|
|
|
// Human-readable name for stage.
|
|
Name string
|
|
|
|
// Relative amount of the total time the average shard spent reading input.
|
|
ReadRatioAvg float64
|
|
|
|
// Relative amount of the total time the slowest shard spent reading input.
|
|
ReadRatioMax float64
|
|
|
|
// Number of records read into the stage.
|
|
RecordsRead int64
|
|
|
|
// Number of records written by the stage.
|
|
RecordsWritten int64
|
|
|
|
// Current status for the stage.
|
|
Status string
|
|
|
|
// List of operations within the stage in dependency order (approximately
|
|
// chronological).
|
|
Steps []*ExplainQueryStep
|
|
|
|
// Relative amount of the total time the average shard spent waiting to be scheduled.
|
|
WaitRatioAvg float64
|
|
|
|
// Relative amount of the total time the slowest shard spent waiting to be scheduled.
|
|
WaitRatioMax float64
|
|
|
|
// Relative amount of the total time the average shard spent on writing output.
|
|
WriteRatioAvg float64
|
|
|
|
// Relative amount of the total time the slowest shard spent on writing output.
|
|
WriteRatioMax float64
|
|
}
|
|
|
|
// ExplainQueryStep describes one step of a query stage.
type ExplainQueryStep struct {
	// Kind is the machine-readable operation type.
	Kind string

	// Substeps holds the human-readable stage descriptions.
	Substeps []string
}
|
|
|
|
func (*ExtractStatistics) implementsStatistics() {}
|
|
func (*LoadStatistics) implementsStatistics() {}
|
|
func (*QueryStatistics) implementsStatistics() {}
|
|
|
|
// Jobs lists jobs within a project.
|
|
func (c *Client) Jobs(ctx context.Context) *JobIterator {
|
|
it := &JobIterator{
|
|
ctx: ctx,
|
|
c: c,
|
|
ProjectID: c.projectID,
|
|
}
|
|
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
|
|
it.fetch,
|
|
func() int { return len(it.items) },
|
|
func() interface{} { b := it.items; it.items = nil; return b })
|
|
return it
|
|
}
|
|
|
|
// A JobInfo consists of a Job and a JobStatus.
|
|
type JobInfo struct {
|
|
Job *Job
|
|
Status *JobStatus
|
|
}
|
|
|
|
// JobIterator iterates over jobs in a project.
|
|
type JobIterator struct {
|
|
ProjectID string // Project ID of the jobs to list. Default is the client's project.
|
|
AllUsers bool // Whether to list jobs owned by all users in the project, or just the current caller.
|
|
State State // List only jobs in the given state. Defaults to all states.
|
|
|
|
ctx context.Context
|
|
c *Client
|
|
pageInfo *iterator.PageInfo
|
|
nextFunc func() error
|
|
items []JobInfo
|
|
}
|
|
|
|
// PageInfo returns the iterator's pagination state, which Client.Jobs obtains
// from iterator.NewPageInfo. See the google.golang.org/api/iterator package
// for details.
func (it *JobIterator) PageInfo() *iterator.PageInfo {
	return it.pageInfo
}
|
|
|
|
func (it *JobIterator) Next() (JobInfo, error) {
|
|
if err := it.nextFunc(); err != nil {
|
|
return JobInfo{}, err
|
|
}
|
|
item := it.items[0]
|
|
it.items = it.items[1:]
|
|
return item, nil
|
|
}
|
|
|
|
func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
|
|
var st string
|
|
switch it.State {
|
|
case StateUnspecified:
|
|
st = ""
|
|
case Pending:
|
|
st = "pending"
|
|
case Running:
|
|
st = "running"
|
|
case Done:
|
|
st = "done"
|
|
default:
|
|
return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
|
|
}
|
|
jobInfos, nextPageToken, err := it.c.service.listJobs(it.ctx, it.ProjectID, pageSize, pageToken, it.AllUsers, st)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for _, ji := range jobInfos {
|
|
ji.Job.c = it.c
|
|
ji.Status.setClient(it.c)
|
|
it.items = append(it.items, ji)
|
|
}
|
|
return nextPageToken, nil
|
|
}
|