package controller

import (
	"context"
	"fmt"
	"github.com/openziti/zrok/controller/store"
	"github.com/sirupsen/logrus"
	"strconv"
)

func sparkDataForEnvironments(envs []*store.Environment) (rx, tx map[int][]int64, err error) {
	rx = make(map[int][]int64)
	tx = make(map[int][]int64)
	if len(envs) > 0 {
		qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)

		envFilter := "|> filter(fn: (r) =>"
		for i, env := range envs {
			if i > 0 {
				envFilter += " or"
			}
			envFilter += fmt.Sprintf(" r[\"envId\"] == \"%d\"", env.Id)
		}
		envFilter += ")"
		query := fmt.Sprintf("from(bucket: \"%v\")\n", cfg.Metrics.Influx.Bucket) +
			"|> range(start: -5m)\n" +
			"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
			"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
			"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
			envFilter +
			"|> drop(columns: [\"share\", \"acctId\"])\n" +
			"|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"

		result, err := qapi.Query(context.Background(), query)
		if err != nil {
			return nil, nil, err
		}

		for result.Next() {
			envIdS := result.Record().ValueByKey("envId").(string)
			envId, err := strconv.ParseInt(envIdS, 10, 32)
			if err != nil {
				logrus.Errorf("error parsing '%v': %v", envIdS, err)
				continue
			}
			switch result.Record().Field() {
			case "rx":
				rxV := int64(0)
				if v, ok := result.Record().Value().(int64); ok {
					rxV = v
				}
				rxData := append(rx[int(envId)], rxV)
				rx[int(envId)] = rxData

			case "tx":
				txV := int64(0)
				if v, ok := result.Record().Value().(int64); ok {
					txV = v
				}
				txData := append(tx[int(envId)], txV)
				tx[int(envId)] = txData
			}
		}
	}
	return rx, tx, nil
}

func sparkDataForShares(shrs []*store.Share) (rx, tx map[string][]int64, err error) {
	rx = make(map[string][]int64)
	tx = make(map[string][]int64)
	if len(shrs) > 0 {
		qapi := idb.QueryAPI(cfg.Metrics.Influx.Org)

		shrFilter := "|> filter(fn: (r) =>"
		for i, shr := range shrs {
			if i > 0 {
				shrFilter += " or"
			}
			shrFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
		}
		shrFilter += ")"
		query := fmt.Sprintf("from(bucket: \"%v\")\n", cfg.Metrics.Influx.Bucket) +
			"|> range(start: -5m)\n" +
			"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
			"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
			"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
			shrFilter +
			"|> aggregateWindow(every: 10s, fn: sum, createEmpty: true)\n"

		result, err := qapi.Query(context.Background(), query)
		if err != nil {
			return nil, nil, err
		}

		for result.Next() {
			shrToken := result.Record().ValueByKey("share").(string)
			switch result.Record().Field() {
			case "rx":
				rxV := int64(0)
				if v, ok := result.Record().Value().(int64); ok {
					rxV = v
				}
				rxData := append(rx[shrToken], rxV)
				rx[shrToken] = rxData

			case "tx":
				txV := int64(0)
				if v, ok := result.Record().Value().(int64); ok {
					txV = v
				}
				txData := append(tx[shrToken], txV)
				tx[shrToken] = txData
			}
		}
	}
	return rx, tx, nil
}