add ZREPL_DESTROY_MAX_BATCH_SIZE env var to control max batch destroy size

fixes #508
closes https://github.com/zrepl/zrepl/pull/604
This commit is contained in:
3nprob 2022-05-19 20:32:37 +09:00 committed by Christian Schwarz
parent 53f9bd6d88
commit e4112d888c
2 changed files with 4 additions and 2 deletions

View File

@ -21,6 +21,7 @@ Developers should consult the git commit log or GitHub issue tracker.
* `Feature Wishlist on GitHub <https://github.com/zrepl/zrepl/discussions/547>`_ * `Feature Wishlist on GitHub <https://github.com/zrepl/zrepl/discussions/547>`_
* |feature| Add ``ZREPL_DESTROY_MAX_BATCH_SIZE`` env var (default 0=unlimited).
* |break| |feature| convert Prometheus metric ``zrepl_version_daemon`` to ``zrepl_start_time`` metric * |break| |feature| convert Prometheus metric ``zrepl_version_daemon`` to ``zrepl_start_time`` metric
* The metric still reports the zrepl version in a label. * The metric still reports the zrepl version in a label.
@ -275,7 +276,7 @@ Changes
* |feature| Proper timeout handling for the :ref:`SSH transport <transport-ssh+stdinserver>` * |feature| Proper timeout handling for the :ref:`SSH transport <transport-ssh+stdinserver>`
* |break| Requires Go 1.11 or later. * |break| Requires Go 1.11 or later.
* |break| |break_config|: mappings are no longer supported * |break| |break_config|: mappings are no longer supported
* Receiving sides (``pull`` and ``sink`` job) specify a single ``root_fs``. * Receiving sides (``pull`` and ``sink`` job) specify a single ``root_fs``.

View File

@ -115,9 +115,10 @@ func buildBatches(reqs []*DestroySnapOp) [][]*DestroySnapOp {
// group by fs // group by fs
var perFS [][]*DestroySnapOp var perFS [][]*DestroySnapOp
consumed := 0 consumed := 0
maxBatchSize := envconst.Int("ZREPL_DESTROY_MAX_BATCH_SIZE", 0)
for consumed < len(sorted) { for consumed < len(sorted) {
batchConsumedUntil := consumed batchConsumedUntil := consumed
for ; batchConsumedUntil < len(sorted) && sorted[batchConsumedUntil].Filesystem == sorted[consumed].Filesystem; batchConsumedUntil++ { for ; batchConsumedUntil < len(sorted) && (maxBatchSize < 1 || batchConsumedUntil-consumed < maxBatchSize) && sorted[batchConsumedUntil].Filesystem == sorted[consumed].Filesystem; batchConsumedUntil++ {
} }
perFS = append(perFS, sorted[consumed:batchConsumedUntil]) perFS = append(perFS, sorted[consumed:batchConsumedUntil])
consumed = batchConsumedUntil consumed = batchConsumedUntil