egroupware/api/js/webodf/collab/backend/jsglobal/OperationRouter.js

347 lines
12 KiB
JavaScript

/**
* Copyright (C) 2013 KO GmbH <copyright@kogmbh.com>
*
* @licstart
* This file is part of WebODF.
*
* WebODF is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License (GNU AGPL)
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* WebODF is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with WebODF. If not, see <http://www.gnu.org/licenses/>.
* @licend
*
* @source: http://www.webodf.org/
* @source: https://github.com/kogmbh/WebODF/
*/
/*global define, runtime, core, ops*/
webodfModule.define("webodf/editor/backend/jsglobal/OperationRouter", [], function () {
"use strict";
var /**@const @type {!string}*/
EVENT_BEFORESAVETOFILE = "beforeSaveToFile",
/**@const @type {!string}*/
EVENT_SAVEDTOFILE = "savedToFile",
/**@const @type {!string}*/
EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED = "hasLocalUnsyncedOperationsChanged",
/**@const @type {!string}*/
EVENT_HASSESSIONHOSTCONNECTIONCHANGED = "hasSessionHostConnectionChanged";
runtime.loadClass("ops.OperationTransformer");
runtime.loadClass("core.EventNotifier");
runtime.loadClass("core.Task");
/**
* @constructor
* @implements ops.OperationRouter
*/
return function JsGlobalOperationRouter(sessionId, memberId, server, errorCallback) {
var events = new core.EventNotifier([
ops.OperationRouter.signalProcessingBatchStart,
ops.OperationRouter.signalProcessingBatchEnd,
EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED,
EVENT_HASSESSIONHOSTCONNECTIONCHANGED,
EVENT_BEFORESAVETOFILE,
EVENT_SAVEDTOFILE
]),
/**@type{!ops.OperationFactory}*/
operationFactory,
playbackFunction,
operationTransformer = new ops.OperationTransformer(),
/**@type{number}*/
groupIdentifier = 0,
uploadingOpSpecCount,
syncTask,
unsyncedClientOpspecQueue = [],
hasLocalUnsyncedOps = false,
hasSessionHostConnection = true,
hasUnresolvableConflict = false,
syncInProgress = false,
syncAttemptCompleteCallbacks = [],
successfulSyncCallbacks = [],
destroyed = false;
/**
* @return {undefined}
*/
function updateUnsyncedOps() {
var hasLocalUnsyncedOpsNow = (unsyncedClientOpspecQueue.length > 0);
// no change?
if (hasLocalUnsyncedOps === hasLocalUnsyncedOpsNow) {
return;
}
hasLocalUnsyncedOps = hasLocalUnsyncedOpsNow;
events.emit(EVENT_HASLOCALUNSYNCEDOPERATIONSCHANGED, hasLocalUnsyncedOps);
}
/**
* @return {!boolean}
*/
this.hasLocalUnsyncedOps = function() {
return hasLocalUnsyncedOps;
};
/**
* @param {!boolean} hasConnection
* @return {undefined}
*/
function updateHasSessionHostConnectionState(hasConnection) {
// no change?
if (hasSessionHostConnection === hasConnection) {
return;
}
hasSessionHostConnection = hasConnection;
events.emit(EVENT_HASSESSIONHOSTCONNECTIONCHANGED, hasSessionHostConnection);
}
/**
* @return {!boolean}
*/
this.hasSessionHostConnection = function () {
return hasSessionHostConnection;
};
/**
* Sets the factory to use to create operation instances from operation specs.
*
* @param {!ops.OperationFactory} f
* @return {undefined}
*/
this.setOperationFactory = function (f) {
operationFactory = f;
};
/**
* Sets the method which should be called to apply operations.
*
* @param {!function(!ops.Operation):boolean} playback_func
* @return {undefined}
*/
this.setPlaybackFunction = function (playback_func) {
playbackFunction = playback_func;
};
/**
* Brings the locally created operations into the game.
*
* @param {!Array.<!{optype:!string}>} operationSpecs
* @return {undefined}
*/
function executeOpSpecs(operationSpecs) {
// This is an extremely simplistic and VERY temporary implementation of operation grouping.
// In order to improve undo behaviour, the undo manager requires knowledge about what groups
// of operations were queued together, so these can be stored in a single undo state.
// The current implementation is only designed for a localeditor instance & the TrivialUndoManager.
// TODO redesign this concept to work with collaborative editing
groupIdentifier += 1;
events.emit(ops.OperationRouter.signalProcessingBatchStart, {});
operationSpecs.forEach(function (opspec) {
var /**@type{?ops.Operation}*/
timedOp;
timedOp = operationFactory.create(opspec);
timedOp.group = "g" + groupIdentifier;
// TODO: handle return flag in error case
playbackFunction(timedOp);
});
events.emit(ops.OperationRouter.signalProcessingBatchEnd, {});
}
/**
* Executes local operations and prepares to send to these to the collab server
*
* @param {!Array.<!ops.Operation>} operations
* @return {undefined}
*/
this.push = function (operations) {
if (hasUnresolvableConflict) {
throw new Error("unresolvableConflictingOps");
}
var specs = [],
timestamp = Date.now();
operations.forEach(function(op) {
var spec = op.spec();
spec.timestamp = timestamp;
specs.push(spec);
});
unsyncedClientOpspecQueue = unsyncedClientOpspecQueue.concat(specs);
executeOpSpecs(specs);
updateUnsyncedOps();
syncTask.trigger();
};
/**
* @param {function()} cb
*/
this.close = function (cb) {
function internalDestroy() {
destroyed = true;
syncTask.destroy(cb);
}
syncAttemptCompleteCallbacks.push(internalDestroy);
syncTask.triggerImmediate();
if (!syncInProgress) {
internalDestroy();
}
};
/**
* @param {!string} eventId
* @param {!Function} cb
* @return {undefined}
*/
this.subscribe = function (eventId, cb) {
events.subscribe(eventId, cb);
};
/**
* @param {!string} eventId
* @param {!Function} cb
* @return {undefined}
*/
this.unsubscribe = function (eventId, cb) {
events.unsubscribe(eventId, cb);
};
/**
* Complete the current synchronize request
* @param {!boolean=} connectionState If specified, update the session host connection state.
* @return {undefined}
*/
function completeSyncRequest(connectionState) {
syncInProgress = false;
updateUnsyncedOps();
if (connectionState !== undefined) {
updateHasSessionHostConnectionState(connectionState);
}
syncAttemptCompleteCallbacks.forEach(function(cb) {
cb();
});
syncAttemptCompleteCallbacks.length = 0;
if (!destroyed) {
syncTask.cancel();
syncTask.trigger();
}
}
/**
* Log a synchronization error and retry
* @param {!Error} e
* @return {undefined}
*/
function logErrorAndRetry(e) {
var isConnected;
switch (e.code) {
case "SEQ_OUTOFDATE":
// This error is an expected as part of normal operation. Don't bother logging
isConnected = true;
break;
case "SERVICE_UNAVAILABLE":
// This error is an expected as part of normal operation. Don't bother logging.
// However, the error does indicate a connectivity problem with the session, so
// ensure the connection status is updated
isConnected = false;
break;
default:
isConnected = false;
runtime.log(e);
break;
}
completeSyncRequest(isConnected);
}
/**
* Remove synchronized client ops off the client queue
* @return {undefined}
*/
function onPushSuccess() {
// Only remove operations that have now been uploaded to the remote service
unsyncedClientOpspecQueue.splice(0, uploadingOpSpecCount);
completeSyncRequest(true);
successfulSyncCallbacks.forEach(function(cb) {
cb();
});
successfulSyncCallbacks.length = 0;
}
/**
* Process remote changes
* @param {!{specs: !Array.<!Object>, sequenceId: !Object}} remoteChanges
* @return {undefined}
*/
function onRemoteChangesSuccess(remoteChanges) {
var transformResult = operationTransformer.transform(unsyncedClientOpspecQueue, remoteChanges.specs);
if (!transformResult) {
hasUnresolvableConflict = true;
errorCallback("unresolvableConflictingOps");
completeSyncRequest();
return;
}
if (transformResult.opSpecsB.length > 0) {
// Apply remote changes that have been transformed against local updates
executeOpSpecs(transformResult.opSpecsB);
}
// The local client changes are now transformed against the current remote updates, and should not be transformed again.
// Update the local sync queue with the translated results in case the push attempt fails due to a conflict
unsyncedClientOpspecQueue = transformResult.opSpecsA;
// Send local changes that have been transformed against the remote updates
uploadingOpSpecCount = unsyncedClientOpspecQueue.length;
if (unsyncedClientOpspecQueue.length > 0) {
server.getJsGlobalServer().push(sessionId, memberId, remoteChanges.sequenceId, unsyncedClientOpspecQueue, onPushSuccess, logErrorAndRetry);
} else {
onPushSuccess();
}
}
/**
* DO NOT CALL DIRECTLY, use syncTask.trigger instead.
*
* Synchronize the local state with the remote server. This is an async call and will return immediately, while
* the processing occurs in the background.
*
* @return {undefined}
*/
function synchronizeWithServer() {
if (hasUnresolvableConflict === false && syncInProgress === false) {
syncInProgress = true;
server.getJsGlobalServer().getRemoteChanges(sessionId, memberId, onRemoteChangesSuccess, logErrorAndRetry);
}
}
this.requestReplay = function(cb) {
if (hasUnresolvableConflict) {
throw new Error("unresolvableConflictingOps");
}
successfulSyncCallbacks.push(cb);
syncTask.triggerImmediate();
};
function init() {
syncTask = core.Task.createTimeoutTask(synchronizeWithServer, 100);
}
init();
};
});